You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/13 20:02:44 UTC
[nifi] branch master updated: NIFI-5869 Support Reconnection for JMS
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3492313 NIFI-5869 Support Reconnection for JMS
3492313 is described below
commit 3492313d0b3436cdd0f7390d46d403fed9d65b77
Author: Ed <ed...@gmail.com>
AuthorDate: Thu Jan 10 11:55:29 2019 -0500
NIFI-5869 Support Reconnection for JMS
resets worker if it doesn't work anymore for any reason. this will add "reconnect" capabilities. Will solve issues for following use cases:
- authentication changed after successful connection
- JNDI mapping changed and requires recaching.
- JMS server isn't available anymore or restarted.
improved controller reset on exception
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3261
---
.../cf/JMSConnectionFactoryProviderDefinition.java | 8 +++++
.../nifi/jms/cf/JMSConnectionFactoryProvider.java | 9 ++++-
.../jms/cf/JndiJmsConnectionFactoryProvider.java | 8 +++++
.../nifi/jms/processors/AbstractJMSProcessor.java | 22 +++++++++++-
.../org/apache/nifi/jms/processors/ConsumeJMS.java | 41 ++++++++++++----------
.../org/apache/nifi/jms/processors/JMSWorker.java | 9 +++++
.../org/apache/nifi/jms/processors/PublishJMS.java | 15 ++++++--
7 files changed, 90 insertions(+), 22 deletions(-)
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
index adb94fd..6bab920 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
@@ -35,4 +35,12 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic
*/
ConnectionFactory getConnectionFactory();
+ /**
+ * Resets {@link ConnectionFactory}.
+ * Provider should reset {@link ConnectionFactory} only if a copy provided by a client matches
+ * current {@link ConnectionFactory}.
+ * @param cachedFactory - {@link ConnectionFactory} cached by client.
+ */
+ void resetConnectionFactory(ConnectionFactory cachedFactory);
+
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
index ecb4e7a..781ce65 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
@@ -139,6 +139,14 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
.build();
}
+ @Override
+ public void resetConnectionFactory(ConnectionFactory cachedFactory) {
+ if (cachedFactory == connectionFactory) {
+ getLogger().debug("Resetting connection factory");
+ connectionFactory = null;
+ }
+ }
+
/**
* @return new instance of {@link ConnectionFactory}
*/
@@ -316,5 +324,4 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
}
}
-
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
index a293d84..44d8d99 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
@@ -139,6 +139,14 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService
}
@Override
+ public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) {
+ if (cachedFactory == connectionFactory) {
+ getLogger().debug("Resetting connection factory");
+ connectionFactory = null;
+ }
+ }
+
+ @Override
public synchronized ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = lookupConnectionFactory();
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 0094eaf..f47cf78 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -158,7 +158,27 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
try {
rendezvousWithJms(context, session, worker);
} finally {
- workerPool.offer(worker);
+ //in case of exception during worker's connection (consumer or publisher),
+ //an appropriate service is responsible to invalidate the worker.
+ //if worker is not valid anymore, don't put it back into a pool, try to rebuild it first, or discard.
+ //this will be helpful in a situation, when JNDI has changed, or JMS server is not available
+ //and reconnection is required.
+ if (worker == null || !worker.isValid()){
+ getLogger().debug("Worker is invalid. Will try re-create... ");
+ final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
+ try {
+ // Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
+ CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
+ cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
+ worker = buildTargetResource(context);
+ }catch(Exception e) {
+ getLogger().error("Failed to rebuild: " + cfProvider);
+ worker = null;
+ }
+ }
+ if (worker != null) {
+ workerPool.offer(worker);
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 997c6dd..4b149e2 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -188,28 +188,33 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
- consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
- @Override
- public void accept(final JMSResponse response) {
- if (response == null) {
- return;
- }
+ try {
+ consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
+ @Override
+ public void accept(final JMSResponse response) {
+ if (response == null) {
+ return;
+ }
- FlowFile flowFile = processSession.create();
- flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
+ FlowFile flowFile = processSession.create();
+ flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
- final Map<String, String> jmsHeaders = response.getMessageHeaders();
- final Map<String, String> jmsProperties = response.getMessageProperties();
+ final Map<String, String> jmsHeaders = response.getMessageHeaders();
+ final Map<String, String> jmsProperties = response.getMessageProperties();
- flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
- flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
- flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
+ flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
+ flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
+ flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
- processSession.getProvenanceReporter().receive(flowFile, destinationName);
- processSession.transfer(flowFile, REL_SUCCESS);
- processSession.commit();
- }
- });
+ processSession.getProvenanceReporter().receive(flowFile, destinationName);
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.commit();
+ }
+ });
+ } catch(Exception e) {
+ consumer.setValid(false);
+ throw e; // for backward compatibility with exception handling in flows
+ }
}
/**
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
index e6fa1bb..ee4d76d 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
@@ -36,6 +36,7 @@ abstract class JMSWorker {
protected final JmsTemplate jmsTemplate;
protected final ComponentLog processLog;
private final CachingConnectionFactory connectionFactory;
+ private boolean isValid = true;
/**
@@ -61,4 +62,12 @@ abstract class JMSWorker {
return this.getClass().getSimpleName() + "[destination:" + this.jmsTemplate.getDefaultDestinationName()
+ "; pub-sub:" + this.jmsTemplate.isPubSubDomain() + ";]";
}
+
+ public boolean isValid() {
+ return isValid;
+ }
+
+ public void setValid(boolean isValid) {
+ this.isValid = isValid;
+ }
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index a32a895..12451cf 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -123,11 +123,21 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
switch (context.getProperty(MESSAGE_BODY).getValue()) {
case TEXT_MESSAGE:
- publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes());
+ try {
+ publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes());
+ } catch(Exception e) {
+ publisher.setValid(false);
+ throw e;
+ }
break;
case BYTES_MESSAGE:
default:
- publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+ try {
+ publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+ } catch(Exception e) {
+ publisher.setValid(false);
+ throw e;
+ }
break;
}
processSession.transfer(flowFile, REL_SUCCESS);
@@ -136,6 +146,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
processSession.transfer(flowFile, REL_FAILURE);
this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
context.yield();
+ publisher.setValid(false);
}
}
}