You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/10 21:12:53 UTC

[2/7] nifi git commit: NIFI-1000 Fixed JmsFactory to properly obtain destination name. Re-enabled JMS Tests that were annotated with @Ignore

NIFI-1000 Fixed JmsFactory to properly obtain destination name
Re-enabled JMS Tests that were annotated with @Ignore


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ef0be5a5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ef0be5a5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ef0be5a5

Branch: refs/heads/master
Commit: ef0be5a5d6fbfc8abb55e250fc1ddbc294fda743
Parents: 5f8fdae
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Nov 9 18:35:31 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Nov 9 18:35:31 2015 -0500

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |  3 +-
 .../processors/standard/util/JmsFactory.java    |  8 ++-
 .../processors/standard/TestGetJMSQueue.java    | 72 ++++++++++++++++----
 pom.xml                                         |  4 +-
 4 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2d93944..931b939 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -152,7 +152,8 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-client</artifactId>
+            <artifactId>activemq-all</artifactId>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
index ca5df9f..5f6bea5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
@@ -481,7 +481,13 @@ public class JmsFactory {
             attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
         }
         if (message.getJMSDestination() != null) {
-            attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, message.getJMSDestination().toString());
+            String destinationName;
+            if (message.getJMSDestination() instanceof Queue) {
+                destinationName = ((Queue) message.getJMSDestination()).getQueueName();
+            } else {
+                destinationName = ((Topic) message.getJMSDestination()).getTopicName();
+            }
+            attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
         }
         if (message.getJMSMessageID() != null) {
             attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
index 9c833f5..ecb04c1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
@@ -16,6 +16,11 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
 import javax.jms.BytesMessage;
 import javax.jms.MapMessage;
 import javax.jms.Message;
@@ -24,73 +29,116 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.standard.util.JmsFactory;
 import org.apache.nifi.processors.standard.util.JmsProperties;
 import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.StandardProcessorTestRunner;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.Revision;
+import org.junit.Test;
 
 public class TestGetJMSQueue {
 
-    @org.junit.Ignore
+    @Test
     public void testSendTextToQueue() throws Exception {
-        final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+        GetJMSQueue getJmsQueue = new GetJMSQueue();
+        StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
         runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
-        runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
+        runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
         runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
         runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
         runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
+
+        MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
         WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
         final Session jmsSession = wrappedProducer.getSession();
         final MessageProducer producer = wrappedProducer.getProducer();
-
         final Message message = jmsSession.createTextMessage("Hello World");
 
         producer.send(message);
         jmsSession.commit();
+
+        getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+
+        List<MockFlowFile> flowFiles = pSession
+                .getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
+
+        assertTrue(flowFiles.size() == 1);
+        MockFlowFile successFlowFile = flowFiles.get(0);
+        String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
+        assertEquals("Hello World", receivedMessage);
+        assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
         producer.close();
         jmsSession.close();
     }
 
-    @org.junit.Ignore
+    @Test
     public void testSendBytesToQueue() throws Exception {
-        final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+        GetJMSQueue getJmsQueue = new GetJMSQueue();
+        StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
         runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
-        runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
+        runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
         runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
         runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
         runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
         WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
         final Session jmsSession = wrappedProducer.getSession();
         final MessageProducer producer = wrappedProducer.getProducer();
-
+        MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
         final BytesMessage message = jmsSession.createBytesMessage();
         message.writeBytes("Hello Bytes".getBytes());
 
         producer.send(message);
         jmsSession.commit();
+
+        getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+
+        List<MockFlowFile> flowFiles = pSession
+                .getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
+
+        assertTrue(flowFiles.size() == 1);
+        MockFlowFile successFlowFile = flowFiles.get(0);
+        String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
+        assertEquals("Hello Bytes", receivedMessage);
+        assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
         producer.close();
         jmsSession.close();
     }
 
-    @org.junit.Ignore
+    @Test
     public void testSendStreamToQueue() throws Exception {
-        final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+        GetJMSQueue getJmsQueue = new GetJMSQueue();
+        StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
         runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
-        runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
+        runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
         runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
         runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
         runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
         WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
         final Session jmsSession = wrappedProducer.getSession();
         final MessageProducer producer = wrappedProducer.getProducer();
-
+        MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
         final StreamMessage message = jmsSession.createStreamMessage();
         message.writeBytes("Hello Stream".getBytes());
 
         producer.send(message);
         jmsSession.commit();
+
+        getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+
+        List<MockFlowFile> flowFiles = pSession
+                .getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
+
+        assertTrue(flowFiles.size() == 1);
+        MockFlowFile successFlowFile = flowFiles.get(0);
+        String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
+        assertEquals("Hello Stream", receivedMessage);
+        assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
+
         producer.close();
         jmsSession.close();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d563408..2ab8c94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -511,8 +511,8 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.activemq</groupId>
-                <artifactId>activemq-client</artifactId>
-                <version>5.12.0</version>
+                <artifactId>activemq-all</artifactId>
+                <version>5.12.1</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.lucene</groupId>