You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/12/22 16:00:32 UTC
[2/3] nifi git commit: NIFI-2585 Moving attributes into loop in
AbstractFlowFileServerProtocol,
and also updating StandardRemoteGroupPort to apply the same attributes when
doing a pull-based site-to-site.
NIFI-2585 Moving attributes into loop in AbstractFlowFileServerProtocol, and also updating StandardRemoteGroupPort to apply the same attributes when doing a pull-based site-to-site.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f7d761a2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f7d761a2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f7d761a2
Branch: refs/heads/master
Commit: f7d761a28ae3a07cb8e614e243e197716df025b1
Parents: 28e5d85
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Dec 8 15:17:06 2016 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Dec 22 10:59:56 2016 -0500
----------------------------------------------------------------------
.../attributes/SiteToSiteAttributes.java | 39 ++++++++++++++++++++
.../nifi/remote/StandardRemoteGroupPort.java | 15 ++++++++
.../AbstractFlowFileServerProtocol.java | 20 ++++++----
.../remote/TestStandardRemoteGroupPort.java | 4 +-
.../http/TestHttpFlowFileServerProtocol.java | 12 ++++--
5 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
new file mode 100644
index 0000000..ed6437e
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+/**
+ * FlowFile attributes used during site-to-site transfer.
+ */
+public enum SiteToSiteAttributes implements FlowFileAttributeKey {
+ /** Host of the remote site-to-site peer; writers use "unknown" when the host is empty. */
+ S2S_HOST("s2s.host"),
+ /** Remote peer as host:port; writers substitute "unknown" for a missing host or port. */
+ S2S_ADDRESS("s2s.address");
+ /** The FlowFile attribute name this constant stands for, e.g. "s2s.host". */
+ private final String key;
+
+ private SiteToSiteAttributes(final String key) {
+ this.key = key;
+ }
+ /** Returns the attribute name passed to the constructor. */
+ @Override
+ public String key() {
+ return key;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 3a23601..b1a1c92 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +39,7 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
@@ -56,6 +59,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -338,6 +342,17 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+
+ final Communicant communicant = transaction.getCommunicant();
+ final String host = StringUtils.isEmpty(communicant.getHost()) ? "unknown" : communicant.getHost();
+ final String port = communicant.getPort() < 0 ? "unknown" : String.valueOf(communicant.getPort());
+
+ final Map<String,String> attributes = new HashMap<>(2);
+ attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
+ attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
flowFile = session.importFrom(dataPacket.getData(), flowFile);
final long receiveNanos = System.nanoTime() - start;
flowFilesReceived.add(flowFile);
http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
index e149481..fe4b1b1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -20,6 +20,7 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -37,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +47,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -448,7 +451,16 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
- flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+ final String host = StringUtils.isEmpty(peer.getHost()) ? "unknown" : peer.getHost();
+ final String port = peer.getPort() <= 0 ? "unknown" : String.valueOf(peer.getPort());
+
+ final Map<String,String> attributes = new HashMap<>(4);
+ attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+ attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
+ attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid);
session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null
@@ -506,12 +518,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
}
- // For routing purposes, downstream consumers often need to reference Flowfile's originating system
- for (FlowFile flowFile : transaction.getFlowFilesSent()){
- flowFile = session.putAttribute(flowFile, "remote.host", peer.getHost());
- flowFile = session.putAttribute(flowFile, "remote.address", peer.getHost() + ":" + peer.getPort());
- }
-
// Commit the session so that we have persisted the data
session.commit();
http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 23d3fda..43009b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -176,7 +176,7 @@ public class TestStandardRemoteGroupPort {
// Return null when it gets called second time.
doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
- doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
+ doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session);
@@ -244,7 +244,7 @@ public class TestStandardRemoteGroupPort {
// Return null when it's called second time.
doReturn(dataPacket).doReturn(null).when(transaction).receive();
- doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
+ doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
port.onTrigger(context, session);
http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
index f5e803d..9f86d5b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -505,7 +505,7 @@ public class TestHttpFlowFileServerProtocol {
}).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
// AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution
// which returns flowFile instance used later.
- doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
+ doReturn(flowFile).when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1];
@@ -567,12 +567,16 @@ public class TestHttpFlowFileServerProtocol {
}
return flowFile1;
}).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
- // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
- // which returns flowFile instance used later.
+
+ // AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution
+ // which returns flowFile instance used later, it is called twice for each flow file
doReturn(flowFile1)
+ .doReturn(flowFile1)
.doReturn(flowFile2)
- .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
+ .doReturn(flowFile2)
+ .when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+
doAnswer(invocation -> {
final String transitUri = (String)invocation.getArguments()[1];
final String detail = (String)invocation.getArguments()[3];