You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/10 21:12:53 UTC
[2/7] nifi git commit: NIFI-1000 Fixed JmsFactory to properly obtain
destination name Re-enabled JMS Tests that were annotated with @Ignore
NIFI-1000 Fixed JmsFactory to properly obtain destination name
Re-enabled JMS Tests that were annotated with @Ignore
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ef0be5a5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ef0be5a5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ef0be5a5
Branch: refs/heads/master
Commit: ef0be5a5d6fbfc8abb55e250fc1ddbc294fda743
Parents: 5f8fdae
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Nov 9 18:35:31 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Mon Nov 9 18:35:31 2015 -0500
----------------------------------------------------------------------
.../nifi-standard-processors/pom.xml | 3 +-
.../processors/standard/util/JmsFactory.java | 8 ++-
.../processors/standard/TestGetJMSQueue.java | 72 ++++++++++++++++----
pom.xml | 4 +-
4 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2d93944..931b939 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -152,7 +152,8 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-client</artifactId>
+ <artifactId>activemq-all</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
index ca5df9f..5f6bea5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
@@ -481,7 +481,13 @@ public class JmsFactory {
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
}
if (message.getJMSDestination() != null) {
- attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, message.getJMSDestination().toString());
+ String destinationName;
+ if (message.getJMSDestination() instanceof Queue) {
+ destinationName = ((Queue) message.getJMSDestination()).getQueueName();
+ } else {
+ destinationName = ((Topic) message.getJMSDestination()).getTopicName();
+ }
+ attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
}
if (message.getJMSMessageID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
index 9c833f5..ecb04c1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java
@@ -16,6 +16,11 @@
*/
package org.apache.nifi.processors.standard;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -24,73 +29,116 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProperties;
import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.StandardProcessorTestRunner;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.Revision;
+import org.junit.Test;
public class TestGetJMSQueue {
- @org.junit.Ignore
+ @Test
public void testSendTextToQueue() throws Exception {
- final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+ GetJMSQueue getJmsQueue = new GetJMSQueue();
+ StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
- runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
+ runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
+
+ MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
-
final Message message = jmsSession.createTextMessage("Hello World");
producer.send(message);
jmsSession.commit();
+
+ getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+
+ List<MockFlowFile> flowFiles = pSession
+ .getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
+
+ assertTrue(flowFiles.size() == 1);
+ MockFlowFile successFlowFile = flowFiles.get(0);
+ String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
+ assertEquals("Hello World", receivedMessage);
+ assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
producer.close();
jmsSession.close();
}
- @org.junit.Ignore
+ @Test
public void testSendBytesToQueue() throws Exception {
- final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+ GetJMSQueue getJmsQueue = new GetJMSQueue();
+ StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
- runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
+ runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
-
+ MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
final BytesMessage message = jmsSession.createBytesMessage();
message.writeBytes("Hello Bytes".getBytes());
producer.send(message);
jmsSession.commit();
+
+ getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+
+ List<MockFlowFile> flowFiles = pSession
+ .getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
+
+ assertTrue(flowFiles.size() == 1);
+ MockFlowFile successFlowFile = flowFiles.get(0);
+ String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
+ assertEquals("Hello Bytes", receivedMessage);
+ assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
producer.close();
jmsSession.close();
}
- @org.junit.Ignore
+ @Test
public void testSendStreamToQueue() throws Exception {
- final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+ GetJMSQueue getJmsQueue = new GetJMSQueue();
+ StandardProcessorTestRunner runner = (StandardProcessorTestRunner) TestRunners.newTestRunner(getJmsQueue);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
- runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
+ runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
-
+ MockProcessSession pSession = (MockProcessSession) runner.getProcessSessionFactory().createSession();
final StreamMessage message = jmsSession.createStreamMessage();
message.writeBytes("Hello Stream".getBytes());
producer.send(message);
jmsSession.commit();
+
+ getJmsQueue.onTrigger(runner.getProcessContext(), pSession);
+
+ List<MockFlowFile> flowFiles = pSession
+ .getFlowFilesForRelationship(new Relationship.Builder().name("success").build());
+
+ assertTrue(flowFiles.size() == 1);
+ MockFlowFile successFlowFile = flowFiles.get(0);
+ String receivedMessage = new String(runner.getContentAsByteArray(successFlowFile));
+ assertEquals("Hello Stream", receivedMessage);
+ assertEquals("queue.testing", successFlowFile.getAttribute("jms.JMSDestination"));
+
producer.close();
jmsSession.close();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ef0be5a5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d563408..2ab8c94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -511,8 +511,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-client</artifactId>
- <version>5.12.0</version>
+ <artifactId>activemq-all</artifactId>
+ <version>5.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>