You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/01/03 17:09:43 UTC

nifi git commit: NIFI-3231 Added EL support to hostname and port in PutTCP/UDP

Repository: nifi
Updated Branches:
  refs/heads/master 1b4729e44 -> 9b47961d1


NIFI-3231 Added EL support to hostname and port in PutTCP/UDP

This closes #1361.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b47961d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b47961d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b47961d

Branch: refs/heads/master
Commit: 9b47961d1c0dcc29f91c083e0affb99e1db07084
Parents: 1b4729e
Author: Pierre Villard <pi...@gmail.com>
Authored: Wed Dec 28 14:39:22 2016 +0100
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Jan 3 12:09:19 2017 -0500

----------------------------------------------------------------------
 .../processor/util/put/AbstractPutEventProcessor.java  |  2 ++
 .../org/apache/nifi/processors/splunk/PutSplunk.java   |  8 ++++----
 .../org/apache/nifi/processors/standard/PutTCP.java    |  8 ++++----
 .../org/apache/nifi/processors/standard/PutUDP.java    | 11 +++++++----
 .../apache/nifi/processors/standard/TestPutUDP.java    | 12 ++++++++++++
 .../processors/standard/util/TestPutTCPCommon.java     | 13 +++++++++++++
 6 files changed, 42 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9b47961d/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 9fd0496..65b11ff 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -56,12 +56,14 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("localhost")
             .required(true)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor
             .Builder().name("Port")
             .description("The port on the destination.")
             .required(true)
             .addValidator(StandardValidators.PORT_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
             .name("Max Size of Socket Send Buffer")

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b47961d/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 0a09243..56d3e26 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -103,16 +103,16 @@ public class PutSplunk extends AbstractPutEventProcessor {
 
     @Override
     protected String createTransitUri(ProcessContext context) {
-        final String port = context.getProperty(PORT).getValue();
-        final String host = context.getProperty(HOSTNAME).getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
         final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
         return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
     }
 
     @Override
     protected ChannelSender createSender(ProcessContext context) throws IOException {
-        final int port = context.getProperty(PORT).asInteger();
-        final String host = context.getProperty(HOSTNAME).getValue();
+        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b47961d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 3354b09..34f6277 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -107,8 +107,8 @@ public class PutTCP extends AbstractPutEventProcessor {
     @Override
     protected ChannelSender createSender(final ProcessContext context) throws IOException {
         final String protocol = TCP_VALUE.getValue();
-        final String hostname = context.getProperty(HOSTNAME).getValue();
-        final int port = context.getProperty(PORT).asInteger();
+        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
         final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final SSLContextService sslContextService = (SSLContextService) context.getProperty(SSL_CONTEXT_SERVICE).asControllerService();
@@ -133,8 +133,8 @@ public class PutTCP extends AbstractPutEventProcessor {
     @Override
     protected String createTransitUri(final ProcessContext context) {
         final String protocol = TCP_VALUE.getValue();
-        final String host = context.getProperty(HOSTNAME).getValue();
-        final String port = context.getProperty(PORT).getValue();
+        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
 
         return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b47961d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
index 8144786..af23c54 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
@@ -37,6 +37,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
 import org.apache.nifi.processor.util.put.sender.ChannelSender;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StopWatch;
 
 /**
  * <p>
@@ -91,8 +92,8 @@ public class PutUDP extends AbstractPutEventProcessor {
     @Override
     protected ChannelSender createSender(final ProcessContext context) throws IOException {
         final String protocol = UDP_VALUE.getValue();
-        final String hostname = context.getProperty(HOSTNAME).getValue();
-        final int port = context.getProperty(PORT).asInteger();
+        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
         final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
 
         return createSender(protocol, hostname, port, 0, bufferSize, null);
@@ -109,8 +110,8 @@ public class PutUDP extends AbstractPutEventProcessor {
     @Override
     protected String createTransitUri(final ProcessContext context) {
         final String protocol = UDP_VALUE.getValue();
-        final String host = context.getProperty(HOSTNAME).getValue();
-        final String port = context.getProperty(PORT).getValue();
+        final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
 
         return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
     }
@@ -142,7 +143,9 @@ public class PutUDP extends AbstractPutEventProcessor {
 
         try {
             byte[] content = readContent(session, flowFile);
+            StopWatch stopWatch = new StopWatch(true);
             sender.send(content);
+            session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
             session.commit();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b47961d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
index 30d89c8..a1fe113 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
 import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.util.concurrent.ArrayBlockingQueue;
+
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
@@ -33,6 +34,8 @@ import org.junit.Test;
 public class TestPutUDP {
 
     private final static String UDP_SERVER_ADDRESS = "127.0.0.1";
+    private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
+    private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
     private final static String UNKNOWN_HOST = "fgdsfgsdffd";
     private final static String INVALID_IP_ADDRESS = "300.300.300.300";
     private final static int BUFFER_SIZE = 1024;
@@ -60,6 +63,7 @@ public class TestPutUDP {
     public void setup() throws Exception {
         createTestServer(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
         runner = TestRunners.newTestRunner(PutUDP.class);
+        runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
     }
 
     private void createTestServer(final String address, final int port, final int recvQueueSize) throws Exception {
@@ -100,6 +104,14 @@ public class TestPutUDP {
     }
 
     @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testValidFilesEL() throws Exception {
+        configureProperties(UDP_SERVER_ADDRESS_EL, true);
+        sendTestData(VALID_FILES);
+        checkReceivedAllData(VALID_FILES);
+        checkInputQueueIsEmpty();
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
     public void testEmptyFile() throws Exception {
         configureProperties(UDP_SERVER_ADDRESS, true);
         sendTestData(EMPTY_FILE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b47961d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
index ed52c35..e07d44a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java
@@ -37,6 +37,8 @@ import static org.junit.Assert.assertNull;
 
 public abstract class TestPutTCPCommon {
     private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
+    private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
+    private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
     private final static String UNKNOWN_HOST = "fgdsfgsdffd";
     private final static String INVALID_IP_ADDRESS = "300.300.300.300";
     private final static int MIN_INVALID_PORT = 0;
@@ -72,6 +74,7 @@ public abstract class TestPutTCPCommon {
     public void setup() throws Exception {
         recvQueue = new ArrayBlockingQueue<List<Byte>>(BUFFER_SIZE);
         runner = TestRunners.newTestRunner(PutTCP.class);
+        runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
     }
 
     private synchronized TCPTestServer createTestServer(final String address, final ArrayBlockingQueue<List<Byte>> recvQueue, final String delimiter) throws Exception {
@@ -105,6 +108,16 @@ public abstract class TestPutTCPCommon {
     }
 
     @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testValidFilesEL() throws Exception {
+        server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS_EL, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
+        sendTestData(VALID_FILES);
+        checkReceivedAllData(recvQueue, VALID_FILES);
+        checkInputQueueIsEmpty();
+        checkTotalNumConnections(server, 1);
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
     public void testPruneSenders() throws Exception {
         server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);