You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/10 16:33:27 UTC

[1/2] incubator-nifi git commit: NIFI-84 add MapMessage value pairs as FlowFile attributes

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop afe446774 -> eabf2d52f


NIFI-84 add MapMessage value pairs as FlowFile attributes

Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/602fa7a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/602fa7a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/602fa7a8

Branch: refs/heads/develop
Commit: 602fa7a8605851b565390fefc7d5d3ef3afeb3e2
Parents: afe4467
Author: Toivo Adams <to...@gmail.com>
Authored: Tue Feb 10 10:03:51 2015 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 10 09:49:43 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/JmsConsumer.java   | 138 +++++++++------
 .../standard/util/JmsProcessingSummary.java     |  83 +++++++++
 .../processors/standard/TestJmsConsumer.java    | 173 +++++++++++++++++++
 3 files changed, 339 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index 2d262f3..c48d520 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -33,6 +33,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 
@@ -54,6 +57,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.util.JmsFactory;
+import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
 import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
 import org.apache.nifi.util.BooleanHolder;
 import org.apache.nifi.util.IntegerHolder;
@@ -63,6 +67,8 @@ import org.apache.nifi.util.StopWatch;
 
 public abstract class JmsConsumer extends AbstractProcessor {
 
+    public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
+    
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("All FlowFiles are routed to success").build();
 
@@ -108,22 +114,17 @@ public abstract class JmsConsumer extends AbstractProcessor {
         final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
 
-        final ObjectHolder<Message> lastMessageReceived = new ObjectHolder<>(null);
-        final ObjectHolder<Map<String, String>> attributesFromJmsProps = new ObjectHolder<>(null);
-        final Set<FlowFile> allFlowFilesCreated = new HashSet<>();
-        final IntegerHolder messagesReceived = new IntegerHolder(0);
-        final LongHolder bytesReceived = new LongHolder(0L);
-
+        final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
+        
         final StopWatch stopWatch = new StopWatch(true);
         for (int i = 0; i < batchSize; i++) {
-            final BooleanHolder failure = new BooleanHolder(false);
 
             final Message message;
             try {
                 // If we haven't received a message, wait until one is available. If we have already received at least one
                 // message, then we are not willing to wait for more to become available, but we are willing to keep receiving
                 // all messages that are immediately available.
-                if (messagesReceived.get() == 0) {
+                if (processingSummary.getMessagesReceived() == 0) {
                     message = consumer.receive(timeout);
                 } else {
                     message = consumer.receiveNoWait();
@@ -131,7 +132,6 @@ public abstract class JmsConsumer extends AbstractProcessor {
             } catch (final JMSException e) {
                 logger.error("Failed to receive JMS Message due to {}", e);
                 wrappedConsumer.close(logger);
-                failure.set(true);
                 break;
             }
 
@@ -139,48 +139,16 @@ public abstract class JmsConsumer extends AbstractProcessor {
                 break;
             }
 
-            final IntegerHolder msgsThisFlowFile = new IntegerHolder(0);
-            FlowFile flowFile = session.create();
             try {
-                flowFile = session.write(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream rawOut) throws IOException {
-                        try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
-                            messagesReceived.getAndIncrement();
-                            final Map<String, String> attributes = (addAttributes ? JmsFactory.createAttributeMap(message) : null);
-                            attributesFromJmsProps.set(attributes);
-
-                            final byte[] messageBody = JmsFactory.createByteArray(message);
-                            out.write(messageBody);
-                            bytesReceived.addAndGet(messageBody.length);
-                            msgsThisFlowFile.incrementAndGet();
-                            lastMessageReceived.set(message);
-                        } catch (final JMSException e) {
-                            logger.error("Failed to receive JMS Message due to {}", e);
-                            failure.set(true);
-                        }
-                    }
-                });
-            } finally {
-                if (failure.get()) {    // no flowfile created
-                    session.remove(flowFile);
-                    wrappedConsumer.close(logger);
-                } else {
-                    allFlowFilesCreated.add(flowFile);
-
-                    final Map<String, String> attributes = attributesFromJmsProps.get();
-                    if (attributes != null) {
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                    }
-
-                    session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
-                    session.transfer(flowFile, REL_SUCCESS);
-                    logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
-                }
-            }
+				processingSummary.add( map2FlowFile(context, session, message, addAttributes, logger) );
+			} catch (Exception e) {
+                logger.error("Failed to receive JMS Message due to {}", e);
+                wrappedConsumer.close(logger);
+                break;				
+			}
         }
-
-        if (allFlowFilesCreated.isEmpty()) {
+        
+        if (processingSummary.getFlowFilesCreated()==0) {
             context.yield();
             return;
         }
@@ -188,21 +156,81 @@ public abstract class JmsConsumer extends AbstractProcessor {
         session.commit();
 
         stopWatch.stop();
-        if (!allFlowFilesCreated.isEmpty()) {
+        if (processingSummary.getFlowFilesCreated()>0) {
             final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
-            float messagesPerSec = ((float) messagesReceived.get()) / secs;
-            final String dataRate = stopWatch.calculateDataRate(bytesReceived.get());
-            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{messagesReceived.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
+            float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
+            final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
+            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
         }
 
         // if we need to acknowledge the messages, do so now.
-        final Message lastMessage = lastMessageReceived.get();
+        final Message lastMessage = processingSummary.getLastMessageReceived();
         if (clientAcknowledge && lastMessage != null) {
             try {
                 lastMessage.acknowledge();  // acknowledge all received messages by acknowledging only the last.
             } catch (final JMSException e) {
-                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{messagesReceived.get(), e});
+                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e});
             }
         }
     }
+    
+    public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception {
+    	
+    	// Currently not very useful, because one Message always maps to exactly one FlowFile
+        final IntegerHolder msgsThisFlowFile = new IntegerHolder(1);
+        
+        FlowFile flowFile = session.create();
+        try {
+        	// MapMessage is the exception: add only its name-value pairs as FlowFile attributes
+            if (message instanceof MapMessage) {
+				MapMessage mapMessage = (MapMessage) message;
+            	flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));				
+			}
+            // all other message types, write Message body to FlowFile content 
+            else {
+	            flowFile = session.write(flowFile, new OutputStreamCallback() {
+	                @Override
+	                public void process(final OutputStream rawOut) throws IOException {
+	                    try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
+	                        final byte[] messageBody = JmsFactory.createByteArray(message);
+	                        out.write(messageBody);
+	                    } catch (final JMSException e) {
+	                        throw new ProcessException("Failed to receive JMS Message due to {}", e);
+	                    }
+	                }
+	            });
+            }
+            
+            if (addAttributes)
+            	flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
+            
+            session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
+            session.transfer(flowFile, REL_SUCCESS);
+            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
+
+            return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
+            
+        } catch (Exception e) {
+        	session.remove(flowFile);
+            throw e;
+        }
+    }
+
+    public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
+        final Map<String, String> valueMap = new HashMap<>();
+
+        final Enumeration<?> enumeration = mapMessage.getMapNames();
+        while (enumeration.hasMoreElements()) {
+            final String name = (String) enumeration.nextElement();
+
+            final Object value = mapMessage.getObject(name);
+            if (value==null)
+            	valueMap.put(MAP_MESSAGE_PREFIX+name, "");
+            else
+            	valueMap.put(MAP_MESSAGE_PREFIX+name, value.toString());        
+        }
+
+        return valueMap;
+    }    
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
new file mode 100644
index 0000000..02a4096
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import javax.jms.Message;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+
+/**
+ * 	Data structure that collects processing summary data.
+ * 
+ */
+public class JmsProcessingSummary {
+	
+	private int			messagesReceived;
+	private long		bytesReceived;
+	private Message 	lastMessageReceived;
+	private int	 		flowFilesCreated;
+	private FlowFile	lastFlowFile;			// helps testing
+
+	public JmsProcessingSummary() {
+		super();
+		this.messagesReceived 	= 0;
+		this.bytesReceived 		= 0;
+		this.lastMessageReceived = null;
+		this.flowFilesCreated 	= 0;
+		this.lastFlowFile 		= null;
+	}
+	
+	public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) {
+		super();
+		this.messagesReceived 	= 1;
+		this.bytesReceived 		= bytesReceived;
+		this.lastMessageReceived = lastMessageReceived;
+		this.flowFilesCreated 	= 1;
+		this.lastFlowFile 		= lastFlowFile;
+	}
+	
+	public void add(JmsProcessingSummary jmsProcessingSummary) {
+		this.messagesReceived 	+= jmsProcessingSummary.messagesReceived;
+		this.bytesReceived 		+= jmsProcessingSummary.bytesReceived;
+		this.lastMessageReceived = jmsProcessingSummary.lastMessageReceived;
+		this.flowFilesCreated 	+= jmsProcessingSummary.flowFilesCreated;
+		this.lastFlowFile 		=  jmsProcessingSummary.lastFlowFile;
+	}
+
+	public int getMessagesReceived() {
+		return messagesReceived;
+	}
+
+	public long getBytesReceived() {
+		return bytesReceived;
+	}
+
+	public Message getLastMessageReceived() {
+		return lastMessageReceived;
+	}
+
+	public int getFlowFilesCreated() {
+		return flowFilesCreated;
+	}
+
+	public FlowFile getLastFlowFile() {
+		return lastFlowFile;
+	}
+	
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
new file mode 100644
index 0000000..1777a89
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockProcessorInitializationContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class TestJmsConsumer {
+
+	static protected MapMessage createMapMessage() throws JMSException {
+		MapMessage mapMessage = new ActiveMQMapMessage();
+		mapMessage.setString("name", 	"Arnold");
+		mapMessage.setInt	("age", 	97);
+		mapMessage.setDouble("xyz",		89686.564);
+		mapMessage.setBoolean("good", 	true);
+		return mapMessage;
+	}
+	
+	/**
+	 * Test method for {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageValues(javax.jms.MapMessage)}.
+	 * @throws JMSException 
+	 */
+	@Test
+	public void testCreateMapMessageValues() throws JMSException {
+		
+		MapMessage mapMessage = createMapMessage();
+
+		Map<String, String> mapMessageValues = JmsConsumer.createMapMessageValues(mapMessage);
+		assertEquals("", 4, mapMessageValues.size());
+		assertEquals("", "Arnold", 		mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
+		assertEquals("", "97", 			mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
+		assertEquals("", "89686.564", 	mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
+		assertEquals("", "true", 		mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));	
+	}
+
+	/**
+	 * Test MapMessage to FlowFile conversion
+	 */
+	@Test
+	public void testMap2FlowFileMapMessage() throws Exception {
+
+		TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+		MapMessage mapMessage = createMapMessage();
+
+		ProcessContext context = runner.getProcessContext();
+		ProcessSession session = runner.getProcessSessionFactory().createSession();
+        ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+                (MockProcessContext) runner.getProcessContext());
+		
+        JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, mapMessage, true, pic.getLogger());
+		
+		assertEquals("MapMessage should not create FlowFile content", 0, summary.getBytesReceived());
+
+        Map<String, String> attributes = summary.getLastFlowFile().getAttributes();
+		assertEquals("", "Arnold", 		attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
+		assertEquals("", "97", 			attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
+		assertEquals("", "89686.564", 	attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
+		assertEquals("", "true", 		attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
+	}
+
+	/**
+	 * Test TextMessage to FlowFile conversion
+	 */
+	@Test
+	public void testMap2FlowFileTextMessage() throws Exception {
+
+		TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+		TextMessage textMessage = new ActiveMQTextMessage();
+		
+		String payload = "Hello world!";
+		textMessage.setText(payload);
+
+		ProcessContext context = runner.getProcessContext();
+		ProcessSession session = runner.getProcessSessionFactory().createSession();
+        ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+                (MockProcessContext) runner.getProcessContext());
+		
+        JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, textMessage, true, pic.getLogger());
+		
+		assertEquals("TextMessage content length should equal to FlowFile content size", payload.length(), summary.getLastFlowFile().getSize());
+
+        final byte[] buffer = new byte[payload.length()];
+        runner.clearTransferState();
+        
+        session.read(summary.getLastFlowFile(), new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer, false);
+            }
+        });
+
+        String contentString = new String(buffer,"UTF-8");
+        assertEquals("", payload, contentString);
+	}
+
+	/**
+	 * Test BytesMessage to FlowFile conversion
+	 */
+	@Test
+	public void testMap2FlowFileBytesMessage() throws Exception {
+
+		TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+		BytesMessage bytesMessage = new ActiveMQBytesMessage();
+		
+		String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!";
+		byte[] payload = sourceString.getBytes("UTF-8");
+		bytesMessage.writeBytes(payload);
+		bytesMessage.reset();
+
+		ProcessContext context = runner.getProcessContext();
+		ProcessSession session = runner.getProcessSessionFactory().createSession();
+        ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+                (MockProcessContext) runner.getProcessContext());
+		
+        JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, bytesMessage, true, pic.getLogger());
+
+		assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary.getLastFlowFile().getSize());
+
+        final byte[] buffer = new byte[payload.length];
+        runner.clearTransferState();
+        
+        session.read(summary.getLastFlowFile(), new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer, false);
+            }
+        });
+
+        String contentString = new String(buffer,"UTF-8");
+        assertEquals("", sourceString, contentString);
+	}
+
+}


[2/2] incubator-nifi git commit: NIFI-84: Allow PutJMS to create MapMessage's so that we can test GetJMS* Processors

Posted by ma...@apache.org.
NIFI-84: Allow PutJMS to create MapMessage's so that we can test GetJMS* Processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/eabf2d52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/eabf2d52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/eabf2d52

Branch: refs/heads/develop
Commit: eabf2d52fc48731cd5a78b602d401d441657b3a6
Parents: 602fa7a
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Feb 10 10:27:17 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 10 10:27:17 2015 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/processors/standard/JmsConsumer.java |  5 +----
 .../java/org/apache/nifi/processors/standard/PutJMS.java | 11 ++++++++---
 .../nifi/processors/standard/util/JmsProperties.java     |  3 ++-
 3 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index c48d520..e4bbaec 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -48,7 +48,6 @@ import javax.jms.MessageConsumer;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -59,10 +58,8 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.util.JmsFactory;
 import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
 import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
-import org.apache.nifi.util.BooleanHolder;
+import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.util.IntegerHolder;
-import org.apache.nifi.util.LongHolder;
-import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.StopWatch;
 
 public abstract class JmsConsumer extends AbstractProcessor {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index ce5bea5..99c7bb7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@ -40,6 +40,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BY
 import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
 import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
 import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
+import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_MAP;
 import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
 import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
 import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
@@ -257,18 +258,22 @@ public class PutJMS extends AbstractProcessor {
         switch (context.getProperty(MESSAGE_TYPE).getValue()) {
             case MSG_TYPE_EMPTY: {
                 message = jmsSession.createTextMessage("");
+                break;
             }
-            break;
             case MSG_TYPE_STREAM: {
                 final StreamMessage streamMessage = jmsSession.createStreamMessage();
                 streamMessage.writeBytes(messageContent);
                 message = streamMessage;
+                break;
             }
-            break;
             case MSG_TYPE_TEXT: {
                 message = jmsSession.createTextMessage(new String(messageContent, UTF8));
+                break;
+            }
+            case MSG_TYPE_MAP: {
+                message = jmsSession.createMapMessage();
+                break;
             }
-            break;
             case MSG_TYPE_BYTE:
             default: {
                 final BytesMessage bytesMessage = jmsSession.createBytesMessage();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
index 67d0bbf..3a5695e 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
@@ -33,6 +33,7 @@ public class JmsProperties {
     public static final String MSG_TYPE_BYTE = "byte";
     public static final String MSG_TYPE_TEXT = "text";
     public static final String MSG_TYPE_STREAM = "stream";
+    public static final String MSG_TYPE_MAP = "map";
     public static final String MSG_TYPE_EMPTY = "empty";
 
     // Standard JMS Properties
@@ -142,7 +143,7 @@ public class JmsProperties {
             .name("Message Type")
             .description("The Type of JMS Message to Construct")
             .required(true)
-            .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_EMPTY)
+            .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY)
             .defaultValue(MSG_TYPE_BYTE)
             .build();
     public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()