You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/11 14:54:34 UTC

[1/7] incubator-nifi git commit: NIFI-332 Proxy support in GetFTP Processor

Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 4bc5ed13e -> 0d2041b05


NIFI-332 Proxy support in GetFTP Processor

Change-Id: I72fee6c5f2ef576a6c7d736199aab510bee744a7


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8a1a4b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8a1a4b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8a1a4b80

Branch: refs/heads/NIFI-250
Commit: 8a1a4b807e682fc6e69bcc4656682b187f9847fa
Parents: b64fe47
Author: Adam Sotona <ad...@merck.com>
Authored: Tue Feb 10 10:07:03 2015 +0100
Committer: Adam Sotona <ad...@merck.com>
Committed: Tue Feb 10 10:07:03 2015 +0100

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/GetFTP.java |  5 ++
 .../processors/standard/util/FTPTransfer.java   | 48 +++++++++++++-
 .../standard/util/SocksProxySocketFactory.java  | 69 ++++++++++++++++++++
 3 files changed, 121 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8a1a4b80/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
index 2dabbc6..18bdc93 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
@@ -57,6 +57,11 @@ public class GetFTP extends GetFileTransfer {
         properties.add(FTPTransfer.MAX_SELECTS);
         properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
         properties.add(FTPTransfer.USE_NATURAL_ORDERING);
+        properties.add(FTPTransfer.PROXY_TYPE);
+        properties.add(FTPTransfer.PROXY_HOST);
+        properties.add(FTPTransfer.PROXY_PORT);
+        properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+        properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
         this.properties = Collections.unmodifiableList(properties);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8a1a4b80/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 04a9a0f..c3d7bbf 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -20,6 +20,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -40,6 +42,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPHTTPClient;
 import org.apache.commons.net.ftp.FTPReply;
 
 public class FTPTransfer implements FileTransfer {
@@ -49,6 +52,9 @@ public class FTPTransfer implements FileTransfer {
     public static final String TRANSFER_MODE_ASCII = "ASCII";
     public static final String TRANSFER_MODE_BINARY = "Binary";
     public static final String FTP_TIMEVAL_FORMAT = "yyyyMMddHHmmss";
+    // Values offered by the "Proxy Type" property; each one is the name() of the matching java.net.Proxy.Type constant,
+    // which lets the configured value be converted back with Proxy.Type.valueOf(...).
+    public static final String PROXY_TYPE_DIRECT = Proxy.Type.DIRECT.name();
+    public static final String PROXY_TYPE_HTTP = Proxy.Type.HTTP.name();
+    public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name();
 
     public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder()
             .name("Connection Mode")
@@ -69,6 +75,35 @@ public class FTPTransfer implements FileTransfer {
             .required(true)
             .defaultValue("21")
             .build();
+    // Which kind of proxy to use: one of DIRECT, HTTP or SOCKS; DIRECT is the default.
+    public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
+            .name("Proxy Type")
+            .description("Proxy type used for file transfers")
+            .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
+            .defaultValue(PROXY_TYPE_DIRECT)
+            .build();
+    // Host of the proxy server; when a value is supplied it must be non-empty.
+    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("Proxy Host")
+            .description("The fully qualified hostname or IP address of the proxy server")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    // Port of the proxy server; checked by the standard port validator.
+    public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
+            .name("Proxy Port")
+            .description("The port of the proxy server")
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    // Optional user name for the HTTP proxy.
+    public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
+            .name("Http Proxy Username")
+            .description("Http Proxy Username")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    // Optional password for the HTTP proxy; flagged as sensitive.
+    public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
+            .name("Http Proxy Password")
+            .description("Http Proxy Password")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
 
     private final ProcessorLog logger;
 
@@ -450,7 +485,18 @@ public class FTPTransfer implements FileTransfer {
             }
         }
 
-        final FTPClient client = new FTPClient();
+        final Proxy.Type proxyType = Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue());
+        final String proxyHost = ctx.getProperty(PROXY_HOST).getValue();
+        final Integer proxyPort = ctx.getProperty(PROXY_PORT).asInteger();
+        FTPClient client;
+        if (proxyType == Proxy.Type.HTTP) {
+            client = new FTPHTTPClient(proxyHost, proxyPort, ctx.getProperty(HTTP_PROXY_USERNAME).getValue(), ctx.getProperty(HTTP_PROXY_PASSWORD).getValue());
+        } else {
+            client = new FTPClient();
+            if (proxyType == Proxy.Type.SOCKS) {
+                client.setSocketFactory(new SocksProxySocketFactory(new Proxy(proxyType, new InetSocketAddress(proxyHost, proxyPort))));
+            }
+        }
         this.client = client;
         client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
         client.setDefaultTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8a1a4b80/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java
new file mode 100644
index 0000000..55ae468
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import javax.net.SocketFactory;
+
+/**
+ * A {@link SocketFactory} that creates every socket through one fixed {@link Proxy}.
+ * <p>
+ * The proxy is passed straight to {@link Socket#Socket(Proxy)}; this class does not itself check
+ * the proxy type. If binding or connecting a new socket fails, the socket is closed before the
+ * exception is propagated, so a failed attempt does not leak the underlying descriptor.
+ */
+public final class SocksProxySocketFactory extends SocketFactory {
+
+    private final Proxy proxy;
+
+    /**
+     * @param proxy the proxy handed to every socket this factory creates
+     */
+    public SocksProxySocketFactory(Proxy proxy) {
+        this.proxy = proxy;
+    }
+
+    /**
+     * Creates an unconnected socket configured with this factory's proxy.
+     *
+     * @return a new, unconnected socket
+     * @throws IOException declared by the {@link SocketFactory} contract
+     */
+    @Override
+    public Socket createSocket() throws IOException {
+        return new Socket(proxy);
+    }
+
+    /**
+     * Creates a socket and connects it to the given address and port.
+     *
+     * @throws IOException if the connection cannot be established
+     */
+    @Override
+    public Socket createSocket(InetAddress addr, int port) throws IOException {
+        return open(null, new InetSocketAddress(addr, port));
+    }
+
+    /**
+     * Creates a socket, binds it to the given local address and port, then connects it to the
+     * given remote address and port.
+     *
+     * @throws IOException if binding or connecting fails
+     */
+    @Override
+    public Socket createSocket(InetAddress addr, int port, InetAddress localHostAddr, int localPort) throws IOException {
+        return open(new InetSocketAddress(localHostAddr, localPort), new InetSocketAddress(addr, port));
+    }
+
+    /**
+     * Creates a socket and connects it to the given host name and port.
+     *
+     * @throws IOException if the connection cannot be established
+     */
+    @Override
+    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+        return open(null, new InetSocketAddress(host, port));
+    }
+
+    /**
+     * Creates a socket, binds it to the given local address and port, then connects it to the
+     * given host name and port.
+     *
+     * @throws IOException if binding or connecting fails
+     */
+    @Override
+    public Socket createSocket(String host, int port, InetAddress localHostAddr, int localPort) throws IOException, UnknownHostException {
+        return open(new InetSocketAddress(localHostAddr, localPort), new InetSocketAddress(host, port));
+    }
+
+    /**
+     * Creates a socket, optionally binds it, and connects it to {@code remote}.
+     *
+     * @param local local endpoint to bind to, or {@code null} to skip the explicit bind
+     * @param remote endpoint to connect to
+     * @return the connected socket
+     * @throws IOException if binding or connecting fails; the socket is closed first
+     */
+    private Socket open(final InetSocketAddress local, final InetSocketAddress remote) throws IOException {
+        final Socket socket = createSocket();
+        try {
+            if (local != null) {
+                socket.bind(local);
+            }
+            socket.connect(remote);
+            return socket;
+        } catch (final IOException | RuntimeException e) {
+            // The caller never receives this socket, so it has to be released here.
+            try {
+                socket.close();
+            } catch (final IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+}


[2/7] incubator-nifi git commit: NIFI-332: Exposed proxy properties on PutFTP as well

Posted by mc...@apache.org.
NIFI-332: Exposed proxy properties on PutFTP as well


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/afe44677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/afe44677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/afe44677

Branch: refs/heads/NIFI-250
Commit: afe446774d3152aa0d651b97f45c239a7d9e51a2
Parents: 8a1a4b8
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Feb 10 09:19:41 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 10 09:19:41 2015 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/standard/PutFTP.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/afe44677/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java
index dac367f..458fcbc 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFTP.java
@@ -71,7 +71,12 @@ public class PutFTP extends PutFileTransfer<FTPTransfer> {
         properties.add(FTPTransfer.LAST_MODIFIED_TIME);
         properties.add(FTPTransfer.PERMISSIONS);
         properties.add(FTPTransfer.USE_COMPRESSION);
-
+        properties.add(FTPTransfer.PROXY_TYPE);
+        properties.add(FTPTransfer.PROXY_HOST);
+        properties.add(FTPTransfer.PROXY_PORT);
+        properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+        properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+        
         this.properties = Collections.unmodifiableList(properties);
     }
 


[3/7] incubator-nifi git commit: NIFI-84 add MapMessage value pairs as FlowFile attributes

Posted by mc...@apache.org.
NIFI-84 add MapMessage value pairs as FlowFile attributes

Signed-off-by: Toivo Adams <to...@gmail.com>
Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/602fa7a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/602fa7a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/602fa7a8

Branch: refs/heads/NIFI-250
Commit: 602fa7a8605851b565390fefc7d5d3ef3afeb3e2
Parents: afe4467
Author: Toivo Adams <to...@gmail.com>
Authored: Tue Feb 10 10:03:51 2015 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 10 09:49:43 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/JmsConsumer.java   | 138 +++++++++------
 .../standard/util/JmsProcessingSummary.java     |  83 +++++++++
 .../processors/standard/TestJmsConsumer.java    | 173 +++++++++++++++++++
 3 files changed, 339 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index 2d262f3..c48d520 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -33,6 +33,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
+import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 
@@ -54,6 +57,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.util.JmsFactory;
+import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
 import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
 import org.apache.nifi.util.BooleanHolder;
 import org.apache.nifi.util.IntegerHolder;
@@ -63,6 +67,8 @@ import org.apache.nifi.util.StopWatch;
 
 public abstract class JmsConsumer extends AbstractProcessor {
 
+    /** Prefix prepended to each MapMessage entry name when the entry is stored as a FlowFile attribute. */
+    public static final String MAP_MESSAGE_PREFIX = "jms.mapmessage.";
+    
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("All FlowFiles are routed to success").build();
 
@@ -108,22 +114,17 @@ public abstract class JmsConsumer extends AbstractProcessor {
         final boolean addAttributes = context.getProperty(JMS_PROPS_TO_ATTRIBUTES).asBoolean();
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
 
-        final ObjectHolder<Message> lastMessageReceived = new ObjectHolder<>(null);
-        final ObjectHolder<Map<String, String>> attributesFromJmsProps = new ObjectHolder<>(null);
-        final Set<FlowFile> allFlowFilesCreated = new HashSet<>();
-        final IntegerHolder messagesReceived = new IntegerHolder(0);
-        final LongHolder bytesReceived = new LongHolder(0L);
-
+        final JmsProcessingSummary processingSummary = new JmsProcessingSummary();
+        
         final StopWatch stopWatch = new StopWatch(true);
         for (int i = 0; i < batchSize; i++) {
-            final BooleanHolder failure = new BooleanHolder(false);
 
             final Message message;
             try {
                 // If we haven't received a message, wait until one is available. If we have already received at least one
                 // message, then we are not willing to wait for more to become available, but we are willing to keep receiving
                 // all messages that are immediately available.
-                if (messagesReceived.get() == 0) {
+                if (processingSummary.getMessagesReceived() == 0) {
                     message = consumer.receive(timeout);
                 } else {
                     message = consumer.receiveNoWait();
@@ -131,7 +132,6 @@ public abstract class JmsConsumer extends AbstractProcessor {
             } catch (final JMSException e) {
                 logger.error("Failed to receive JMS Message due to {}", e);
                 wrappedConsumer.close(logger);
-                failure.set(true);
                 break;
             }
 
@@ -139,48 +139,16 @@ public abstract class JmsConsumer extends AbstractProcessor {
                 break;
             }
 
-            final IntegerHolder msgsThisFlowFile = new IntegerHolder(0);
-            FlowFile flowFile = session.create();
             try {
-                flowFile = session.write(flowFile, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream rawOut) throws IOException {
-                        try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
-                            messagesReceived.getAndIncrement();
-                            final Map<String, String> attributes = (addAttributes ? JmsFactory.createAttributeMap(message) : null);
-                            attributesFromJmsProps.set(attributes);
-
-                            final byte[] messageBody = JmsFactory.createByteArray(message);
-                            out.write(messageBody);
-                            bytesReceived.addAndGet(messageBody.length);
-                            msgsThisFlowFile.incrementAndGet();
-                            lastMessageReceived.set(message);
-                        } catch (final JMSException e) {
-                            logger.error("Failed to receive JMS Message due to {}", e);
-                            failure.set(true);
-                        }
-                    }
-                });
-            } finally {
-                if (failure.get()) {    // no flowfile created
-                    session.remove(flowFile);
-                    wrappedConsumer.close(logger);
-                } else {
-                    allFlowFilesCreated.add(flowFile);
-
-                    final Map<String, String> attributes = attributesFromJmsProps.get();
-                    if (attributes != null) {
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-                    }
-
-                    session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
-                    session.transfer(flowFile, REL_SUCCESS);
-                    logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile.get()});
-                }
-            }
+				processingSummary.add( map2FlowFile(context, session, message, addAttributes, logger) );
+			} catch (Exception e) {
+                logger.error("Failed to receive JMS Message due to {}", e);
+                wrappedConsumer.close(logger);
+                break;				
+			}
         }
-
-        if (allFlowFilesCreated.isEmpty()) {
+        
+        if (processingSummary.getFlowFilesCreated()==0) {
             context.yield();
             return;
         }
@@ -188,21 +156,81 @@ public abstract class JmsConsumer extends AbstractProcessor {
         session.commit();
 
         stopWatch.stop();
-        if (!allFlowFilesCreated.isEmpty()) {
+        if (processingSummary.getFlowFilesCreated()>0) {
             final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F);
-            float messagesPerSec = ((float) messagesReceived.get()) / secs;
-            final String dataRate = stopWatch.calculateDataRate(bytesReceived.get());
-            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{messagesReceived.get(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
+            float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs;
+            final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived());
+            logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate});
         }
 
         // if we need to acknowledge the messages, do so now.
-        final Message lastMessage = lastMessageReceived.get();
+        final Message lastMessage = processingSummary.getLastMessageReceived();
         if (clientAcknowledge && lastMessage != null) {
             try {
                 lastMessage.acknowledge();  // acknowledge all received messages by acknowledging only the last.
             } catch (final JMSException e) {
-                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{messagesReceived.get(), e});
+                logger.error("Failed to acknowledge {} JMS Message(s). This may result in duplicate messages. Reason for failure: {}", new Object[]{processingSummary.getMessagesReceived(), e});
             }
         }
     }
+    
+    /**
+     * Turns one JMS message into one FlowFile and transfers it to {@link #REL_SUCCESS}.
+     * <p>
+     * For a {@link MapMessage} the name/value pairs are stored as FlowFile attributes (see
+     * {@link #createMapMessageValues(MapMessage)}) and no content is written. For any other message
+     * type the bytes returned by {@code JmsFactory.createByteArray} become the FlowFile content.
+     * When {@code addAttributes} is set, the map from {@code JmsFactory.createAttributeMap} is added
+     * as attributes too. A provenance receive event is reported with the value of the URL property.
+     * <p>
+     * If any step fails, the FlowFile is removed from the session and the exception is rethrown.
+     *
+     * @param context used to read the URL property for the provenance event
+     * @param session session in which the FlowFile is created, populated and transferred
+     * @param message the JMS message to convert
+     * @param addAttributes whether to copy the attributes derived from the message onto the FlowFile
+     * @param logger receives an info entry describing the created FlowFile
+     * @return a summary for exactly one message and one FlowFile, using the FlowFile size as the byte count
+     * @throws Exception whatever failure occurred while building or transferring the FlowFile
+     */
+    public static JmsProcessingSummary map2FlowFile(final ProcessContext context, final ProcessSession session, final Message message, final boolean addAttributes, ProcessorLog logger) throws Exception {
+
+        // Currently one Message always maps to exactly one FlowFile.
+        final int msgsThisFlowFile = 1;
+
+        FlowFile flowFile = session.create();
+        try {
+            if (message instanceof MapMessage) {
+                // MapMessage is the exception: only its name/value pairs are kept, as attributes.
+                final MapMessage mapMessage = (MapMessage) message;
+                flowFile = session.putAllAttributes(flowFile, createMapMessageValues(mapMessage));
+            } else {
+                // All other message types: the message body becomes the FlowFile content.
+                flowFile = session.write(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws IOException {
+                        try (final OutputStream out = new BufferedOutputStream(rawOut, 65536)) {
+                            final byte[] messageBody = JmsFactory.createByteArray(message);
+                            out.write(messageBody);
+                        } catch (final JMSException e) {
+                            // Exception messages are not logger templates, so the cause is spelled out here.
+                            throw new ProcessException("Failed to receive JMS Message due to " + e, e);
+                        }
+                    }
+                });
+            }
+
+            if (addAttributes) {
+                flowFile = session.putAllAttributes(flowFile, JmsFactory.createAttributeMap(message));
+            }
+
+            session.getProvenanceReporter().receive(flowFile, context.getProperty(URL).getValue());
+            session.transfer(flowFile, REL_SUCCESS);
+            logger.info("Created {} from {} messages received from JMS Server and transferred to 'success'", new Object[]{flowFile, msgsThisFlowFile});
+
+            return new JmsProcessingSummary(flowFile.getSize(), message, flowFile);
+
+        } catch (Exception e) {
+            // Do not leave a half-built FlowFile behind in the session.
+            session.remove(flowFile);
+            throw e;
+        }
+    }
+
+    /**
+     * Copies the entries of a JMS {@link MapMessage} into a String-to-String map.
+     * <p>
+     * Each key is the entry name prefixed with {@link #MAP_MESSAGE_PREFIX}; each value is the
+     * entry's {@code toString()} form, or the empty string when the entry value is {@code null}.
+     *
+     * @param mapMessage the message whose entries are read
+     * @return a new map with one element per entry in the message
+     * @throws JMSException if the message's names or values cannot be read
+     */
+    public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
+        final Map<String, String> attributes = new HashMap<>();
+
+        for (final Enumeration<?> names = mapMessage.getMapNames(); names.hasMoreElements();) {
+            final String entryName = (String) names.nextElement();
+            final Object entryValue = mapMessage.getObject(entryName);
+            final String text = (entryValue == null) ? "" : entryValue.toString();
+            attributes.put(MAP_MESSAGE_PREFIX + entryName, text);
+        }
+
+        return attributes;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
new file mode 100644
index 0000000..02a4096
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProcessingSummary.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import javax.jms.Message;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+
+/**
+ * Mutable accumulator for JMS processing results: the number of messages received, the number
+ * of bytes received, the number of FlowFiles created, and the most recent message and FlowFile.
+ */
+public class JmsProcessingSummary {
+
+    private int messagesReceived;
+    private long bytesReceived;
+    private Message lastMessageReceived;
+    private int flowFilesCreated;
+    private FlowFile lastFlowFile; // helps testing
+
+    /**
+     * Creates an empty summary: every counter is zero and no message or FlowFile is recorded.
+     */
+    public JmsProcessingSummary() {
+        // The field defaults (0 and null) already describe an empty summary.
+    }
+
+    /**
+     * Creates a summary for exactly one received message that produced exactly one FlowFile.
+     *
+     * @param bytesReceived byte count to record for that message
+     * @param lastMessageReceived the message that was received
+     * @param lastFlowFile the FlowFile created from the message
+     */
+    public JmsProcessingSummary(long bytesReceived, Message lastMessageReceived, FlowFile lastFlowFile) {
+        this.bytesReceived = bytesReceived;
+        this.lastMessageReceived = lastMessageReceived;
+        this.lastFlowFile = lastFlowFile;
+        this.messagesReceived = 1;
+        this.flowFilesCreated = 1;
+    }
+
+    /**
+     * Merges another summary into this one: counters are summed, and the last message and last
+     * FlowFile are replaced by those of the given summary.
+     *
+     * @param jmsProcessingSummary the summary to merge in
+     */
+    public void add(JmsProcessingSummary jmsProcessingSummary) {
+        final JmsProcessingSummary other = jmsProcessingSummary;
+        messagesReceived += other.messagesReceived;
+        bytesReceived += other.bytesReceived;
+        flowFilesCreated += other.flowFilesCreated;
+        lastMessageReceived = other.lastMessageReceived;
+        lastFlowFile = other.lastFlowFile;
+    }
+
+    /** @return the number of messages received */
+    public int getMessagesReceived() {
+        return this.messagesReceived;
+    }
+
+    /** @return the number of bytes received */
+    public long getBytesReceived() {
+        return this.bytesReceived;
+    }
+
+    /** @return the most recently recorded message, or {@code null} if none */
+    public Message getLastMessageReceived() {
+        return this.lastMessageReceived;
+    }
+
+    /** @return the number of FlowFiles created */
+    public int getFlowFilesCreated() {
+        return this.flowFilesCreated;
+    }
+
+    /** @return the most recently recorded FlowFile, or {@code null} if none */
+    public FlowFile getLastFlowFile() {
+        return this.lastFlowFile;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/602fa7a8/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
new file mode 100644
index 0000000..1777a89
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJmsConsumer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockProcessorInitializationContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/**
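+ * Tests the MapMessage value mapping and the JMS message to FlowFile conversion exposed as static methods on {@link JmsConsumer}.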
+ */
+public class TestJmsConsumer {
+
+	static protected MapMessage createMapMessage() throws JMSException {
+		MapMessage mapMessage = new ActiveMQMapMessage();
+		mapMessage.setString("name", 	"Arnold");
+		mapMessage.setInt	("age", 	97);
+		mapMessage.setDouble("xyz",		89686.564);
+		mapMessage.setBoolean("good", 	true);
+		return mapMessage;
+	}
+	
+	/**
+	 * Test method for {@link org.apache.nifi.processors.standard.JmsConsumer#createMapMessageValues(javax.jms.MapMessage)}.
+	 * @throws JMSException 
+	 */
+	@Test
+	public void testCreateMapMessageValues() throws JMSException {
+		
+		MapMessage mapMessage = createMapMessage();
+
+		Map<String, String> mapMessageValues = JmsConsumer.createMapMessageValues(mapMessage);
+		assertEquals("", 4, mapMessageValues.size());
+		assertEquals("", "Arnold", 		mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
+		assertEquals("", "97", 			mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
+		assertEquals("", "89686.564", 	mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
+		assertEquals("", "true", 		mapMessageValues.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));	
+	}
+
+	/**
+	 * Test MapMessage to FlowFile conversion
+	 */
+	@Test
+	public void testMap2FlowFileMapMessage() throws Exception {
+
+		TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+		MapMessage mapMessage = createMapMessage();
+
+		ProcessContext context = runner.getProcessContext();
+		ProcessSession session = runner.getProcessSessionFactory().createSession();
+        ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+                (MockProcessContext) runner.getProcessContext());
+		
+        JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, mapMessage, true, pic.getLogger());
+		
+		assertEquals("MapMessage should not create FlowFile content", 0, summary.getBytesReceived());
+
+        Map<String, String> attributes = summary.getLastFlowFile().getAttributes();
+		assertEquals("", "Arnold", 		attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"name"));
+		assertEquals("", "97", 			attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"age"));
+		assertEquals("", "89686.564", 	attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"xyz"));
+		assertEquals("", "true", 		attributes.get(JmsConsumer.MAP_MESSAGE_PREFIX+"good"));
+	}
+
+	/**
+	 * Test TextMessage to FlowFile conversion
+	 */
+	@Test
+	public void testMap2FlowFileTextMessage() throws Exception {
+
+		TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+		TextMessage textMessage = new ActiveMQTextMessage();
+		
+		String payload = "Hello world!";
+		textMessage.setText(payload);
+
+		ProcessContext context = runner.getProcessContext();
+		ProcessSession session = runner.getProcessSessionFactory().createSession();
+        ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+                (MockProcessContext) runner.getProcessContext());
+		
+        JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, textMessage, true, pic.getLogger());
+		
+		assertEquals("TextMessage content length should equal to FlowFile content size", payload.length(), summary.getLastFlowFile().getSize());
+
+        final byte[] buffer = new byte[payload.length()];
+        runner.clearTransferState();
+        
+        session.read(summary.getLastFlowFile(), new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer, false);
+            }
+        });
+
+        String contentString = new String(buffer,"UTF-8");
+        assertEquals("", payload, contentString);
+	}
+
+	/**
+	 * Test BytesMessage to FlowFile conversion
+	 */
+	@Test
+	public void testMap2FlowFileBytesMessage() throws Exception {
+
+		TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
+		BytesMessage bytesMessage = new ActiveMQBytesMessage();
+		
+		String sourceString = "Apache NiFi is an easy to use, powerful, and reliable system to process and distribute data.!";
+		byte[] payload = sourceString.getBytes("UTF-8");
+		bytesMessage.writeBytes(payload);
+		bytesMessage.reset();
+
+		ProcessContext context = runner.getProcessContext();
+		ProcessSession session = runner.getProcessSessionFactory().createSession();
+        ProcessorInitializationContext pic = new MockProcessorInitializationContext(runner.getProcessor(),
+                (MockProcessContext) runner.getProcessContext());
+		
+        JmsProcessingSummary summary = JmsConsumer.map2FlowFile(context, session, bytesMessage, true, pic.getLogger());
+
+		assertEquals("BytesMessage content length should equal to FlowFile content size", payload.length, summary.getLastFlowFile().getSize());
+
+        final byte[] buffer = new byte[payload.length];
+        runner.clearTransferState();
+        
+        session.read(summary.getLastFlowFile(), new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer, false);
+            }
+        });
+
+        String contentString = new String(buffer,"UTF-8");
+        assertEquals("", sourceString, contentString);
+	}
+
+}


[5/7] incubator-nifi git commit: NIFI-340: - Adding screencasts.

Posted by mc...@apache.org.
NIFI-340:
- Adding screencasts.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/56a6bc4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/56a6bc4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/56a6bc4f

Branch: refs/heads/NIFI-250
Commit: 56a6bc4fe494828fbfe66b39a16d38cf4133c043
Parents: eabf2d5
Author: Matt Gilman <ma...@gmail.com>
Authored: Tue Feb 10 20:43:39 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Feb 10 20:43:39 2015 -0500

----------------------------------------------------------------------
 nifi-site/src/includes/topbar.hbs        |  1 +
 nifi-site/src/pages/html/screencasts.hbs | 35 +++++++++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/56a6bc4f/nifi-site/src/includes/topbar.hbs
----------------------------------------------------------------------
diff --git a/nifi-site/src/includes/topbar.hbs b/nifi-site/src/includes/topbar.hbs
index dd8e805..6069a34 100644
--- a/nifi-site/src/includes/topbar.hbs
+++ b/nifi-site/src/includes/topbar.hbs
@@ -28,6 +28,7 @@
                     <a href="#">Documentation</a>
                     <ul class="dropdown">
                         <li><a href="faq.html">FAQ</a></li>
+                        <li><a href="screencasts.html">Screencasts</a></li>
                         <li><a href="overview.html">NiFi Overview</a></li>
                         <li><a href="user-guide.html">User Guide</a></li>
                         <li><a href="developer-guide.html">Developer Guide</a></li>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/56a6bc4f/nifi-site/src/pages/html/screencasts.hbs
----------------------------------------------------------------------
diff --git a/nifi-site/src/pages/html/screencasts.hbs b/nifi-site/src/pages/html/screencasts.hbs
index 497cf25..78c0466 100644
--- a/nifi-site/src/pages/html/screencasts.hbs
+++ b/nifi-site/src/pages/html/screencasts.hbs
@@ -11,5 +11,40 @@ title: Apache NiFi Screencasts
 <div class="medium-space"></div>
 <div class="row">
     <div class="large-12 columns">
+        <a href="#" data-reveal-id="toolbar-overview">NiFi Toolbar Overview</a>
+        <div id="toolbar-overview" class="reveal-modal medium" data-reveal>
+            <h2>NiFi Toolbar Overview</h2>
+            <div class="flex-video widescreen" style="display: block;">
+                <iframe width="560" height="315" src="https://www.youtube.com/embed/LGXRAVUzL4U" frameborder="0" allowfullscreen></iframe>
+            </div>
+            <a class="close-reveal-modal">&#215;</a>
+        </div>
+        <br/>
+        <a href="#" data-reveal-id="creating-process-groups">Creating Process Groups</a>
+        <div id="creating-process-groups" class="reveal-modal medium" data-reveal>
+            <h2>Creating Process Groups</h2>
+            <div class="flex-video widescreen" style="display: block;">
+                <iframe width="560" height="315" src="https://www.youtube.com/embed/hAveiDgDj-8" frameborder="0" allowfullscreen></iframe>
+            </div>
+            <a class="close-reveal-modal">&#215;</a>
+        </div>
+        <br/>
+        <a href="#" data-reveal-id="creating-templates">Creating Templates</a>
+        <div id="creating-templates" class="reveal-modal medium" data-reveal>
+            <h2>Creating Templates</h2>
+            <div class="flex-video widescreen" style="display: block;">
+                <iframe width="560" height="315" src="https://www.youtube.com/embed/PpmL-IMoCnU" frameborder="0" allowfullscreen></iframe>
+            </div>
+            <a class="close-reveal-modal">&#215;</a>
+        </div>
+        <br/>
+        <a href="#" data-reveal-id="managing-templates">Managing Templates</a>
+        <div id="managing-templates" class="reveal-modal medium" data-reveal>
+            <h2>Managing Templates</h2>
+            <div class="flex-video widescreen" style="display: block;">
+                <iframe width="560" height="315" src="https://www.youtube.com/embed/HU5_3PlNmtQ" frameborder="0" allowfullscreen></iframe>
+            </div>
+            <a class="close-reveal-modal">&#215;</a>
+        </div>
     </div>
 </div>
\ No newline at end of file


[7/7] incubator-nifi git commit: NIFI-250: - Replacing usage of ControllerServiceProvider with ControllerServiceLookup.

Posted by mc...@apache.org.
NIFI-250:
- Replacing usage of ControllerServiceProvider with ControllerServiceLookup.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0d2041b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0d2041b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0d2041b0

Branch: refs/heads/NIFI-250
Commit: 0d2041b05ffc618c2393713e96c44cf8384aa6e3
Parents: 3d38d8c
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Feb 11 08:54:19 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Feb 11 08:54:19 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/web/StandardNiFiWebContext.java | 18 ++--
 .../nifi/web/controller/ControllerFacade.java   | 67 +-------------
 .../spring/ControllerServiceDAOFactoryBean.java |  1 -
 .../web/spring/NiFiWebContextFactoryBean.java   | 93 ++++++++++++++++++++
 .../src/main/resources/nifi-web-api-context.xml |  3 +-
 5 files changed, 102 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2041b0/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebContext.java
index 4741192..b4f49ee 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebContext.java
@@ -49,7 +49,6 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.apache.nifi.web.controller.ControllerFacade;
 import org.apache.nifi.web.util.WebUtils;
 
 import org.apache.commons.lang3.StringUtils;
@@ -60,6 +59,7 @@ import org.springframework.security.core.Authentication;
 import org.springframework.security.core.context.SecurityContextHolder;
 
 import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.web.util.ClientResponseUtils;
 
 /**
@@ -76,16 +76,12 @@ public class StandardNiFiWebContext implements NiFiWebContext {
     private NiFiProperties properties;
     private NiFiServiceFacade serviceFacade;
     private WebClusterManager clusterManager;
-    private ControllerFacade controllerFacade;
+    private ControllerServiceLookup controllerServiceLookup;
     private AuditService auditService;
 
     @Override
     public ControllerService getControllerService(String serviceIdentifier) {
-        if (properties.isClusterManager()) {
-            return clusterManager.getControllerService(serviceIdentifier);
-        } else {
-            return controllerFacade.getControllerService(serviceIdentifier);
-        }
+        return controllerServiceLookup.getControllerService(serviceIdentifier);
     }
 
     @Override
@@ -325,12 +321,12 @@ public class StandardNiFiWebContext implements NiFiWebContext {
         this.serviceFacade = serviceFacade;
     }
 
-    public void setControllerFacade(ControllerFacade controllerFacade) {
-        this.controllerFacade = controllerFacade;
-    }
-
     public void setAuditService(AuditService auditService) {
         this.auditService = auditService;
     }
 
+    public void setControllerServiceLookup(ControllerServiceLookup controllerServiceLookup) {
+        this.controllerServiceLookup = controllerServiceLookup;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2041b0/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 2f6ce2f..7359c5a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -56,8 +56,6 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.repository.ContentNotFoundException;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -122,7 +120,7 @@ import org.springframework.security.access.AccessDeniedException;
 /**
  *
  */
-public class ControllerFacade implements ControllerServiceProvider {
+public class ControllerFacade {
 
     private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
 
@@ -391,69 +389,6 @@ public class ControllerFacade implements ControllerServiceProvider {
         return counter;
     }
     
-    
-
-    /**
-     * Return the controller service for the specified identifier.
-     *
-     * @param serviceIdentifier
-     * @return
-     */
-    @Override
-    public ControllerService getControllerService(String serviceIdentifier) {
-        return flowController.getControllerService(serviceIdentifier);
-    }
-
-    @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return flowController.createControllerService(type, id, firstTimeAdded);
-    }
-    
-    @Override
-    public void removeControllerService(ControllerServiceNode serviceNode) {
-        flowController.removeControllerService(serviceNode);
-    }
-
-    @Override
-    public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) {
-        return flowController.getControllerServiceIdentifiers(serviceType);
-    }
-
-    @Override
-    public ControllerServiceNode getControllerServiceNode(final String id) {
-        return flowController.getControllerServiceNode(id);
-    }
-
-    @Override
-    public boolean isControllerServiceEnabled(final ControllerService service) {
-        return flowController.isControllerServiceEnabled(service);
-    }
-
-    @Override
-    public boolean isControllerServiceEnabled(final String serviceIdentifier) {
-        return flowController.isControllerServiceEnabled(serviceIdentifier);
-    }
-    
-    @Override
-    public String getControllerServiceName(final String serviceIdentifier) {
-    	return flowController.getControllerServiceName(serviceIdentifier);
-    }
-
-    @Override
-    public Set<ControllerServiceNode> getAllControllerServices() {
-    	return flowController.getAllControllerServices();
-    }
-
-    @Override
-    public void enableControllerService(final ControllerServiceNode serviceNode) {
-        flowController.enableControllerService(serviceNode);
-    }
-    
-    @Override
-    public void disableControllerService(ControllerServiceNode serviceNode) {
-        flowController.disableControllerService(serviceNode);
-    }
-    
     /**
      * Gets the status of this controller.
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2041b0/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/ControllerServiceDAOFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/ControllerServiceDAOFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/ControllerServiceDAOFactoryBean.java
index af41a9f..040de0d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/ControllerServiceDAOFactoryBean.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/ControllerServiceDAOFactoryBean.java
@@ -18,7 +18,6 @@ package org.apache.nifi.web.spring;
 
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
 import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.dao.ControllerServiceDAO;
 import org.apache.nifi.web.dao.impl.StandardControllerServiceDAO;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2041b0/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/NiFiWebContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/NiFiWebContextFactoryBean.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/NiFiWebContextFactoryBean.java
new file mode 100644
index 0000000..8dc9f57
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/spring/NiFiWebContextFactoryBean.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.spring;
+
+import org.apache.nifi.admin.service.AuditService;
+import org.apache.nifi.cluster.manager.impl.WebClusterManager;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.NiFiServiceFacade;
+import org.apache.nifi.web.NiFiWebContext;
+import org.apache.nifi.web.StandardNiFiWebContext;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+/**
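+ * Spring factory bean that lazily builds the {@link StandardNiFiWebContext}, wiring the cluster manager as its controller service lookup when running as the cluster manager and the flow controller otherwise.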
+ */
+public class NiFiWebContextFactoryBean implements FactoryBean, ApplicationContextAware {
+
+    private ApplicationContext context;
+    private StandardNiFiWebContext webContext;
+    
+    private NiFiProperties properties;
+    private NiFiServiceFacade serviceFacade;
+    private WebClusterManager clusterManager;
+    private AuditService auditService;
+
+    @Override
+    public Object getObject() throws Exception {
+        if (webContext == null) {
+            webContext = new StandardNiFiWebContext();
+            if (properties.isClusterManager()) {
+                webContext.setControllerServiceLookup(context.getBean("clusterManager", WebClusterManager.class));
+            } else {
+                webContext.setControllerServiceLookup(context.getBean("flowController", FlowController.class));
+            }
+            webContext.setAuditService(auditService);
+            webContext.setClusterManager(clusterManager);
+            webContext.setProperties(properties);
+            webContext.setServiceFacade(serviceFacade);
+        }
+
+        return webContext;
+    }
+
+    @Override
+    public Class getObjectType() {
+        return NiFiWebContext.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext context) throws BeansException {
+        this.context = context;
+    }
+
+    public void setProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+
+    public void setServiceFacade(NiFiServiceFacade serviceFacade) {
+        this.serviceFacade = serviceFacade;
+    }
+
+    public void setClusterManager(WebClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
+
+    public void setAuditService(AuditService auditService) {
+        this.auditService = auditService;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0d2041b0/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 94887c7..a4ee88c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -106,9 +106,8 @@
         <property name="clusterManager" ref="clusterManager"/>
     </bean>
 
-    <bean id="nifiWebContext" class="org.apache.nifi.web.StandardNiFiWebContext">
+    <bean id="nifiWebContext" class="org.apache.nifi.web.spring.NiFiWebContextFactoryBean" depends-on="clusterManager flowController">
         <property name="serviceFacade" ref="serviceFacade"/>
-        <property name="controllerFacade" ref="controllerFacade"/>
         <property name="properties" ref="nifiProperties"/>
         <property name="clusterManager" ref="clusterManager"/>
         <property name="auditService" ref="auditService"/>


[4/7] incubator-nifi git commit: NIFI-84: Allow PutJMS to create MapMessages so that we can test GetJMS* Processors

Posted by mc...@apache.org.
NIFI-84: Allow PutJMS to create MapMessages so that we can test GetJMS* Processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/eabf2d52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/eabf2d52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/eabf2d52

Branch: refs/heads/NIFI-250
Commit: eabf2d52fc48731cd5a78b602d401d441657b3a6
Parents: 602fa7a
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Feb 10 10:27:17 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 10 10:27:17 2015 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/processors/standard/JmsConsumer.java |  5 +----
 .../java/org/apache/nifi/processors/standard/PutJMS.java | 11 ++++++++---
 .../nifi/processors/standard/util/JmsProperties.java     |  3 ++-
 3 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
index c48d520..e4bbaec 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java
@@ -48,7 +48,6 @@ import javax.jms.MessageConsumer;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -59,10 +58,8 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processors.standard.util.JmsFactory;
 import org.apache.nifi.processors.standard.util.JmsProcessingSummary;
 import org.apache.nifi.processors.standard.util.WrappedMessageConsumer;
-import org.apache.nifi.util.BooleanHolder;
+import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.util.IntegerHolder;
-import org.apache.nifi.util.LongHolder;
-import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.StopWatch;
 
 public abstract class JmsConsumer extends AbstractProcessor {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index ce5bea5..99c7bb7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@ -40,6 +40,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BY
 import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
 import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
 import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
+import static org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_MAP;
 import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
 import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
 import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
@@ -257,18 +258,22 @@ public class PutJMS extends AbstractProcessor {
         switch (context.getProperty(MESSAGE_TYPE).getValue()) {
             case MSG_TYPE_EMPTY: {
                 message = jmsSession.createTextMessage("");
+                break;
             }
-            break;
             case MSG_TYPE_STREAM: {
                 final StreamMessage streamMessage = jmsSession.createStreamMessage();
                 streamMessage.writeBytes(messageContent);
                 message = streamMessage;
+                break;
             }
-            break;
             case MSG_TYPE_TEXT: {
                 message = jmsSession.createTextMessage(new String(messageContent, UTF8));
+                break;
+            }
+            case MSG_TYPE_MAP: {
+                message = jmsSession.createMapMessage();
+                break;
             }
-            break;
             case MSG_TYPE_BYTE:
             default: {
                 final BytesMessage bytesMessage = jmsSession.createBytesMessage();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eabf2d52/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
index 67d0bbf..3a5695e 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java
@@ -33,6 +33,7 @@ public class JmsProperties {
     public static final String MSG_TYPE_BYTE = "byte";
     public static final String MSG_TYPE_TEXT = "text";
     public static final String MSG_TYPE_STREAM = "stream";
+    public static final String MSG_TYPE_MAP = "map";
     public static final String MSG_TYPE_EMPTY = "empty";
 
     // Standard JMS Properties
@@ -142,7 +143,7 @@ public class JmsProperties {
             .name("Message Type")
             .description("The Type of JMS Message to Construct")
             .required(true)
-            .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_EMPTY)
+            .allowableValues(MSG_TYPE_BYTE, MSG_TYPE_STREAM, MSG_TYPE_TEXT, MSG_TYPE_MAP, MSG_TYPE_EMPTY)
             .defaultValue(MSG_TYPE_BYTE)
             .build();
     public static final PropertyDescriptor MESSAGE_PRIORITY = new PropertyDescriptor.Builder()


[6/7] incubator-nifi git commit: Merge branch 'develop' into NIFI-250

Posted by mc...@apache.org.
Merge branch 'develop' into NIFI-250


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3d38d8c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3d38d8c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3d38d8c9

Branch: refs/heads/NIFI-250
Commit: 3d38d8c98c180f5446b0f2dafea78eeec38e432e
Parents: 4bc5ed1 56a6bc4
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Feb 11 07:09:05 2015 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Feb 11 07:09:05 2015 -0500

----------------------------------------------------------------------
 nifi-site/src/includes/topbar.hbs               |   1 +
 nifi-site/src/pages/html/screencasts.hbs        |  35 ++++
 .../apache/nifi/processors/standard/GetFTP.java |   5 +
 .../nifi/processors/standard/JmsConsumer.java   | 143 ++++++++-------
 .../apache/nifi/processors/standard/PutFTP.java |   7 +-
 .../apache/nifi/processors/standard/PutJMS.java |  11 +-
 .../processors/standard/util/FTPTransfer.java   |  48 ++++-
 .../standard/util/JmsProcessingSummary.java     |  83 +++++++++
 .../processors/standard/util/JmsProperties.java |   3 +-
 .../standard/util/SocksProxySocketFactory.java  |  69 ++++++++
 .../processors/standard/TestJmsConsumer.java    | 173 +++++++++++++++++++
 11 files changed, 513 insertions(+), 65 deletions(-)
----------------------------------------------------------------------