You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/19 15:30:54 UTC

[nifi] branch master updated: NIFI-6915 This closes #3961. Jms Durable non shared subscription is broken Revert NIFI-4834 enhancement for durable non shared consumers only.

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c1301e1  NIFI-6915 This closes #3961. Jms Durable non shared subscription is broken Revert NIFI-4834 enhancement for durable non shared consumers only.
c1301e1 is described below

commit c1301e196c5be89819e702ec76912f3c426f3055
Author: Gardella Juan Pablo <ju...@pitzil.com>
AuthorDate: Tue Jan 7 03:42:43 2020 -0300

    NIFI-6915 This closes #3961. Jms Durable non shared subscription is broken
    Revert NIFI-4834 enhancement for durable non shared consumers only.
    
    Updated also AbstractJMSProcessor class to be public. The testing are not working
    without that change, as org.apache.nifi.jms.processors.PublishJMSIT and
    org.apache.nifi.jms.processors.ConsumeJMSIT are not working, as @OnSchedule
    method is not called (because it is not public).
    The method org.apache.nifi.util.StandardProcessorTestRunner.run(int iterations, boolean stopOnFinish, boolean initialize, long runWait) uses ReflectionUtils.invokeMethodsWithAnnotation which does not call non public
    methods.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../nifi/jms/processors/AbstractJMSProcessor.java  |  31 ++++--
 .../org/apache/nifi/jms/processors/ConsumeJMS.java |  47 +++++++-
 .../apache/nifi/jms/processors/ConsumeJMSIT.java   | 121 ++++++++++++++++++++-
 3 files changed, 183 insertions(+), 16 deletions(-)

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 82ed075..33cc87c 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -29,6 +29,7 @@ import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
 import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
 import org.springframework.jms.core.JmsTemplate;
 
@@ -49,7 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @see ConsumeJMS
  * @see JMSConnectionFactoryProviderDefinition
  */
-abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcessor {
+public abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcessor {
 
     static final String QUEUE = "QUEUE";
     static final String TOPIC = "TOPIC";
@@ -164,6 +165,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
         propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
     }
 
+    protected static String getClientId(ProcessContext context) {
+        return context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
+      }
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return propertyDescriptors;
@@ -258,12 +263,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
         cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue());
 
         final CachingConnectionFactory cachingFactory = new CachingConnectionFactory(cfCredentialsAdapter);
-
-        String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
-        if (clientId != null) {
-            clientId = clientId + "-" + clientIdCounter.getAndIncrement();
-            cachingFactory.setClientId(clientId);
-        }
+        setClientId(context, cachingFactory);
 
         JmsTemplate jmsTemplate = new JmsTemplate();
         jmsTemplate.setConnectionFactory(cachingFactory);
@@ -271,4 +271,21 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
 
         return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
     }
+
+    /**
+     * Set clientId for JMS connections when <tt>clientId</tt> is not null.
+     * It is overridden by {@code}ConsumeJMS{@code} when durable subscriptions
+     * is configured on the processor.
+     * @param context context.
+     * @param connectionFactory the connection factory.
+     * @since NIFI-6915
+     */
+    protected void setClientId(ProcessContext context, final SingleConnectionFactory connectionFactory) {
+        String clientId = getClientId(context);
+        if (clientId != null) {
+            clientId = clientId + "-" + clientIdCounter.getAndIncrement();
+            connectionFactory.setClientId(clientId);
+        }
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index fac995c..3e278bc 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -38,6 +39,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.support.JmsHeaders;
 
@@ -188,6 +190,23 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
         relationships = Collections.unmodifiableSet(_relationships);
     }
 
+    private static boolean isDurableSubscriber(final ProcessContext context) {
+        final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
+        return durableBoolean == null ? false : durableBoolean;
+    }
+
+    private static boolean isShared(final ProcessContext context) {
+        final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
+        return sharedBoolean == null ? false : sharedBoolean;
+    }
+
+    @OnScheduled
+    public void onSchedule(ProcessContext context) {
+        if (context.getMaxConcurrentTasks() > 1 && isDurableSubscriber(context) && !isShared(context)) {
+            throw new ProcessException("Durable non shared subscriptions cannot work on multiple threads. Check javax/jms/Session#createDurableConsumer API doc.");
+        }
+    }
+
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
@@ -203,7 +222,6 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
                     "'" + DESTINATION_TYPE.getDisplayName() + "'='" + QUEUE + "'")
                 .build());
         }
-
         return validationResults;
     }
 
@@ -218,10 +236,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
     protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException {
         final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
         final String errorQueueName = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
-        final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
-        final boolean durable = durableBoolean == null ? false : durableBoolean;
-        final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
-        final boolean shared = sharedBoolean == null ? false : sharedBoolean;
+        final boolean durable = isDurableSubscriber(context);
+        final boolean shared = isShared(context);
         final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
         final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
 
@@ -280,6 +296,27 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
     }
 
     /**
+     * <p>
+     * Use provided clientId for non shared durable consumers, if not set
+     * always a different value as defined in {@link AbstractJMSProcessor#setClientId(ProcessContext, SingleConnectionFactory)}.
+     * </p>
+     * See {@link Session#createDurableConsumer(javax.jms.Topic, String, String, boolean)},
+     * in special following part: <i>An unshared durable subscription is
+     * identified by a name specified by the client and by the client identifier,
+     * which must be set. An application which subsequently wishes to create
+     * a consumer on that unshared durable subscription must use the same
+     * client identifier.</i>
+     */
+    @Override
+    protected void setClientId(ProcessContext context, SingleConnectionFactory cachingFactory) {
+        if (isDurableSubscriber(context) && !isShared(context)) {
+            cachingFactory.setClientId(getClientId(context));
+        } else {
+            super.setClientId(context, cachingFactory);
+        }
+    }
+
+    /**
      * Copies JMS attributes (i.e., headers and properties) as FF attributes.
      * Given that FF attributes mandate that values are of type String, the
      * copied values of JMS attributes will be "stringified" via
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index f40f60c..e5ca276 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -18,16 +18,17 @@ package org.apache.nifi.jms.processors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -37,10 +38,16 @@ import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.support.JmsHeaders;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import javax.jms.BytesMessage;
+import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
+import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
@@ -50,7 +57,7 @@ public class ConsumeJMSIT {
 
     @Test
     public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
-        final String  destinationName = "cooQueue";
+        final String destinationName = "cooQueue";
         JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
         try {
             JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
@@ -258,4 +265,110 @@ public class ConsumeJMSIT {
 
         return message;
     }
+
+    /**
+     * Validates <a href="https://issues.apache.org/jira/browse/NIFI-6915">NIFI-6915</a>.
+     * <p>
+     * The test consists on:
+     * <ul>
+     * <li>Start a durable non shared consumer <tt>C1</tt> with client id <tt>client1</tt> subscribed to topic <tt>T</tt>.</li>
+     * <li>Stop <tt>C1</tt>.</li>
+     * <li>Publish a message <tt>M1</tt> to topic <tt>T</tt>.</li>
+     * <li>Start <tt>C1</tt>.</li>
+     * </ul>
+     * It is expected <tt>C1</tt> receives message <tt>M1</tt>.
+     * </p>
+     * @throws Exception unexpected
+     */
+    @Test(timeout = 10000)
+    public void validateNifi6915() throws Exception {
+        BrokerService broker = new BrokerService();
+        try {
+            broker.setPersistent(false);
+            broker.setBrokerName("broker1");
+            broker.start();
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://broker1");
+            final String destinationName = "validateNifi6915";
+
+            TestRunner c1Consumer = createNonSharedDurableConsumer(cf, destinationName);
+            // 1. Start a durable non shared consumer C1 with client id client1 subscribed to topic T.
+            boolean stopConsumer = true;
+            c1Consumer.run(1, stopConsumer);
+            List<MockFlowFile> flowFiles = c1Consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertTrue("Expected no messages", flowFiles.isEmpty());
+            // 2. Publish a message M1 to topic T.
+            publishAMessage(cf, destinationName, "Hi buddy!!");
+            // 3. Start C1.
+            c1Consumer.run(1, true);
+            flowFiles = c1Consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertEquals(1, flowFiles.size());
+
+            // It is expected C1 receives message M1.
+            final MockFlowFile successFF = flowFiles.get(0);
+            assertNotNull(successFF);
+            successFF.assertAttributeExists(JmsHeaders.DESTINATION);
+            successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName);
+            successFF.assertContentEquals("Hi buddy!!".getBytes());
+            assertEquals(destinationName, successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME));
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void validateNifi6915OnlyOneThreadAllowed() {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        final String destinationName = "validateNifi6915";
+        try {
+            TestRunner runner = createNonSharedDurableConsumer(cf, destinationName);
+            runner.setThreadCount(2);
+            runner.run(1, true);
+            fail();
+        } catch (Throwable e) {
+            // Unable to capture the message :(
+        }
+
+        TestRunner runner = createNonSharedDurableConsumer(cf, destinationName);
+        // using one thread, it should not fail.
+        runner.setThreadCount(1);
+        runner.run(1, true);
+    }
+
+    private static void publishAMessage(ActiveMQConnectionFactory cf, final String destinationName, String messageContent) throws JMSException {
+        // Publish a message.
+        try (Connection conn = cf.createConnection();
+                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(session.createTopic(destinationName))) {
+            producer.send(session.createTextMessage(messageContent));
+        }
+    }
+
+    private static TestRunner createNonSharedDurableConsumer(ActiveMQConnectionFactory cf, final String destinationName) {
+        ConsumeJMS c1 = new ConsumeJMS();
+        TestRunner c1Consumer = TestRunners.newTestRunner(c1);
+        JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
+        when(cs.getIdentifier()).thenReturn("cfProvider");
+        when(cs.getConnectionFactory()).thenReturn(cf);
+
+        try {
+            c1Consumer.addControllerService("cfProvider", cs);
+        } catch (InitializationException e) {
+            throw new IllegalStateException(e);
+        }
+        c1Consumer.enableControllerService(cs);
+
+        c1Consumer.setProperty(ConsumeJMS.CF_SERVICE, "cfProvider");
+        c1Consumer.setProperty(ConsumeJMS.DESTINATION, destinationName);
+        c1Consumer.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.TOPIC);
+        c1Consumer.setProperty(ConsumeJMS.DURABLE_SUBSCRIBER, "true");
+        c1Consumer.setProperty(ConsumeJMS.SUBSCRIPTION_NAME, "SubscriptionName");
+        c1Consumer.setProperty(ConsumeJMS.SHARED_SUBSCRIBER, "false");
+        c1Consumer.setProperty(ConsumeJMS.CLIENT_ID, "client1");
+        return c1Consumer;
+    }
+
 }