You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/10 16:33:27 UTC
[1/2] incubator-nifi git commit: NIFI-84 add MapMessage value pairs
as FlowFile attributes
Repository: incubator-nifi
Updated Branches:
refs/heads/develop afe446774 -> eabf2d52f
NIFI-84 add MapMessage value pairs as FlowFile attributes
Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/602fa7a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/602fa7a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/602fa7a8
Branch: refs/heads/develop
Commit: 602fa7a8605851b565390fefc7d5d3ef3afeb3e2
Parents: afe4467
Author: Toivo Adams <to...@gmail.com>
Authored: Tue Feb 10 10:03:51 2015 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 10 09:49:43 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/standard/JmsConsumer.java | 138 +++++++++------
.../standard/util/JmsProcessingSummary.java | 83 +++++++++
.../processors/standard/TestJmsConsumer.java | 173 +++++++++++++++++++
3 files changed, 339 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index 2d262f3..c48d520 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -33,6 +33,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -54,6 +57,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
+import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
import org.apache.nifi.util.BooleanHolder;
import org.apache.nifi.util.IntegerHolder;
@@ -63,6 +67,8 @@ import org.apache.nifi.util.StopWatch;
public abstract class JmsConsumer extends AbstractProcessor {
+ public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
+
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles are routed to success").build();
@@ -108,22 +114,17 @@ public abstract class JmsConsumer extends AbstractProcessor {
final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
- final ObjectHolder<Message> lastMessageReceived = new ObjectHolder<>(null);
- final ObjectHolder<Map<String, String>> attributesFromJmsProps = new ObjectHolder<>(null);
- final Set<FlowFile> allFlowFilesCreated = new HashSet<>();
- final IntegerHolder messagesReceived = new IntegerHolder(0);
- final LongHolder bytesReceived = new LongHolder(0L);
-
+ final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
+
final StopWatch stopWatch = new StopWatch(true);
for (int i = 0; i < batchSize; i++) {
- final BooleanHolder failure = new BooleanHolder(false);
final Message message;
try {
// If we haven't received a message, wait until one is available. If we have already received at least one
// message, then we are not willing to wait for more to become available, but we are willing to keep receiving
// all messages that are immediately available.
- if (messagesReceived.get() == 0) {
+ if (processingSummary.getMessagesReceived() == 0) {
message = consumer.receive(timeout);
} else {
message = consumer.receiveNoWait();
@@ -131,7 +132,6 @@ public abstract class JmsConsumer extends AbstractProcessor {
} catch (final JMSException e) {
logger.error("Failed to receive JMS Message due to {}", e);
wrappedConsumer.close(logger);
- failure.set(true);
break;
}
@@ -139,48 +139,16 @@ public abstract class JmsConsumer extends AbstractProcessor {
break;
}
- final IntegerHolder msgsThisFlowFile = new IntegerHolder(0);
- FlowFile flowFile = session.create();
try {
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws IOException {
- try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
- messagesReceived.getAndIncrement();
- final Map<String, String> attributes = (addAttributes ? JmsFactory.createAttributeMap(message) : null);
- attributesFromJmsProps.set(attributes);
-
- final byte[] messageBody = JmsFactory.createByteArray(message);
- out.write(messageBody);
- bytesReceived.addAndGet(messageBody.length);
- msgsThisFlowFile.incrementAndGet();
- lastMessageReceived.set(message);
- } catch (final JMSException e) {
- logger.error("Failed to receive JMS Message due to {}", e);
- failure.set(true);
- }
- }
- });
- } finally {
- if (failure.get()) { // no flowfile created
- session.remove(flowFile);
- wrappedConsumer.close(logger);
- } else {
- allFlowFilesCreated.add(flowFile);
-
- final Map<String, String> attributes = attributesFromJmsProps.get();
- if (attributes != null) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
-
- session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
- session.transfer(flowFile, REL_SUCCESS);
- logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
- }
- }
+ processingSummary.add( map2FlowFile(context, session, message, addAttributes, logger) );
+ } catch (Exception e) {
+ logger.error("Failed to receive JMS Message due to {}", e);
+ wrappedConsumer.close(logger);
+ break;
+ }
}
-
- if (allFlowFilesCreated.isEmpty()) {
+
+ if (processingSummary.getFlowFilesCreated()==0) {
context.yield();
return;
}
@@ -188,21 +156,81 @@ public abstract class JmsConsumer extends AbstractProcessor {
session.commit();
stopWatch.stop();
- if (!allFlowFilesCreated.isEmpty()) {
+ if (processingSummary.getFlowFilesCreated()>0) {
final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
- float messagesPerSec = ((float) messagesReceived.get()) / secs;
- final String dataRate = stopWatch.calculateDataRate(bytesReceived.get());
- logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{messagesReceived.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
+ float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
+ final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
+ logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
}
// if we need to acknowledge the messages, do so now.
- final Message lastMessage = lastMessageReceived.get();
+ final Message lastMessage = processingSummary.getLastMessageReceived();
if (clientAcknowledge && lastMessage != null) {
try {
lastMessage.acknowledge(); // acknowledge all received messages by acknowledging only the last.
} catch (final JMSException e) {
- logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{messagesReceived.get(), e});
+ logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e});
}
}
}
+
+ public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception {
+
+ // Always 1 for now, because each Message currently produces exactly one FlowFile
+ final IntegerHolder msgsThisFlowFile = new IntegerHolder(1);
+
+ FlowFile flowFile = session.create();
+ try {
+ // MapMessage is the exception: only its name-value pairs are added, as FlowFile attributes (no content is written)
+ if (message instanceof MapMessage) {
+ MapMessage mapMessage = (MapMessage) message;
+ flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
+ }
+ // For all other message types, write the Message body to the FlowFile content
+ else {
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
+ final byte[] messageBody = JmsFactory.createByteArray(message);
+ out.write(messageBody);
+ } catch (final JMSException e) {
+ throw new ProcessException("Failed to receive JMS Message due to {}", e);
+ }
+ }
+ });
+ }
+
+ if (addAttributes)
+ flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
+
+ session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
+ session.transfer(flowFile, REL_SUCCESS);
+ logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
+
+ return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
+
+ } catch (Exception e) {
+ session.remove(flowFile);
+ throw e;
+ }
+ }
+
+ public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
+ final Map<String, String> valueMap = new HashMap<>();
+
+ final Enumeration<?> enumeration = mapMessage.getMapNames();
+ while (enumeration.hasMoreElements()) {
+ final String name = (String) enumeration.nextElement();
+
+ final Object value = mapMessage.getObject(name);
+ if (value==null)
+ valueMap.put(MAP_MESSAGE_PREFIX+name, "");
+ else
+ valueMap.put(MAP_MESSAGE_PREFIX+name, value.toString());
+ }
+
+ return valueMap;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
new file mode 100644
index 0000000..02a4096
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import javax.jms.Message;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+
+/**
+ * Data structure that collects JMS processing summary data: message, byte and FlowFile counts plus the last Message and FlowFile seen.
+ *
+ */
+public class JmsProcessingSummary {
+
+ private int messagesReceived;
+ private long bytesReceived;
+ private Message lastMessageReceived;
+ private int flowFilesCreated;
+ private FlowFile lastFlowFile; // helps testing
+
+ public JmsProcessingSummary() {
+ super();
+ this.messagesReceived = 0;
+ this.bytesReceived = 0;
+ this.lastMessageReceived = null;
+ this.flowFilesCreated = 0;
+ this.lastFlowFile = null;
+ }
+
+ public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) {
+ super();
+ this.messagesReceived = 1;
+ this.bytesReceived = bytesReceived;
+ this.lastMessageReceived = lastMessageReceived;
+ this.flowFilesCreated = 1;
+ this.lastFlowFile = lastFlowFile;
+ }
+
+ public void add(JmsProcessingSummary jmsProcessingSummary) {
+ this.messagesReceived += jmsProcessingSummary.messagesReceived;
+ this.bytesReceived += jmsProcessingSummary.bytesReceived;
+ this.lastMessageReceived = jmsProcessingSummary.lastMessageReceived;
+ this.flowFilesCreated += jmsProcessingSummary.flowFilesCreated;
+ this.lastFlowFile = jmsProcessingSummary.lastFlowFile;
+ }
+
+ public int getMessagesReceived() {
+ return messagesReceived;
+ }
+
+ public long getBytesReceived() {
+ return bytesReceived;
+ }
+
+ public Message getLastMessageReceived() {
+ return lastMessageReceived;
+ }
+
+ public int getFlowFilesCreated() {
+ return flowFilesCreated;
+ }
+
+ public FlowFile getLastFlowFile() {
+ return lastFlowFile;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
new file mode 100644
index 0000000..1777a89
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockProcessorInitializationContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/**
+ * Unit tests for the static JMS Message to FlowFile conversion helpers of {@link JmsConsumer}.
+ */
+public class TestJmsConsumer {
+
+ static protected MapMessage createMapMessage() throws JMSException {
+ MapMessage mapMessage = new ActiveMQMapMessage();
+ mapMessage.setString("name", "Arnold");
+ mapMessage.setInt ("age", 97);
+ mapMessage.setDouble("xyz", 89686.564);
+ mapMessage.setBoolean("good", true);
+ return mapMessage;
+ }
+
+ /**
+ * Test method for {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageValues(javax.jms.MapMessage)}.
+ * @throws JMSException
+ */
+ @Test
+ public void testCreateMapMessageValues() throws JMSException {
+
+ MapMessage mapMessage = createMapMessage();
+
+ Map<String, String> mapMessageValues = JmsConsumer.createMapMessageValues(mapMessage);
+ assertEquals("", 4, mapMessageValues.size());
+ assertEquals("", "Arnold", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
+ assertEquals("", "97", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
+ assertEquals("", "89686.564", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
+ assertEquals("", "true", mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
+ }
+
+ /**
+ * Test MapMessage to FlowFile conversion
+ */
+ @Test
+ public void testMap2FlowFileMapMessage() throws Exception {
+
+ TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+ MapMessage mapMessage = createMapMessage();
+
+ ProcessContext context = runner.getProcessContext();
+ ProcessSession session = runner.getProcessSessionFactory().createSession();
+ ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+ (MockProcessContext) runner.getProcessContext());
+
+ JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, mapMessage, true, pic.getLogger());
+
+ assertEquals("MapMessage should not create FlowFile content", 0, summary.getBytesReceived());
+
+ Map<String, String> attributes = summary.getLastFlowFile().getAttributes();
+ assertEquals("", "Arnold", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
+ assertEquals("", "97", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
+ assertEquals("", "89686.564", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
+ assertEquals("", "true", attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
+ }
+
+ /**
+ * Test TextMessage to FlowFile conversion
+ */
+ @Test
+ public void testMap2FlowFileTextMessage() throws Exception {
+
+ TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+ TextMessage textMessage = new ActiveMQTextMessage();
+
+ String payload = "Hello world!";
+ textMessage.setText(payload);
+
+ ProcessContext context = runner.getProcessContext();
+ ProcessSession session = runner.getProcessSessionFactory().createSession();
+ ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+ (MockProcessContext) runner.getProcessContext());
+
+ JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, textMessage, true, pic.getLogger());
+
+ assertEquals("TextMessage content length should equal to FlowFile content size", payload.length(), summary.getLastFlowFile().getSize());
+
+ final byte[] buffer = new byte[payload.length()];
+ runner.clearTransferState();
+
+ session.read(summary.getLastFlowFile(), new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buffer, false);
+ }
+ });
+
+ String contentString = new String(buffer,"UTF-8");
+ assertEquals("", payload, contentString);
+ }
+
+ /**
+ * Test BytesMessage to FlowFile conversion
+ */
+ @Test
+ public void testMap2FlowFileBytesMessage() throws Exception {
+
+ TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+ BytesMessage bytesMessage = new ActiveMQBytesMessage();
+
+ String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!";
+ byte[] payload = sourceString.getBytes("UTF-8");
+ bytesMessage.writeBytes(payload);
+ bytesMessage.reset();
+
+ ProcessContext context = runner.getProcessContext();
+ ProcessSession session = runner.getProcessSessionFactory().createSession();
+ ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+ (MockProcessContext) runner.getProcessContext());
+
+ JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, bytesMessage, true, pic.getLogger());
+
+ assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary.getLastFlowFile().getSize());
+
+ final byte[] buffer = new byte[payload.length];
+ runner.clearTransferState();
+
+ session.read(summary.getLastFlowFile(), new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, buffer, false);
+ }
+ });
+
+ String contentString = new String(buffer,"UTF-8");
+ assertEquals("", sourceString, contentString);
+ }
+
+}
[2/2] incubator-nifi git commit: NIFI-84: Allow PutJMS to create
MapMessage's so that we can test GetJMS* Processors
Posted by ma...@apache.org.
NIFI-84: Allow PutJMS to create MapMessage's so that we can test GetJMS* Processors
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/eabf2d52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/eabf2d52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/eabf2d52
Branch: refs/heads/develop
Commit: eabf2d52fc48731cd5a78b602d401d441657b3a6
Parents: 602fa7a
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Feb 10 10:27:17 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 10 10:27:17 2015 -0500
----------------------------------------------------------------------
.../org/apache/nifi/processors/standard/JmsConsumer.java | 5 +----
.../java/org/apache/nifi/processors/standard/PutJMS.java | 11 ++++++++---
.../nifi/processors/standard/util/JmsProperties.java | 3 ++-
3 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index c48d520..e4bbaec 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -48,7 +48,6 @@ import javax.jms.MessageConsumer;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -59,10 +58,8 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.standard.util.JmsFactory;
import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
-import org.apache.nifi.util.BooleanHolder;
+import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.IntegerHolder;
-import org.apache.nifi.util.LongHolder;
-import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
public abstract class JmsConsumer extends AbstractProcessor {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index ce5bea5..99c7bb7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@ -40,6 +40,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BY
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
+import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_MAP;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
@@ -257,18 +258,22 @@ public class PutJMS extends AbstractProcessor {
switch (context.getProperty(MESSAGE_TYPE).getValue()) {
case MSG_TYPE_EMPTY: {
message = jmsSession.createTextMessage("");
+ break;
}
- break;
case MSG_TYPE_STREAM: {
final StreamMessage streamMessage = jmsSession.createStreamMessage();
streamMessage.writeBytes(messageContent);
message = streamMessage;
+ break;
}
- break;
case MSG_TYPE_TEXT: {
message = jmsSession.createTextMessage(new String(messageContent, UTF8));
+ break;
+ }
+ case MSG_TYPE_MAP: {
+ message = jmsSession.createMapMessage();
+ break;
}
- break;
case MSG_TYPE_BYTE:
default: {
final BytesMessage bytesMessage = jmsSession.createBytesMessage();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
index 67d0bbf..3a5695e 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
@@ -33,6 +33,7 @@ public class JmsProperties {
public static final String MSG_TYPE_BYTE = "byte";
public static final String MSG_TYPE_TEXT = "text";
public static final String MSG_TYPE_STREAM = "stream";
+ public static final String MSG_TYPE_MAP = "map";
public static final String MSG_TYPE_EMPTY = "empty";
// Standard JMS Properties
@@ -142,7 +143,7 @@ public class JmsProperties {
.name("Message Type")
.description("The Type of JMS Message to Construct")
.required(true)
- .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_EMPTY)
+ .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY)
.defaultValue(MSG_TYPE_BYTE)
.build();
public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()