You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/12/22 16:00:31 UTC

[1/3] nifi git commit: NIFI-2585: Add attributes to track where a flow file came from when receiving over site-to-site

Repository: nifi
Updated Branches:
  refs/heads/master 44c9ea0a6 -> 908e7d313


NIFI-2585: Add attributes to track where a flow file came from when receiving over site-to-site

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/28e5d854
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/28e5d854
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/28e5d854

Branch: refs/heads/master
Commit: 28e5d85493e43a6df7ec7f39657373734cb162ed
Parents: 44c9ea0
Author: Randy Gelhausen <rg...@gmail.com>
Authored: Wed Dec 7 02:18:09 2016 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Dec 22 10:59:55 2016 -0500

----------------------------------------------------------------------
 .../nifi/remote/protocol/AbstractFlowFileServerProtocol.java   | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/28e5d854/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
index 2ba87a2..e149481 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -506,6 +506,12 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
                 throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
         }
 
+        // For routing purposes, downstream consumers often need to reference Flowfile's originating system
+        for (FlowFile flowFile : transaction.getFlowFilesSent()){
+          flowFile = session.putAttribute(flowFile, "remote.host", peer.getHost());
+          flowFile = session.putAttribute(flowFile, "remote.address", peer.getHost() + ":" + peer.getPort());
+        }
+
         // Commit the session so that we have persisted the data
         session.commit();
 


[3/3] nifi git commit: NIFI-2585: Add attributes to track s2s host and port

Posted by bb...@apache.org.
NIFI-2585: Add attributes to track s2s host and port

- Removed host and port field from Peer since the same information is
  available in PeerDescription
- Refactored variable names in SocketRemoteSiteListener to improve readability
- Changed how SocketRemoteSiteListener constructs PeerDescription
  instance. It used to use hard-coded 'localhost' as hostname, and
  getPort() which returns server's port. Since the peer is a remote peer,
  i.e the client, it should be client hostname and port.
- Added hostname resolution at DataTransferResource to make s2s.host
  value consistent with RAW transport. Without this, RAW uses hostname
  while HTTP uses IP address. It will be hard to be used from downstream flows.
- Replaced heavy use of mockito which was difficult to maintain, with
  nifi-mock
- Added SiteToSiteAttributes and more assertions in unit tests

This closes #1342.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/908e7d31
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/908e7d31
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/908e7d31

Branch: refs/heads/master
Commit: 908e7d3131908671d7e7df4b4aa4a29a9ec91694
Parents: f7d761a
Author: Koji Kawamura <ij...@apache.org>
Authored: Tue Dec 20 11:19:22 2016 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Dec 22 11:00:00 2016 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/remote/Peer.java  |  11 +-
 .../nifi-framework/nifi-site-to-site/pom.xml    |   4 +-
 .../nifi/remote/SocketRemoteSiteListener.java   |  14 +-
 .../remote/TestStandardRemoteGroupPort.java     | 171 +++++----
 .../http/TestHttpFlowFileServerProtocol.java    | 369 +++++++++----------
 .../nifi/web/api/DataTransferResource.java      |  11 +-
 6 files changed, 308 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/908e7d31/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index 962baec..17bc011 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -29,8 +29,6 @@ public class Peer implements Communicant {
     private final CommunicationsSession commsSession;
     private final String url;
     private final String clusterUrl;
-    private final String host;
-    private final int port;
 
     private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
     private boolean closed = false;
@@ -42,9 +40,8 @@ public class Peer implements Communicant {
         this.clusterUrl = clusterUrl;
 
         try {
-            final URI uri = new URI(peerUrl);
-            this.port = uri.getPort();
-            this.host = uri.getHost();
+            // Parse peerUrl to validate it.
+            new URI(peerUrl);
         } catch (final Exception e) {
             throw new IllegalArgumentException("Invalid URL: " + peerUrl);
         }
@@ -104,7 +101,7 @@ public class Peer implements Communicant {
 
     @Override
     public String getHost() {
-        return host;
+        return description.getHostname();
     }
 
     @Override
@@ -141,7 +138,7 @@ public class Peer implements Communicant {
 
     @Override
     public int getPort() {
-        return port;
+        return description.getPort();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/908e7d31/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
index 2f0183c..a44989a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/pom.xml
@@ -71,8 +71,8 @@
             <artifactId>httpclient</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/908e7d31/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 9f9e3aa..5222bbc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -137,16 +137,16 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                         public void run() {
                             LOG.debug("{} Determining URL of connection", this);
                             final InetAddress inetAddress = socket.getInetAddress();
-                            String hostname = inetAddress.getHostName();
-                            final int slashIndex = hostname.indexOf("/");
+                            String clientHostName = inetAddress.getHostName();
+                            final int slashIndex = clientHostName.indexOf("/");
                             if (slashIndex == 0) {
-                                hostname = hostname.substring(1);
+                                clientHostName = clientHostName.substring(1);
                             } else if (slashIndex > 0) {
-                                hostname = hostname.substring(0, slashIndex);
+                                clientHostName = clientHostName.substring(0, slashIndex);
                             }
 
-                            final int port = socket.getPort();
-                            final String peerUri = "nifi://" + hostname + ":" + port;
+                            final int clientPort = socket.getPort();
+                            final String peerUri = "nifi://" + clientHostName + ":" + clientPort;
                             LOG.debug("{} Connection URL is {}", this, peerUri);
 
                             final CommunicationsSession commsSession;
@@ -211,7 +211,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                                 protocol.setRootProcessGroup(rootGroup.get());
                                 protocol.setNodeInformant(nodeInformant);
 
-                                final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
+                                final PeerDescription description = new PeerDescription(clientHostName, clientPort, sslContext != null);
                                 peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
                                 LOG.debug("Handshaking....");
                                 protocol.handshake(peer);

http://git-wip-us.apache.org/repos/asf/nifi/blob/908e7d31/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 43009b4..2d48515 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -18,15 +18,15 @@ package org.apache.nifi.remote;
 
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.io.http.HttpCommunicationsSession;
 import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
@@ -34,22 +34,30 @@ import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.InputStream;
+import java.io.ByteArrayInputStream;
 import java.nio.channels.SocketChannel;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.nifi.util.NiFiProperties;
 
-import static org.mockito.Matchers.any;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestStandardRemoteGroupPort {
 
@@ -64,9 +72,9 @@ public class TestStandardRemoteGroupPort {
     private ProcessGroup processGroup;
     public static final String REMOTE_CLUSTER_URL = "http://node0.example.com:8080/nifi";
     private StandardRemoteGroupPort port;
-    private ProcessContext context;
-    private ProcessSession session;
-    private ProvenanceReporter provenanceReporter;
+    private SharedSessionState sessionState;
+    private MockProcessSession processSession;
+    private MockProcessContext processContext;
 
     @BeforeClass
     public static void setup() throws Exception {
@@ -112,41 +120,49 @@ public class TestStandardRemoteGroupPort {
         doReturn(transaction).when(siteToSiteClient).createTransaction(eq(direction));
         doReturn(eventReporter).when(remoteGroup).getEventReporter();
 
-        context = null;
-        session = mock(ProcessSession.class);
-        provenanceReporter = mock(ProvenanceReporter.class);
-        doReturn(provenanceReporter).when(session).getProvenanceReporter();
+    }
 
+    private void setupMockProcessSession() {
+        // Construct a RemoteGroupPort as a processor to use NiFi mock library.
+        final Processor remoteGroupPort = mock(Processor.class);
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(Relationship.ANONYMOUS);
+        when(remoteGroupPort.getRelationships()).thenReturn(relationships);
+        when(remoteGroupPort.getIdentifier()).thenReturn("remote-group-port-id");
+
+        sessionState = new SharedSessionState(remoteGroupPort, new AtomicLong(0));
+        processSession = new MockProcessSession(sessionState, remoteGroupPort);
+        processContext = new MockProcessContext(remoteGroupPort);
     }
 
     @Test
     public void testSendRaw() throws Exception {
 
         setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.SEND);
+        setupMockProcessSession();
 
         final String peerUrl = "nifi://node1.example.com:9090";
-        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false);
+        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, true);
         try (final SocketChannel socketChannel = SocketChannel.open()) {
             final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel);
+            commsSession.setUserDn("nifi.node1.example.com");
             final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
 
             doReturn(peer).when(transaction).getCommunicant();
 
-            final QueueSize queueSize = new QueueSize(1, 10);
-            final FlowFile flowFile = mock(FlowFile.class);
+            final MockFlowFile flowFile = processSession.createFlowFile("0123456789".getBytes());
+            sessionState.getFlowFileQueue().offer(flowFile);
 
-            doReturn(queueSize).when(session).getQueueSize();
-            // Return null when it gets called second time.
-            doReturn(flowFile).doReturn(null).when(session).get();
+            port.onTrigger(processContext, processSession);
 
-            final String flowFileUuid = "flowfile-uuid";
-            doReturn(flowFileUuid).when(flowFile).getAttribute(eq(CoreAttributes.UUID.key()));
+            // Assert provenance.
+            final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+            assertEquals(1, provenanceEvents.size());
+            final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+            assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
+            assertEquals(peerUrl + "/" + flowFile.getAttribute(CoreAttributes.UUID.key()), provenanceEvent.getTransitUri());
+            assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
 
-            port.onTrigger(context, session);
-
-            // Transit uri can be customized if necessary.
-            verify(provenanceReporter).send(eq(flowFile), eq(peerUrl + "/" + flowFileUuid), any(String.class),
-                    any(Long.class), eq(false));
         }
     }
 
@@ -154,16 +170,17 @@ public class TestStandardRemoteGroupPort {
     public void testReceiveRaw() throws Exception {
 
         setupMock(SiteToSiteTransportProtocol.RAW, TransferDirection.RECEIVE);
+        setupMockProcessSession();
 
         final String peerUrl = "nifi://node1.example.com:9090";
-        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, false);
+        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 9090, true);
         try (final SocketChannel socketChannel = SocketChannel.open()) {
             final CommunicationsSession commsSession = new SocketChannelCommunicationsSession(socketChannel);
+            commsSession.setUserDn("nifi.node1.example.com");
             final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
 
             doReturn(peer).when(transaction).getCommunicant();
 
-            final FlowFile flowFile = mock(FlowFile.class);
             final String sourceFlowFileUuid = "flowfile-uuid";
             final Map<String, String> attributes = new HashMap<>();
             attributes.put(CoreAttributes.UUID.key(), sourceFlowFileUuid);
@@ -172,18 +189,26 @@ public class TestStandardRemoteGroupPort {
             final DataPacket dataPacket = new StandardDataPacket(attributes,
                     dataPacketInputStream, dataPacketContents.length);
 
-            doReturn(flowFile).when(session).create();
             // Return null when it gets called second time.
             doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
 
-            doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
-            doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
-
-            port.onTrigger(context, session);
-
-            // Transit uri can be customized if necessary.
-            verify(provenanceReporter).receive(eq(flowFile), eq(peerUrl + "/" + sourceFlowFileUuid), any(String.class),
-                    any(String.class), any(Long.class));
+            port.onTrigger(processContext, processSession);
+
+            // Assert provenance.
+            final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+            assertEquals(1, provenanceEvents.size());
+            final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+            assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
+            assertEquals(peerUrl + "/" + sourceFlowFileUuid, provenanceEvent.getTransitUri());
+            assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
+
+            // Assert received flow files.
+            processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
+            final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
+            assertEquals(1, flowFiles.size());
+            final MockFlowFile flowFile = flowFiles.get(0);
+            flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
+            flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
         }
 
     }
@@ -192,66 +217,76 @@ public class TestStandardRemoteGroupPort {
     public void testSendHttp() throws Exception {
 
         setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.SEND);
+        setupMockProcessSession();
 
-        final String peerUrl = "http://node1.example.com:8080/nifi";
-        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false);
+        final String peerUrl = "https://node1.example.com:8080/nifi";
+        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, true);
         final HttpCommunicationsSession commsSession = new HttpCommunicationsSession();
+        commsSession.setUserDn("nifi.node1.example.com");
         final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
 
-        final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
+        final String flowFileEndpointUri = "https://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
 
         doReturn(peer).when(transaction).getCommunicant();
         commsSession.setDataTransferUrl(flowFileEndpointUri);
 
-        final QueueSize queueSize = new QueueSize(1, 10);
-        final FlowFile flowFile = mock(FlowFile.class);
-
-        doReturn(queueSize).when(session).getQueueSize();
-        // Return null when it's called second time.
-        doReturn(flowFile).doReturn(null).when(session).get();
-
-        port.onTrigger(context, session);
+        final MockFlowFile flowFile = processSession.createFlowFile("0123456789".getBytes());
+        sessionState.getFlowFileQueue().offer(flowFile);
 
-        // peerUrl should be used as the transit url.
-        verify(provenanceReporter).send(eq(flowFile), eq(flowFileEndpointUri), any(String.class),
-                any(Long.class), eq(false));
+        port.onTrigger(processContext, processSession);
 
+        // Assert provenance.
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
+        assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
+        assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
     }
 
     @Test
     public void testReceiveHttp() throws Exception {
 
         setupMock(SiteToSiteTransportProtocol.HTTP, TransferDirection.RECEIVE);
+        setupMockProcessSession();
 
-        final String peerUrl = "http://node1.example.com:8080/nifi";
-        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, false);
+        final String peerUrl = "https://node1.example.com:8080/nifi";
+        final PeerDescription peerDescription = new PeerDescription("node1.example.com", 8080, true);
         final HttpCommunicationsSession commsSession = new HttpCommunicationsSession();
+        commsSession.setUserDn("nifi.node1.example.com");
         final Peer peer = new Peer(peerDescription, commsSession, peerUrl, REMOTE_CLUSTER_URL);
 
-        final String flowFileEndpointUri = "http://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
+        final String flowFileEndpointUri = "https://node1.example.com:8080/nifi-api/output-ports/port-id/transactions/transaction-id/flow-files";
 
         doReturn(peer).when(transaction).getCommunicant();
         commsSession.setDataTransferUrl(flowFileEndpointUri);
 
-        final FlowFile flowFile = mock(FlowFile.class);
         final Map<String, String> attributes = new HashMap<>();
         final byte[] dataPacketContents = "DataPacket Contents".getBytes();
         final ByteArrayInputStream dataPacketInputStream = new ByteArrayInputStream(dataPacketContents);
         final DataPacket dataPacket = new StandardDataPacket(attributes,
                 dataPacketInputStream, dataPacketContents.length);
 
-        doReturn(flowFile).when(session).create();
-        // Return null when it's called second time.
+        // Return null when it gets called second time.
         doReturn(dataPacket).doReturn(null).when(transaction).receive();
 
-        doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
-        doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
-
-        port.onTrigger(context, session);
-
-        // peerUrl should be used as the transit url.
-        verify(provenanceReporter).receive(eq(flowFile), eq(flowFileEndpointUri), any(String.class),
-                any(String.class), any(Long.class));
+        port.onTrigger(processContext, processSession);
+
+        // Assert provenance.
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
+        assertEquals(flowFileEndpointUri, provenanceEvent.getTransitUri());
+        assertEquals("Remote DN=nifi.node1.example.com", provenanceEvent.getDetails());
+
+        // Assert received flow files.
+        processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.get(0);
+        flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
+        flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
 
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/908e7d31/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
index 9f86d5b..8d926c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -18,13 +18,15 @@ package org.apache.nifi.remote.protocol.http;
 
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerDescription;
@@ -40,32 +42,46 @@ import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.ResponseCode;
 import org.apache.nifi.remote.protocol.HandshakeProperty;
 import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.SharedSessionState;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestHttpFlowFileServerProtocol {
 
+    private SharedSessionState sessionState;
+    private MockProcessSession processSession;
+    private MockProcessContext processContext;
+
     @BeforeClass
     public static void setup() throws Exception {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
@@ -270,11 +286,37 @@ public class TestHttpFlowFileServerProtocol {
         final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
 
         final String transactionId = "testTransferOneFile";
-        final Peer peer = transferOneFile(serverProtocol, transactionId);
+        final Peer peer = getDefaultPeer(transactionId);
+        final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        final String endpointUri = "https://remote-host:8443/nifi-api/output-ports/port-id/transactions/"
+                + transactionId + "/flow-files";
+        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
+        commsSession.setUserDn("unit-test");
+        commsSession.setDataTransferUrl(endpointUri);
+
+        transferFlowFiles(serverProtocol, transactionId, peer, processSession -> {
+            final MockFlowFile flowFile = processSession.createFlowFile("Server content".getBytes());
+            final HashMap<String, String> attributes = new HashMap<>();
+            attributes.put("uuid", "server-uuid");
+            attributes.put("filename", "server-filename");
+            attributes.put("server-attr-1", "server-attr-1-value");
+            attributes.put("server-attr-2", "server-attr-2-value");
+            flowFile.putAttributes(attributes);
+
+            return Arrays.asList(flowFile);
+        });
 
         // Commit transaction
-        final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "2077607535");
+        final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "3229577812");
         assertEquals(1, flowFileSent);
+
+        // Assert provenance
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
+        assertEquals(endpointUri, provenanceEvent.getTransitUri());
+        assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
     }
 
     @Test
@@ -282,7 +324,25 @@ public class TestHttpFlowFileServerProtocol {
         final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
 
         final String transactionId = "testTransferOneFileBadChecksum";
-        final Peer peer = transferOneFile(serverProtocol, transactionId);
+        final Peer peer = getDefaultPeer(transactionId);
+        final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
+        final String endpointUri = "https://remote-host:8443/nifi-api/output-ports/port-id/transactions/"
+                + transactionId + "/flow-files";
+        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
+        commsSession.setUserDn("unit-test");
+        commsSession.setDataTransferUrl(endpointUri);
+
+        transferFlowFiles(serverProtocol, transactionId, peer, processSession -> {
+            final MockFlowFile flowFile = processSession.createFlowFile("Server content".getBytes());
+            final HashMap<String, String> attributes = new HashMap<>();
+            attributes.put("uuid", "server-uuid");
+            attributes.put("filename", "server-filename");
+            attributes.put("server-attr-1", "server-attr-1-value");
+            attributes.put("server-attr-2", "server-attr-2-value");
+            flowFile.putAttributes(attributes);
+
+            return Arrays.asList(flowFile);
+        });
 
         // Commit transaction
         try {
@@ -293,44 +353,27 @@ public class TestHttpFlowFileServerProtocol {
         }
     }
 
-    private Peer transferOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId) throws IOException {
+    private Peer transferFlowFiles(final HttpFlowFileServerProtocol serverProtocol, final String transactionId,
+                                   final Peer peer, final Function<MockProcessSession,
+            Collection<MockFlowFile>> flowFileGenerator) throws IOException {
+        setupMockProcessSession();
+
+        // Enqueue flow files to be transferred.
+        final Collection<MockFlowFile> flowFiles = flowFileGenerator.apply(processSession);
+        for (final MockFlowFile flowFile : flowFiles) {
+            sessionState.getFlowFileQueue().offer(flowFile);
+        }
+
         final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
-        final Peer peer = getDefaultPeer(transactionId);
-        final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/"
-                + transactionId + "/flow-files";
-        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
-        commsSession.setUserDn("unit-test");
-        commsSession.setDataTransferUrl(endpointUri);
 
         serverProtocol.handshake(peer);
-
         assertTrue(serverProtocol.isHandshakeSuccessful());
 
         final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
-        final ProcessContext context = mock(ProcessContext.class);
-        final ProcessSession processSession = mock(ProcessSession.class);
-        final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
-        final FlowFile flowFile = mock(FlowFile.class);
-        doReturn(flowFile).when(processSession).get();
-        doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
-        doAnswer(invocation -> {
-            final String transitUri = (String)invocation.getArguments()[1];
-            final String detail = (String)invocation.getArguments()[2];
-            assertEquals(endpointUri, transitUri);
-            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
-            return null;
-        }).when(provenanceReporter).send(eq(flowFile), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
-
-        doAnswer(invocation -> {
-            final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
-            callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
-            return null;
-        }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
 
         // Execute test using mock
-        final int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
-        assertEquals(1, flowFileSent);
+        final int flowFileSent = serverProtocol.transferFlowFiles(peer, processContext, processSession, negotiatedCoded);
+        assertEquals(flowFiles.size(), flowFileSent);
 
         assertTrue(remoteSiteListener.isTransactionActive(transactionId));
         return peer;
@@ -338,11 +381,9 @@ public class TestHttpFlowFileServerProtocol {
 
     @Test
     public void testTransferTwoFiles() throws Exception {
-        final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
-
         final String transactionId = "testTransferTwoFiles";
         final Peer peer = getDefaultPeer(transactionId);
-        final String endpointUri = "https://peer-host:8443/nifi-api/output-ports/port-id/transactions/"
+        final String endpointUri = "https://remote-host:8443/nifi-api/output-ports/port-id/transactions/"
                 + transactionId + "/flow-files";
         final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
         final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
@@ -350,53 +391,32 @@ public class TestHttpFlowFileServerProtocol {
         commsSession.setUserDn("unit-test");
         commsSession.setDataTransferUrl(endpointUri);
 
-        serverProtocol.handshake(peer);
-
-        assertTrue(serverProtocol.isHandshakeSuccessful());
-
-        final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
-        final ProcessContext context = mock(ProcessContext.class);
-        final ProcessSession processSession = mock(ProcessSession.class);
-        final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
-        final FlowFile flowFile1 = mock(FlowFile.class);
-        final FlowFile flowFile2 = mock(FlowFile.class);
-        doReturn(flowFile1)
-                .doReturn(flowFile2)
-                .when(processSession).get();
-
-        doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
-        doAnswer(invocation -> {
-            final String transitUri = (String)invocation.getArguments()[1];
-            final String detail = (String)invocation.getArguments()[2];
-            assertEquals(endpointUri, transitUri);
-            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
-            return null;
-        }).when(provenanceReporter).send(eq(flowFile1), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
-
-        doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
-        doAnswer(invocation -> {
-            final String transitUri = (String)invocation.getArguments()[1];
-            final String detail = (String)invocation.getArguments()[2];
-            assertEquals(endpointUri, transitUri);
-            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
-            return null;
-        }).when(provenanceReporter).send(eq(flowFile2), any(String.class), any(String.class), any(Long.class), any(Boolean.class));
-
-        doAnswer(invocation -> {
-            final InputStreamCallback callback = (InputStreamCallback)invocation.getArguments()[1];
-            callback.process(new java.io.ByteArrayInputStream("Server content".getBytes()));
-            return null;
-        }).when(processSession).read(any(FlowFile.class), any(InputStreamCallback.class));
+        transferFlowFiles(serverProtocol, transactionId, peer, processSession ->
+            IntStream.of(1, 2).mapToObj(i -> {
+                final MockFlowFile flowFile = processSession.createFlowFile(("Server content " + i).getBytes());
+                final HashMap<String, String> attributes = new HashMap<>();
+                attributes.put("uuid", "server-uuid-" + i);
+                attributes.put("filename", "server-filename-" + i);
+                attributes.put("server-attr-" + i + "-1", "server-attr-" + i + "-1-value");
+                attributes.put("server-attr-" + i + "-2", "server-attr-" + i + "-2-value");
+                flowFile.putAttributes(attributes);
+                return flowFile;
+            }).collect(Collectors.toList())
+        );
 
-        // Execute test using mock
-        int flowFileSent = serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
+        // Commit transaction
+        final int flowFileSent = serverProtocol.commitTransferTransaction(peer, "3058746557");
         assertEquals(2, flowFileSent);
 
-        assertTrue(remoteSiteListener.isTransactionActive(transactionId));
+        // Assert provenance
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+        for (final ProvenanceEventRecord provenanceEvent : provenanceEvents) {
+            assertEquals(ProvenanceEventType.SEND, provenanceEvent.getEventType());
+            assertEquals(endpointUri, provenanceEvent.getTransitUri());
+            assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
+        }
 
-        // Commit transaction
-        flowFileSent = serverProtocol.commitTransferTransaction(peer, "2747386400");
-        assertEquals(2, flowFileSent);
     }
 
     private DataPacket createClientDataPacket() {
@@ -404,6 +424,7 @@ public class TestHttpFlowFileServerProtocol {
         final byte[] bytes = contents.getBytes();
         final InputStream in = new ByteArrayInputStream(bytes);
         final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.UUID.key(), "client-flow-file-uuid");
         attributes.put("client-attr-1", "client-attr-1-value");
         attributes.put("client-attr-2", "client-attr-2-value");
         return new StandardDataPacket(attributes, in, bytes.length);
@@ -440,14 +461,42 @@ public class TestHttpFlowFileServerProtocol {
         final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
 
         final String transactionId = "testReceiveOneFile";
+        final String endpointUri = "https://remote-host:8443/nifi-api/input-ports/port-id/transactions/"
+                + transactionId + "/flow-files";
+
         final Peer peer = getDefaultPeer(transactionId);
+
         final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        receiveOneFile(serverProtocol, transactionId, peer);
+        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
+        commsSession.setUserDn("unit-test");
+        commsSession.setDataTransferUrl(endpointUri);
+
+        final DataPacket dataPacket = createClientDataPacket();
+        receiveFlowFiles(serverProtocol, transactionId, peer, dataPacket);
 
         // Commit transaction
         commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION);
         final int flowFileReceived = serverProtocol.commitReceiveTransaction(peer);
         assertEquals(1, flowFileReceived);
+
+        // Assert provenance.
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
+        assertEquals(endpointUri, provenanceEvent.getTransitUri());
+        assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
+
+        // Assert received flow files.
+        processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.get(0);
+        flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
+        flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
+        flowFile.assertAttributeEquals("client-attr-1", "client-attr-1-value");
+        flowFile.assertAttributeEquals("client-attr-2", "client-attr-2-value");
+
     }
 
     @Test
@@ -457,7 +506,7 @@ public class TestHttpFlowFileServerProtocol {
         final String transactionId = "testReceiveOneFileBadChecksum";
         final Peer peer = getDefaultPeer(transactionId);
         final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        receiveOneFile(serverProtocol, transactionId, peer);
+        receiveFlowFiles(serverProtocol, transactionId, peer, createClientDataPacket());
 
         // Commit transaction
         commsSession.setResponseCode(ResponseCode.BAD_CHECKSUM);
@@ -469,71 +518,48 @@ public class TestHttpFlowFileServerProtocol {
         }
     }
 
-    private void receiveOneFile(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer) throws IOException {
+    private void receiveFlowFiles(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer, final DataPacket ... dataPackets) throws IOException {
         final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
-        final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/"
-                + transactionId + "/flow-files";
         final HttpServerCommunicationsSession commsSession = (HttpServerCommunicationsSession) peer.getCommunicationsSession();
-        commsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, "1");
-        commsSession.setUserDn("unit-test");
-        commsSession.setDataTransferUrl(endpointUri);
 
         serverProtocol.handshake(peer);
-
         assertTrue(serverProtocol.isHandshakeSuccessful());
 
-        final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
-        final ProcessContext context = mock(ProcessContext.class);
-        final ProcessSession processSession = mock(ProcessSession.class);
-        final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
-        final FlowFile flowFile = mock(FlowFile.class);
-
-        final DataPacket dataPacket = createClientDataPacket();
+        setupMockProcessSession();
 
+        // Emulate dataPackets sent from a Site-to-Site client.
+        final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
         final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
-        negotiatedCoded.encode(dataPacket, testDataOs);
+        for (final DataPacket dataPacket : dataPackets) {
+            negotiatedCoded.encode(dataPacket, testDataOs);
+        }
         final InputStream httpInputStream = new ByteArrayInputStream(testDataOs.toByteArray());
-
         ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
 
-        doAnswer(invocation -> {
-            final InputStream is = (InputStream) invocation.getArguments()[0];
-            for (int b; (b = is.read()) >= 0;) {
-                // consume stream.
-            }
-            return flowFile;
-        }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
-        // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
-        // which returns flowFile instance used later.
-        doReturn(flowFile).when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
-        doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
-        doAnswer(invocation -> {
-            final String transitUri = (String)invocation.getArguments()[1];
-            final String detail = (String)invocation.getArguments()[3];
-            assertEquals(endpointUri, transitUri);
-            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
-            return null;
-        }).when(provenanceReporter)
-                .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
-
-        final Set<Relationship> relations = new HashSet<>();
-        final Relationship relationship = new Relationship.Builder().build();
-        relations.add(relationship);
-        doReturn(relations).when(context).getAvailableRelationships();
-
         // Execute test using mock
-        final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
-        assertEquals(1, flowFileReceived);
+        final int flowFileReceived = serverProtocol.receiveFlowFiles(peer, processContext, processSession, negotiatedCoded);
+        assertEquals(dataPackets.length, flowFileReceived);
 
         assertTrue(remoteSiteListener.isTransactionActive(transactionId));
     }
 
+    private void setupMockProcessSession() {
+        // Construct a RootGroupPort as a processor to use NiFi mock library.
+        final Processor rootGroupPort = mock(Processor.class);
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(Relationship.ANONYMOUS);
+        when(rootGroupPort.getRelationships()).thenReturn(relationships);
+        when(rootGroupPort.getIdentifier()).thenReturn("root-group-port-id");
+
+        sessionState = new SharedSessionState(rootGroupPort, new AtomicLong(0));
+        processSession = new MockProcessSession(sessionState, rootGroupPort);
+        processContext = new MockProcessContext(rootGroupPort);
+    }
+
     @Test
     public void testReceiveTwoFiles() throws Exception {
-        final HttpRemoteSiteListener remoteSiteListener = HttpRemoteSiteListener.getInstance(NiFiProperties.createBasicNiFiProperties(null, null));
-
         final String transactionId = "testReceiveTwoFile";
-        final String endpointUri = "https://peer-host:8443/nifi-api/input-ports/port-id/transactions/"
+        final String endpointUri = "https://remote-host:8443/nifi-api/input-ports/port-id/transactions/"
                 + transactionId + "/flow-files";
         final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
         final Peer peer = getDefaultPeer(transactionId);
@@ -542,63 +568,32 @@ public class TestHttpFlowFileServerProtocol {
         commsSession.setUserDn("unit-test");
         commsSession.setDataTransferUrl(endpointUri);
 
-        serverProtocol.handshake(peer);
-
-        assertTrue(serverProtocol.isHandshakeSuccessful());
-
-        final FlowFileCodec negotiatedCoded = serverProtocol.negotiateCodec(peer);
-        final ProcessContext context = mock(ProcessContext.class);
-        final ProcessSession processSession = mock(ProcessSession.class);
-        final ProvenanceReporter provenanceReporter = mock(ProvenanceReporter.class);
-        final FlowFile flowFile1 = mock(FlowFile.class);
-        final FlowFile flowFile2 = mock(FlowFile.class);
-
-        final ByteArrayOutputStream testDataOs = new ByteArrayOutputStream();
-        negotiatedCoded.encode(createClientDataPacket(), testDataOs);
-        negotiatedCoded.encode(createClientDataPacket(), testDataOs);
-        final InputStream httpInputStream = new ByteArrayInputStream(testDataOs.toByteArray());
-
-        ((HttpInput)commsSession.getInput()).setInputStream(httpInputStream);
-
-        doAnswer(invocation -> {
-            final InputStream is = (InputStream) invocation.getArguments()[0];
-            for (int b; (b = is.read()) >= 0;) {
-                // consume stream.
-            }
-            return flowFile1;
-        }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
-
-        // AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution
-        // which returns flowFile instance used later, it is called twice for each flow file
-        doReturn(flowFile1)
-                .doReturn(flowFile1)
-                .doReturn(flowFile2)
-                .doReturn(flowFile2)
-                .when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
-        doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
-
-        doAnswer(invocation -> {
-            final String transitUri = (String)invocation.getArguments()[1];
-            final String detail = (String)invocation.getArguments()[3];
-            assertEquals(endpointUri, transitUri);
-            assertEquals("Remote Host=peer-host, Remote DN=unit-test", detail);
-            return null;
-        }).when(provenanceReporter)
-                .receive(any(FlowFile.class), any(String.class), any(String.class), any(String.class), any(Long.class));
-
-        final Set<Relationship> relations = new HashSet<>();
-        doReturn(relations).when(context).getAvailableRelationships();
-
-        // Execute test using mock
-        int flowFileReceived = serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
-        assertEquals(2, flowFileReceived);
-
-        assertTrue(remoteSiteListener.isTransactionActive(transactionId));
+        receiveFlowFiles(serverProtocol, transactionId, peer, createClientDataPacket(), createClientDataPacket());
 
         // Commit transaction
         commsSession.setResponseCode(ResponseCode.CONFIRM_TRANSACTION);
-        flowFileReceived = serverProtocol.commitReceiveTransaction(peer);
+        final int flowFileReceived = serverProtocol.commitReceiveTransaction(peer);
         assertEquals(2, flowFileReceived);
+
+        // Assert provenance.
+        final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+        for (final ProvenanceEventRecord provenanceEvent : provenanceEvents) {
+            assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent.getEventType());
+            assertEquals(endpointUri, provenanceEvent.getTransitUri());
+            assertEquals("Remote Host=peer-host, Remote DN=unit-test", provenanceEvent.getDetails());
+        }
+
+        // Assert received flow files.
+        processSession.assertAllFlowFilesTransferred(Relationship.ANONYMOUS);
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(Relationship.ANONYMOUS);
+        assertEquals(2, flowFiles.size());
+        for (final MockFlowFile flowFile : flowFiles) {
+            flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_HOST.key(), peer.getHost());
+            flowFile.assertAttributeEquals(SiteToSiteAttributes.S2S_ADDRESS.key(), peer.getHost() + ":" + peer.getPort());
+            flowFile.assertAttributeEquals("client-attr-1", "client-attr-1-value");
+            flowFile.assertAttributeEquals("client-attr-2", "client-attr-2-value");
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/908e7d31/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index 831b2f5..d8a71dd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -77,6 +77,8 @@ import javax.ws.rs.core.UriInfo;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
@@ -316,7 +318,14 @@ public class DataTransferResource extends ApplicationResource {
 
     private Peer constructPeer(final HttpServletRequest req, final InputStream inputStream,
                                final OutputStream outputStream, final String portId, final String transactionId) {
-        final String clientHostName = req.getRemoteHost();
+        String clientHostName = req.getRemoteHost();
+        try {
+            // req.getRemoteHost returns IP address, try to resolve hostname to be consistent with RAW protocol.
+            final InetAddress clientAddress = InetAddress.getByName(clientHostName);
+            clientHostName = clientAddress.getHostName();
+        } catch (UnknownHostException e) {
+            logger.info("Failed to resolve client hostname {}, due to {}", clientHostName, e.getMessage());
+        }
         final int clientPort = req.getRemotePort();
 
         final PeerDescription peerDescription = new PeerDescription(clientHostName, clientPort, req.isSecure());


[2/3] nifi git commit: NIFI-2585 Moving attributes into loop in AbstractFlowFileServerProtocol, and also updating StandardRemoteGroupPort to apply the same attributes when doing a pull-based site-to-site.

Posted by bb...@apache.org.
NIFI-2585 Moving attributes into loop in AbstractFlowFileServerProtocol, and also updating StandardRemoteGroupPort to apply the same attributes when doing a pull-based site-to-site.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f7d761a2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f7d761a2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f7d761a2

Branch: refs/heads/master
Commit: f7d761a28ae3a07cb8e614e243e197716df025b1
Parents: 28e5d85
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Dec 8 15:17:06 2016 -0500
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Dec 22 10:59:56 2016 -0500

----------------------------------------------------------------------
 .../attributes/SiteToSiteAttributes.java        | 39 ++++++++++++++++++++
 .../nifi/remote/StandardRemoteGroupPort.java    | 15 ++++++++
 .../AbstractFlowFileServerProtocol.java         | 20 ++++++----
 .../remote/TestStandardRemoteGroupPort.java     |  4 +-
 .../http/TestHttpFlowFileServerProtocol.java    | 12 ++++--
 5 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
new file mode 100644
index 0000000..ed6437e
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.flowfile.attributes;
+
+/**
+ * FlowFile attributes used during site-to-site transfer.
+ */
+public enum SiteToSiteAttributes implements FlowFileAttributeKey {
+
+    S2S_HOST("s2s.host"),
+
+    S2S_ADDRESS("s2s.address");
+
+    private final String key;
+
+    private SiteToSiteAttributes(final String key) {
+        this.key = key;
+    }
+
+    @Override
+    public String key() {
+        return key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 3a23601..b1a1c92 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +39,7 @@ import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
@@ -56,6 +59,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -338,6 +342,17 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
             FlowFile flowFile = session.create();
             flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+
+            final Communicant communicant = transaction.getCommunicant();
+            final String host = StringUtils.isEmpty(communicant.getHost()) ? "unknown" : communicant.getHost();
+            final String port = communicant.getPort() < 0 ? "unknown" : String.valueOf(communicant.getPort());
+
+            final Map<String,String> attributes = new HashMap<>(2);
+            attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
+            attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
             flowFile = session.importFrom(dataPacket.getData(), flowFile);
             final long receiveNanos = System.nanoTime() - start;
             flowFilesReceived.add(flowFile);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
index e149481..fe4b1b1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -20,6 +20,7 @@ import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -37,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +47,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -448,7 +451,16 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
-            flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+            final String host = StringUtils.isEmpty(peer.getHost()) ? "unknown" : peer.getHost();
+            final String port = peer.getPort() <= 0 ? "unknown" : String.valueOf(peer.getPort());
+
+            final Map<String,String> attributes = new HashMap<>(4);
+            attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+            attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
+            attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + port);
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
 
             final String transitUri = createTransitUri(peer, sourceSystemFlowFileUuid);
             session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null
@@ -506,12 +518,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
                 throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
         }
 
-        // For routing purposes, downstream consumers often need to reference Flowfile's originating system
-        for (FlowFile flowFile : transaction.getFlowFilesSent()){
-          flowFile = session.putAttribute(flowFile, "remote.host", peer.getHost());
-          flowFile = session.putAttribute(flowFile, "remote.address", peer.getHost() + ":" + peer.getPort());
-        }
-
         // Commit the session so that we have persisted the data
         session.commit();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
index 23d3fda..43009b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
@@ -176,7 +176,7 @@ public class TestStandardRemoteGroupPort {
             // Return null when it gets called second time.
             doReturn(dataPacket).doReturn(null).when(this.transaction).receive();
 
-            doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
+            doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
             doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
 
             port.onTrigger(context, session);
@@ -244,7 +244,7 @@ public class TestStandardRemoteGroupPort {
         // Return null when it's called second time.
         doReturn(dataPacket).doReturn(null).when(transaction).receive();
 
-        doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), eq(attributes));
+        doReturn(flowFile).doReturn(flowFile).when(session).putAllAttributes(eq(flowFile), any(Map.class));
         doReturn(flowFile).when(session).importFrom(any(InputStream.class), eq(flowFile));
 
         port.onTrigger(context, session);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7d761a2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
index f5e803d..9f86d5b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/protocol/http/TestHttpFlowFileServerProtocol.java
@@ -505,7 +505,7 @@ public class TestHttpFlowFileServerProtocol {
         }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
         // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
         // which returns flowFile instance used later.
-        doReturn(flowFile).when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
+        doReturn(flowFile).when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
         doAnswer(invocation -> {
             final String transitUri = (String)invocation.getArguments()[1];
@@ -567,12 +567,16 @@ public class TestHttpFlowFileServerProtocol {
             }
             return flowFile1;
         }).when(processSession).importFrom(any(InputStream.class), any(FlowFile.class));
-        // AbstractFlowFileServerProtocol adopts builder pattern and putAttribute is the last execution
-        // which returns flowFile instance used later.
+
+        // AbstractFlowFileServerProtocol adopts builder pattern and putAllAttributes is the last execution
+        // which returns flowFile instance used later, it is called twice for each flow file
         doReturn(flowFile1)
+                .doReturn(flowFile1)
                 .doReturn(flowFile2)
-                .when(processSession).putAttribute(any(FlowFile.class), any(String.class), any(String.class));
+                .doReturn(flowFile2)
+                .when(processSession).putAllAttributes(any(FlowFile.class), any(Map.class));
         doReturn(provenanceReporter).when(processSession).getProvenanceReporter();
+
         doAnswer(invocation -> {
             final String transitUri = (String)invocation.getArguments()[1];
             final String detail = (String)invocation.getArguments()[3];