You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/12/22 16:00:32 UTC

[2/3] nifi git commit: NIFI-2585 Moving attributes into loop in AbstractFlowFileServerProtocol, and also updating StandardRemoteGroupPort to apply the same attributes when doing a pull-based site-to-site.

NIFI-2585 Moving attributes into loop in AbstractFlowFileServerProtocol, and also updating StandardRemoteGroupPort to apply the same attributes when doing a pull-based site-to-site.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f7d761a2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f7d761a2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f7d761a2

Branch: refs/heads/master
Commit: f7d761a28ae3a07cb8e614e243e197716df025b1
Parents: 28e5d85
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Dec 8 15:17:06 2016 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Dec 22 10:59:56 2016 -0500

----------------------------------------------------------------------
 .../attributes/SiteToSiteAttributes.java        | 39 ++++++++++++++++++++
 .../nifi/remote/StandardRemoteGroupPort.java    | 15 ++++++++
 .../AbstractFlowFileServerProtocol.java         | 20 ++++++----
 .../remote/TestStandardRemoteGroupPort.java     |  4 +-
 .../http/TestHttpFlowFileServerProtocol.java    | 12 ++++--
 5 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
new file mode 100644
index 0000000..ed6437e
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+/**
+ * FlowFile attributes used during site-to-site transfer.
+ */
+public enum SiteToSiteAttributes implements FlowFileAttributeKey {
+
+    S2S_HOST("s2s.host"),
+
+    S2S_ADDRESS("s2s.address");
+
+    private final String key;
+
+    private SiteToSiteAttributes(final String key) {
+        this.key = key;
+    }
+
+    @Override
+    public String key() {
+        return key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 3a23601..b1a1c92 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +39,7 @@ import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
@@ -56,6 +59,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -338,6 +342,17 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
             FlowFile flowFile = session.create();
             flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+
+            final Communicant communicant = transaction.getCommunicant();
+            final String host = StringUtils.isEmpty(communicant.getHost()) ? "unknown" : communicant.getHost();
+            final String port = communicant.getPort() < 0 ? "unknown" : String.valueOf(communicant.getPort());
+
+            final Map<String,String> attributes = new HashMap<>(2);
+            attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
+            attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
             flowFile = session.importFrom(dataPacket.getData(), flowFile);
             final long receiveNanos = System.nanoTime() - start;
             flowFilesReceived.add(flowFile);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
index e149481..fe4b1b1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -20,6 +20,7 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -37,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +47,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -448,7 +451,16 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
-            flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+            final String host = StringUtils.isEmpty(peer.getHost()) ? "unknown" : peer.getHost();
+            final String port = peer.getPort() <= 0 ? "unknown" : String.valueOf(peer.getPort());
+
+            final Map<String,String> attributes = new HashMap<>(4);
+            attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+            attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
+            attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
 
             final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid);
             session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null
@@ -506,12 +518,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
                 throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
         }
 
-        // For routing purposes, downstream consumers often need to reference Flowfile's originating system
-        for (FlowFile flowFile : transaction.getFlowFilesSent()){
-          flowFile = session.putAttribute(flowFile, "remote.host", peer.getHost());
-          flowFile = session.putAttribute(flowFile, "remote.address", peer.getHost() + ":" + peer.getPort());
-        }
-
         // Commit the session so that we have persisted the data
         session.commit();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 23d3fda..43009b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -176,7 +176,7 @@ public class TestStandardRemoteGroupPort {
             // Return null when it gets called second time.
             doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
 
-            doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
+            doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
             doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
 
             port.onTrigger(context, session);
@@ -244,7 +244,7 @@ public class TestStandardRemoteGroupPort {
         // Return null when it's called second time.
         doReturn(dataPacket).doReturn(null).when(transaction).receive();
 
-        doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
+        doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
         doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
 
         port.onTrigger(context, session);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
index f5e803d..9f86d5b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -505,7 +505,7 @@ public class TestHttpFlowFileServerProtocol {
         }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
         // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
         // which returns flowFile instance used later.
-        doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
+        doReturn(flowFile).when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
             final String transitUri = (String)invocation.getArguments()[1];
@@ -567,12 +567,16 @@ public class TestHttpFlowFileServerProtocol {
             }
             return flowFile1;
         }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
-        // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
-        // which returns flowFile instance used later.
+
+        // AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution
+        // which returns flowFile instance used later, it is called twice for each flow file
         doReturn(flowFile1)
+                .doReturn(flowFile1)
                 .doReturn(flowFile2)
-                .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
+                .doReturn(flowFile2)
+                .when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+
         doAnswer(invocation -> {
             final String transitUri = (String)invocation.getArguments()[1];
             final String detail = (String)invocation.getArguments()[3];