You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/07/10 15:48:46 UTC

[2/2] nifi git commit: NIFI-4654: Support reporting RAW S2S lineage to Atlas

NIFI-4654: Support reporting RAW S2S lineage to Atlas

- Added 's2s.port.id' FlowFile attribute to track target remote Port id
- Use 's2s.port.id' to analyze RAW S2S provenance events
- This closes #2863


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e94f0757
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e94f0757
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e94f0757

Branch: refs/heads/master
Commit: e94f0757db617c649ae054d7c914feade9e1185e
Parents: b279624
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Jul 9 11:28:17 2018 +0900
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jul 10 11:44:00 2018 -0400

----------------------------------------------------------------------
 .../attributes/SiteToSiteAttributes.java        |   4 +-
 .../provenance/analyzer/NiFiRemotePort.java     |  17 +-
 .../provenance/analyzer/NiFiRootGroupPort.java  |   9 +-
 .../nifi/atlas/provenance/analyzer/NiFiS2S.java |  35 +-
 .../additionalDetails.html                      |   5 +-
 .../provenance/analyzer/TestNiFiRemotePort.java |  85 +-
 .../analyzer/TestNiFiRootGroupPort.java         | 203 +++++
 .../atlas/reporting/ITReportLineageToAtlas.java | 280 +++++++
 .../test/resources/flow-templates/S2SGetRAW.xml | 443 ++++++++++
 .../resources/flow-templates/S2SSendRAW.xml     | 822 +++++++++++++++++++
 .../nifi/remote/StandardRemoteGroupPort.java    |   2 +
 .../remote/TestStandardRemoteGroupPort.java     |   4 +
 12 files changed, 1882 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
index ed6437e..7af0f0b 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
@@ -23,7 +23,9 @@ public enum SiteToSiteAttributes implements FlowFileAttributeKey {
 
     S2S_HOST("s2s.host"),
 
-    S2S_ADDRESS("s2s.address");
+    S2S_ADDRESS("s2s.address"),
+
+    S2S_PORT_ID("s2s.port.id");
 
     private final String key;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
index e2118f7..69b2d47 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
@@ -20,6 +20,7 @@ import org.apache.atlas.typesystem.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.slf4j.Logger;
@@ -53,14 +54,13 @@ public class NiFiRemotePort extends NiFiS2S {
         final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port");
         final String type = isRemoteInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
 
-        final String remotePortId = event.getComponentId();
-
-        final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());
+        final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
 
         // Find connections that connects to/from the remote port.
+        final String componentId = event.getComponentId();
         final List<ConnectionStatus> connections = isRemoteInputPort
-                ? context.findConnectionTo(remotePortId)
-                : context.findConnectionFrom(remotePortId);
+                ? context.findConnectionTo(componentId)
+                : context.findConnectionFrom(componentId);
         if (connections == null || connections.isEmpty()) {
             logger.warn("Connection was not found: {}", new Object[]{event});
             return null;
@@ -70,7 +70,7 @@ public class NiFiRemotePort extends NiFiS2S {
         final ConnectionStatus connection = connections.get(0);
         final Referenceable ref = new Referenceable(type);
         ref.set(ATTR_NAME, isRemoteInputPort ? connection.getDestinationName() : connection.getSourceName());
-        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, s2sUrl.targetPortId));
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, s2SPort.targetPortId));
 
         return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
     }
@@ -79,4 +79,9 @@ public class NiFiRemotePort extends NiFiS2S {
     public String targetComponentTypePattern() {
         return "^Remote (In|Out)put Port$";
     }
+
+    @Override
+    protected String getRawProtocolPortId(ProvenanceEventRecord event) { // RAW transit URIs carry a FlowFile UUID, not the target port id
+        return event.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key()); // 's2s.port.id' tracks the target remote port id; NOTE(review): confirm handling when the attribute is absent
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
index 4f66025..e791c62 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
@@ -54,7 +54,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
         final String type = isInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
         final String rootPortId = event.getComponentId();
 
-        final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());
+        final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
 
         // Find connections connecting to/from the remote port.
         final List<ConnectionStatus> connections = isInputPort
@@ -69,7 +69,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
         final ConnectionStatus connection = connections.get(0);
         final Referenceable ref = new Referenceable(type);
         ref.set(ATTR_NAME, isInputPort ? connection.getSourceName() : connection.getDestinationName());
-        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, rootPortId));
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, rootPortId));
 
         return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
     }
@@ -78,4 +78,9 @@ public class NiFiRootGroupPort extends NiFiS2S {
     public String targetComponentTypePattern() {
         return "^(In|Out)put Port$";
     }
+
+    @Override
+    protected String getRawProtocolPortId(ProvenanceEventRecord event) { // RAW transit URIs carry a FlowFile UUID, not the port id
+        return event.getComponentId(); // this root group port is the S2S target, so its own component id is the port id
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
index 762a1aa..d205b3e 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
@@ -18,10 +18,10 @@ package org.apache.nifi.atlas.provenance.analyzer;
 
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URL;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -29,27 +29,30 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
 
     private static final Logger logger = LoggerFactory.getLogger(NiFiS2S.class);
 
-    private static final Pattern RAW_URL_REGEX = Pattern.compile("([0-9a-zA-Z\\-]+)");
+    private static final Pattern RAW_URL_REGEX = Pattern.compile("nifi://([^:/]+):\\d+/([0-9a-zA-Z\\-]+)");
     private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*");
 
-    protected S2STransitUrl parseTransitURL(String transitUri, ClusterResolver clusterResolver) {
-        final URL url = parseUrl(transitUri);
+    protected S2SPort analyzeS2SPort(ProvenanceEventRecord event, ClusterResolver clusterResolver) {
+        final String transitUri = event.getTransitUri();
+        final int protocolIndex = transitUri.indexOf(':');
+        final String protocol = transitUri.substring(0, protocolIndex).toLowerCase();
 
-        final String clusterName = clusterResolver.fromHostNames(url.getHost());
+        final String targetHostname;
         final String targetPortId;
-        final String protocol = url.getProtocol().toLowerCase();
         switch (protocol) {
 
             case "http":
             case "https": {
-                final Matcher uriMatcher = matchUrl(url, HTTP_URL_REGEX);
+                final Matcher uriMatcher = matchUrl(transitUri, HTTP_URL_REGEX);
+                targetHostname = parseUri(transitUri).getHost();
                 targetPortId = uriMatcher.group(2);
             }
             break;
 
             case "nifi": {
-                final Matcher uriMatcher = matchUrl(url, RAW_URL_REGEX);
-                targetPortId = uriMatcher.group(1);
+                final Matcher uriMatcher = matchUrl(transitUri, RAW_URL_REGEX);
+                targetHostname = uriMatcher.group(1);
+                targetPortId = getRawProtocolPortId(event);
             }
             break;
 
@@ -58,23 +61,25 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
 
         }
 
-        return new S2STransitUrl(clusterName, targetPortId);
-
+        final String clusterName = clusterResolver.fromHostNames(targetHostname);
+        return new S2SPort(clusterName, targetPortId);
     }
 
-    private Matcher matchUrl(URL url, Pattern pattern) {
-        final Matcher uriMatcher = pattern.matcher(url.getPath());
+    protected abstract String getRawProtocolPortId(ProvenanceEventRecord event); // resolves the target port id for RAW S2S events, whose transit URI holds a FlowFile UUID instead
+
+    private Matcher matchUrl(String url, Pattern pattern) { // matches the entire transit URI string; throws IllegalArgumentException if it does not match
+        final Matcher uriMatcher = pattern.matcher(url);
         if (!uriMatcher.matches()) {
             throw new IllegalArgumentException("Unexpected transit URI: " + url);
         }
         return uriMatcher;
     }
 
-    protected static class S2STransitUrl {
+    protected static class S2SPort {
         final String clusterName;
         final String targetPortId;
 
-        public S2STransitUrl(String clusterName, String targetPortId) {
+        public S2SPort(String clusterName, String targetPortId) {
             this.clusterName = clusterName;
             this.targetPortId = targetPortId;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
index cd82079..b94a915 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
@@ -329,7 +329,7 @@ Processor 3</pre>
                 </td>
                 <td>rootGroupPortGUID@clusterName
                     (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
-                <td rowspan="4"><strong>NOTE:</strong>Only HTTP S2S protocol is supported. RAW support may be added in the future as it needs NiFi code modification. See <a href="https://issues.apache.org/jira/browse/NIFI-4654">NIFI-4654</a> for detail.</td>
+                <td></td>
             </tr>
             <tr>
                 <td>
@@ -345,6 +345,7 @@ upstream (nifi_flow_path)
                 </td>
                 <td>remoteInputPortGUID@clusterName<br/>(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@cl1)
                     <p>NOTE: The remoteInputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.</p></td>
+                <td></td>
             </tr>
             <tr>
                 <td rowspan="2">
@@ -364,6 +365,7 @@ upstream (nifi_flow_path)
                 </td>
                 <td>rootGroupPortGUID@clusterName
                     (e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
+                <td></td>
             </tr>
             <tr>
                 <td>
@@ -385,6 +387,7 @@ remote target port
                         <li>For 'nifi_queue': downstreamPathGUID@clusterName<br/>(e.g. bb530e58-ee14-3cac-144c-a8d71255027d@cl1)</li>
                     </ul>
                 </td>
+                <td></td>
             </tr>
             <tr>
                 <td>NiFiRootGroupPort</td>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
index 3040d50..b0a6654 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
@@ -24,6 +24,7 @@ import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
 import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
 import org.apache.nifi.atlas.resolver.ClusterResolvers;
 import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.junit.Test;
@@ -49,7 +50,7 @@ import static org.mockito.Mockito.when;
 public class TestNiFiRemotePort {
 
     @Test
-    public void testRemoteInputPort() {
+    public void testRemoteInputPortHTTP() {
         final String componentType = "Remote Input Port";
         final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";
         final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
@@ -89,7 +90,7 @@ public class TestNiFiRemotePort {
     }
 
     @Test
-    public void testRemoteOutputPort() {
+    public void testRemoteOutputPortHTTP() {
         final String componentType = "Remote Output Port";
         final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";
         final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
@@ -123,5 +124,85 @@ public class TestNiFiRemotePort {
         assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
     }
 
+    @Test
+    public void testRemoteInputPortRAW() {
+        final String componentType = "Remote Input Port";
+        // For RAW S2S the UUID in the transit URI is a FlowFile UUID; the target port id comes from 's2s.port.id'.
+        final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82";
+        final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
+        when(sendEvent.getEventId()).thenReturn(123L);
+        // The component id is the UUID of the RemoteGroupPort instance acting as the S2S client.
+        when(sendEvent.getComponentId()).thenReturn("s2s-client-component-guid");
+        when(sendEvent.getComponentType()).thenReturn(componentType);
+        when(sendEvent.getTransitUri()).thenReturn(transitUri);
+        when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        when(sendEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setDestinationId("s2s-client-component-guid");
+        connection.setDestinationName("inputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionTo(matches("s2s-client-component-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, sendEvent);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        assertEquals(1, refs.getComponentIds().size());
+        // Should report the client-side component id, while the qualified name uses the remote target port id.
+        assertTrue(refs.getComponentIds().contains("s2s-client-component-guid"));
+
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
+        assertEquals("inputPortA", ref.get(ATTR_NAME));
+        assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testRemoteOutputPortRAW() {
+        final String componentType = "Remote Output Port";
+        // For RAW S2S the UUID in the transit URI is a FlowFile UUID; the target port id comes from 's2s.port.id'.
+        final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        // The component id is the UUID of the RemoteGroupPort instance acting as the S2S client.
+        when(record.getComponentId()).thenReturn("s2s-client-component-guid");
+        when(record.getComponentType()).thenReturn(componentType);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+        when(record.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setSourceId("s2s-client-component-guid");
+        connection.setSourceName("outputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionFrom(matches("s2s-client-component-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(1, refs.getInputs().size());
+        assertEquals(0, refs.getOutputs().size());
+        Referenceable ref = refs.getInputs().iterator().next();
+        assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
+        assertEquals("outputPortA", ref.get(ATTR_NAME));
+        assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
new file mode 100644
index 0000000..61d59da
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link NiFiRootGroupPort}: root group Input/Output Ports exchanging data via S2S over HTTP and RAW.
+ * More complex and detailed tests are available in {@link ITReportLineageToAtlas}.
+ */
+public class TestNiFiRootGroupPort {
+
+    @Test
+    public void testInputPortHTTP() {
+        final String componentType = "Input Port";
+        final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";
+        final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class);
+        when(receiveEvent.getEventId()).thenReturn(123L);
+        when(receiveEvent.getComponentId()).thenReturn("port-guid");
+        when(receiveEvent.getComponentType()).thenReturn(componentType);
+        when(receiveEvent.getTransitUri()).thenReturn(transitUri);
+        when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setSourceId("port-guid");
+        connection.setSourceName("inputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, receiveEvent);
+        assertEquals(1, refs.getInputs().size());
+        assertEquals(0, refs.getOutputs().size());
+        assertEquals(1, refs.getComponentIds().size());
+        // Should report the root group port's own component id.
+        assertTrue(refs.getComponentIds().contains("port-guid"));
+
+        final Referenceable ref = refs.getInputs().iterator().next();
+        assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
+        assertEquals("inputPortA", ref.get(ATTR_NAME));
+        assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testOutputPortHTTP() {
+        final String componentType = "Output Port";
+        final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";
+        final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
+        when(sendEvent.getComponentId()).thenReturn("port-guid");
+        when(sendEvent.getComponentType()).thenReturn(componentType);
+        when(sendEvent.getTransitUri()).thenReturn(transitUri);
+        when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setDestinationId("port-guid");
+        connection.setDestinationName("outputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, sendEvent);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        final Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
+        assertEquals("outputPortA", ref.get(ATTR_NAME));
+        assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testInputPortRAW() {
+        final String componentType = "Input Port";
+        // RAW transit URIs carry a FlowFile UUID, so the port id must come from the event's component id.
+        final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82";
+        final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class);
+        when(receiveEvent.getEventId()).thenReturn(123L);
+        when(receiveEvent.getComponentId()).thenReturn("port-guid");
+        when(receiveEvent.getComponentType()).thenReturn(componentType);
+        when(receiveEvent.getTransitUri()).thenReturn(transitUri);
+        when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setSourceId("port-guid");
+        connection.setSourceName("inputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, receiveEvent);
+        assertEquals(1, refs.getInputs().size());
+        assertEquals(0, refs.getOutputs().size());
+        assertEquals(1, refs.getComponentIds().size());
+        // Should report the root group port's own component id.
+        assertTrue(refs.getComponentIds().contains("port-guid"));
+
+        final Referenceable ref = refs.getInputs().iterator().next();
+        assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
+        assertEquals("inputPortA", ref.get(ATTR_NAME));
+        assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    @Test
+    public void testOutputPortRAW() {
+        final String componentType = "Output Port";
+        // RAW transit URIs carry a FlowFile UUID, so the port id must come from the event's component id.
+        final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c";
+        final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
+        when(sendEvent.getComponentId()).thenReturn("port-guid");
+        when(sendEvent.getComponentType()).thenReturn(componentType);
+        when(sendEvent.getTransitUri()).thenReturn(transitUri);
+        when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+        final List<ConnectionStatus> connections = new ArrayList<>();
+        final ConnectionStatus connection = new ConnectionStatus();
+        connection.setDestinationId("port-guid");
+        connection.setDestinationName("outputPortA");
+        connections.add(connection);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+        when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, sendEvent);
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+        final Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
+        assertEquals("outputPortA", ref.get(ATTR_NAME));
+        assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
index b16e088..370025b 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -26,6 +26,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceRepository;
 import org.apache.nifi.provenance.lineage.ComputeLineageResult;
@@ -652,6 +653,64 @@ public class ITReportLineageToAtlas {
 
     }
 
+    /**
+     * A client NiFi sends FlowFiles to a remote NiFi using RAW protocol.
+     * <p>
+     * Populates the given configuration's provenance records with CREATE, SEND and DROP events, runs the
+     * reporting task via {@code test(tc)}, waits for notifications to be delivered, and asserts the links
+     * that the callers share: the flow to its starting paths, and the queue in front of UpdateAttribute.
+     * Callers add lineage-strategy-specific assertions afterwards.
+     * </p>
+     *
+     * @param tc test configuration whose {@code provenanceRecords} are populated by this method
+     * @throws Exception if any of the test helpers called here fails
+     */
+    private void testS2SSendRAW(TestConfiguration tc) throws Exception {
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("ca71e4d9-2a4f-3970", "Generate A", CREATE));
+        prs.add(pr("c439cdca-e989-3491", "Generate C", CREATE));
+        prs.add(pr("b775b657-5a5b-3708", "GetTwitter", CREATE));
+
+        // The transit URI's port GUID differs from the Remote Input Port's own component ID,
+        // so the target port ID is supplied through the s2s.port.id attribute.
+        final SimpleProvenanceRecord sendEvent1 = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+                "nifi://nifi.example.com:8081/d668805a-0ad0-44d1-aa65-ac362bf06e10");
+        sendEvent1.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000");
+        prs.add(sendEvent1);
+
+        final SimpleProvenanceRecord sendEvent2 = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+                "nifi://nifi.example.com:8081/d4ec2459-d903-4a73-a09e-853f9997d3fb");
+        sendEvent2.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000");
+        prs.add(sendEvent2);
+
+        prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // DROP of the Generate C FlowFile (index 5)
+        prs.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // DROP of the GetTwitter FlowFile (index 6)
+
+        // createLineage indices refer to positions in prs: 0-2 = CREATE events (Generate A, Generate C, GetTwitter),
+        // 3-4 = SEND events, 5-6 = DROP events.
+        // Generate C created a FlowFile, then it's sent via S2S
+        tc.addLineage(createLineage(prs, 1, 3, 5));
+        // GetTwitter created a FlowFile, then it's sent via S2S
+        tc.addLineage(createLineage(prs, 2, 4, 6));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example");
+        final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+        final Node pathB = lineage.findNode("nifi_flow_path", "Generate B", "333255b6-eb02-3056");
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+        final Node pathI = lineage.findNode("nifi_flow_path", "InactiveProcessor", "7033f311-ac68-3cab");
+        // UpdateAttribute has multiple incoming paths, so it generates a queue to receive those.
+        final Node queueU = lineage.findNode("nifi_queue", "queue", "c5392447-e9f1-33ad");
+        final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "c5392447-e9f1-33ad");
+
+        // These are starting paths. Generate B and InactiveProcessor have no provenance events above,
+        // yet are still expected to be linked to the flow.
+        lineage.assertLink(flow, pathA);
+        lineage.assertLink(flow, pathB);
+        lineage.assertLink(flow, pathC);
+        lineage.assertLink(flow, pathT);
+        lineage.assertLink(flow, pathI);
+
+        // Multiple paths connected to the same path.
+        lineage.assertLink(pathB, queueU);
+        lineage.assertLink(pathC, queueU);
+        lineage.assertLink(queueU, pathU);
+
+    }
+
     @Test
     public void testS2SSendSimple() throws Exception {
         final TestConfiguration tc = new TestConfiguration("S2SSend");
@@ -711,6 +770,65 @@ public class ITReportLineageToAtlas {
         lineage.assertLink(genT, pathT);
     }
 
+    /**
+     * Reports the RAW S2S send scenario with NIFI_LINEAGE_STRATEGY left unset (the assertions below expect
+     * simple-path reporting), and verifies the per-processor paths, the Remote Input Port queue and path,
+     * and the link to the remote input port identified by the s2s.port.id attribute.
+     */
+    @Test
+    public void testS2SSendSimpleRAW() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SSendRAW");
+
+        testS2SSendRAW(tc);
+
+        final Lineage lineage = getLineage();
+
+        // The FlowFile created by Generate A has not been finished (by a DROP event), but the SIMPLE_PATH strategy can still report it.
+        final Node pathA = lineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+        final Node genA = lineage.findNode("nifi_data", "Generate A", "ca71e4d9-2a4f-3970");
+        lineage.assertLink(genA, pathA);
+
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+        // Generate C and GetTwitter have reported proper SEND lineage to the input port.
+        final Node remoteInputPortD = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+        final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+        final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+        lineage.assertLink(pathC, remoteInputPortQ);
+        lineage.assertLink(pathT, remoteInputPortQ);
+        lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+        lineage.assertLink(remoteInputPortP, remoteInputPortD);
+
+        // nifi_data is created for each obscure input processor.
+        final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+        final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+        lineage.assertLink(genC, pathC);
+        lineage.assertLink(genT, pathT);
+    }
+
+    /**
+     * Reports the RAW S2S send scenario with LINEAGE_STRATEGY_COMPLETE_PATH and verifies that each complete
+     * path (processor plus Remote Input Port) links directly to the remote input port identified by the
+     * s2s.port.id attribute.
+     */
+    @Test
+    public void testS2SSendCompleteRAW() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SSendRAW");
+        tc.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+
+        testS2SSendRAW(tc);
+
+        final Lineage lineage = getLineage();
+
+        // With the complete-path strategy, a path's qualified name carries a hash suffix ("::<hash>@example").
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C, Remote Input Port",
+                "c439cdca-e989-3491-0000-000000000000::1605753423@example");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter, Remote Input Port",
+                "b775b657-5a5b-3708-0000-000000000000::3843156947@example");
+
+        // Generate C and GetTwitter have reported proper SEND lineage to the input port.
+        final Node remoteInputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+        lineage.assertLink(pathC, remoteInputPort);
+        lineage.assertLink(pathT, remoteInputPort);
+
+        // nifi_data is created for each obscure input processor.
+        final Node genC = lineage.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+        final Node genT = lineage.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+        lineage.assertLink(genC, pathC);
+        lineage.assertLink(genT, pathT);
+    }
+
     /**
      * A client NiFi gets FlowFiles from a remote NiFi.
      */
@@ -755,6 +873,50 @@ public class ITReportLineageToAtlas {
     }
 
     /**
+     * A client NiFi gets FlowFiles from a remote NiFi using RAW protocol.
+     * <p>
+     * A single RECEIVE event from the Remote Output Port carries the target port ID in the s2s.port.id
+     * attribute. The test verifies that the remote output port data set feeds the Remote Output Port path,
+     * which fans out through one queue to each of LogAttribute, PutFile and UpdateAttribute.
+     * </p>
+     */
+    @Test
+    public void testS2SGetRAW() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SGetRAW");
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        // The transit URI's port GUID differs from the Remote Output Port's own component ID,
+        // so the target port ID is supplied through the s2s.port.id attribute.
+        final SimpleProvenanceRecord receiveEvent = pr("7375f8f6-4604-468d", "Remote Output Port", RECEIVE,
+                "nifi://nifi.example.com:8081/7f1a5d65-65bb-4473-b1a4-4a742d9af4a7");
+        receiveEvent.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "392e7343-3950-329b-0000-000000000000");
+        prs.add(receiveEvent);
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example");
+        final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+        final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+        final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+        // These entities should be created by notification.
+        final Node remoteOutputPortDataSet = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+        final Node remoteOutputPortProcess = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+        final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+        final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+        final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+
+        lineage.assertLink(remoteOutputPortDataSet, remoteOutputPortProcess);
+
+        lineage.assertLink(flow, remoteOutputPortProcess);
+        lineage.assertLink(remoteOutputPortProcess, queueL);
+        lineage.assertLink(remoteOutputPortProcess, queueP);
+        lineage.assertLink(remoteOutputPortProcess, queueU);
+
+        lineage.assertLink(queueL, pathL);
+        lineage.assertLink(queueP, pathP);
+        lineage.assertLink(queueU, pathU);
+
+    }
+
+    /**
      * A remote NiFi transfers FlowFiles to remote client NiFis.
      * This NiFi instance owns RootProcessGroup output port.
      */
@@ -810,6 +972,60 @@ public class ITReportLineageToAtlas {
         lineage.assertLink(inputPort, path);
     }
 
+    /**
+     * A remote NiFi transfers FlowFiles to remote client NiFis using RAW protocol.
+     * This NiFi instance owns RootProcessGroup output port.
+     * <p>
+     * Unlike the client-side RAW tests, no s2s.port.id attribute is set here: the SEND event's component
+     * ID is the root-group Output Port itself, and the test expects that port to appear as the
+     * nifi_output_port data set fed by the GenerateFlowFile path.
+     * </p>
+     */
+    @Test
+    public void testS2STransferRAW() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2STransfer");
+
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("392e7343-3950-329b", "Output Port", SEND,
+                "nifi://nifi.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82"));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+        final Node path = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+        final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+        lineage.assertLink(flow, path);
+        lineage.assertLink(path, outputPort);
+    }
+
+    /**
+     * A remote NiFi receives FlowFiles from remote client NiFis using RAW protocol.
+     * This NiFi instance owns RootProcessGroup input port.
+     * <p>
+     * Unlike the client-side RAW tests, no s2s.port.id attribute is set here: the RECEIVE event's
+     * component ID is the root-group Input Port itself, and the test expects that port to appear as the
+     * nifi_input_port data set feeding the "input, UpdateAttribute" path.
+     * </p>
+     */
+    @Test
+    public void testS2SReceiveRAW() throws Exception {
+        final TestConfiguration tc = new TestConfiguration("S2SReceive");
+
+        final ProvenanceRecords prs = tc.provenanceRecords;
+        prs.add(pr("77919f59-533e-35a3", "Input Port", RECEIVE,
+                "nifi://nifi.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c"));
+
+        test(tc);
+
+        waitNotificationsGetDelivered();
+
+        final Lineage lineage = getLineage();
+
+        final Node flow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+        final Node path = lineage.findNode("nifi_flow_path", "input, UpdateAttribute", "77919f59-533e-35a3");
+        final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+
+        lineage.assertLink(flow, path);
+        lineage.assertLink(flow, inputPort);
+
+        lineage.assertLink(inputPort, path);
+    }
+
     @Test
     public void testS2SReceiveAndSendCombination() throws Exception {
         testS2SReceive();
@@ -874,6 +1090,70 @@ public class ITReportLineageToAtlas {
 
     }
 
+    /**
+     * Runs the RAW receive scenario (remote side) and then the RAW simple send scenario (client side),
+     * and verifies that the lineage reported by both connects: the client's Remote Input Port path links
+     * to the input port owned by the remote flow.
+     */
+    @Test
+    public void testS2SReceiveAndSendCombinationRAW() throws Exception {
+        testS2SReceiveRAW();
+        testS2SSendSimpleRAW();
+
+        final Lineage lineage = getLineage();
+
+        final Node remoteFlow = lineage.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+        final Node localFlow = lineage.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example");
+        final Node remoteInputPortQ = lineage.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+        final Node remoteInputPortP = lineage.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+        final Node inputPort = lineage.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+        final Node pathC = lineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+        final Node pathT = lineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+        // Remote flow owns the inputPort.
+        lineage.assertLink(remoteFlow, inputPort);
+
+        // These paths within the local flow send data to the remote flow through the remote input port.
+        lineage.assertLink(localFlow, pathC);
+        lineage.assertLink(localFlow, pathT);
+        lineage.assertLink(pathC, remoteInputPortQ);
+        lineage.assertLink(pathT, remoteInputPortQ);
+        lineage.assertLink(remoteInputPortQ, remoteInputPortP);
+        lineage.assertLink(remoteInputPortP, inputPort);
+
+    }
+
+    /**
+     * Runs the RAW transfer scenario (remote side) and then the RAW get scenario (client side), and
+     * verifies that the lineage reported by both connects: the output port owned by the remote flow feeds
+     * the client's Remote Output Port path and its downstream queues and paths.
+     */
+    @Test
+    public void testS2STransferAndGetCombinationRAW() throws Exception {
+        testS2STransferRAW();
+        testS2SGetRAW();
+
+        final Lineage lineage = getLineage();
+
+        final Node remoteFlow = lineage.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+        final Node localFlow = lineage.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example");
+        final Node remoteGen = lineage.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+        final Node outputPort = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+        final Node remoteOutputPortP = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+        final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+        final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+        final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+        final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+        final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+        final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+        // The remote flow owns the outputPort and transfers data generated by GenerateFlowFile.
+        lineage.assertLink(remoteFlow, remoteGen);
+        lineage.assertLink(remoteGen, outputPort);
+
+        // The Remote Output Port path in the local flow gets data from the remote output port.
+        lineage.assertLink(localFlow, remoteOutputPortP);
+        lineage.assertLink(outputPort, remoteOutputPortP);
+        lineage.assertLink(remoteOutputPortP, queueL);
+        lineage.assertLink(remoteOutputPortP, queueP);
+        lineage.assertLink(remoteOutputPortP, queueU);
+        lineage.assertLink(queueL, pathL);
+        lineage.assertLink(queueP, pathP);
+        lineage.assertLink(queueU, pathU);
+
+    }
+
     /**
      * A client NiFi gets FlowFiles from a remote output port and sends it to a remote input port without doing anything.
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml
new file mode 100644
index 0000000..20a759a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml
@@ -0,0 +1,443 @@
+<?xml version="1.0" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template encoding-version="1.1">
+  <description></description>
+  <groupId>27b7b6b8-015f-1000-0d31-197ae42bab34</groupId>
+  <name>S2SGetRAW</name>
+  <snippet>
+    <processGroups>
+      <id>9fc65d0a-ff54-3c07-0000-000000000000</id>
+      <parentGroupId>c81f8a46-4aa3-313e-0000-000000000000</parentGroupId>
+      <position>
+        <x>0.0</x>
+        <y>0.0</y>
+      </position>
+      <comments></comments>
+      <contents>
+        <connections>
+          <id>8812c40b-5c71-369f-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
+          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
+          <destination>
+            <groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
+            <id>bb530e58-ee14-3cac-0000-000000000000</id>
+            <type>PROCESSOR</type>
+          </destination>
+          <flowFileExpiration>0 sec</flowFileExpiration>
+          <labelIndex>1</labelIndex>
+          <name></name>
+          <source>
+            <groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
+            <id>7375f8f6-4604-468d-0000-000000000000</id>
+            <type>REMOTE_OUTPUT_PORT</type>
+          </source>
+          <zIndex>0</zIndex>
+        </connections>
+        <connections>
+          <id>9df33c4b-8c26-33e5-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
+          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
+          <destination>
+            <groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
+            <id>97cc5b27-22f3-3c3b-0000-000000000000</id>
+            <type>PROCESSOR</type>
+          </destination>
+          <flowFileExpiration>0 sec</flowFileExpiration>
+          <labelIndex>1</labelIndex>
+          <name></name>
+          <source>
+            <groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
+            <id>7375f8f6-4604-468d-0000-000000000000</id>
+            <type>REMOTE_OUTPUT_PORT</type>
+          </source>
+          <zIndex>0</zIndex>
+        </connections>
+        <connections>
+          <id>1ddd5163-7815-3117-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
+          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
+          <destination>
+            <groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
+            <id>4f3bfa4c-6427-3aac-0000-000000000000</id>
+            <type>PROCESSOR</type>
+          </destination>
+          <flowFileExpiration>0 sec</flowFileExpiration>
+          <labelIndex>1</labelIndex>
+          <name></name>
+          <source>
+            <groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
+            <id>7375f8f6-4604-468d-0000-000000000000</id>
+            <type>REMOTE_OUTPUT_PORT</type>
+          </source>
+          <zIndex>0</zIndex>
+        </connections>
+        <labels>
+          <id>12073df1-f38b-3cad-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <position>
+            <x>872.9999891005355</x>
+            <y>296.0000048267144</y>
+          </position>
+          <height>68.00000762939453</height>
+          <label>A FlowFile is passed to every downstream process paths.</label>
+          <style>
+            <entry>
+              <key>font-size</key>
+              <value>12px</value>
+            </entry>
+          </style>
+          <width>338.0</width>
+        </labels>
+        <processors>
+          <id>97cc5b27-22f3-3c3b-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <position>
+            <x>470.9999891005356</x>
+            <y>679.0000048267144</y>
+          </position>
+          <bundle>
+            <artifact>nifi-standard-nar</artifact>
+            <group>org.apache.nifi</group>
+            <version>1.5.0-SNAPSHOT</version>
+          </bundle>
+          <config>
+            <bulletinLevel>WARN</bulletinLevel>
+            <comments></comments>
+            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+            <descriptors>
+              <entry>
+                <key>Log Level</key>
+                <value>
+                  <name>Log Level</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Log Payload</key>
+                <value>
+                  <name>Log Payload</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Attributes to Log</key>
+                <value>
+                  <name>Attributes to Log</name>
+                </value>
+              </entry>
+              <entry>
+                <key>attributes-to-log-regex</key>
+                <value>
+                  <name>attributes-to-log-regex</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Attributes to Ignore</key>
+                <value>
+                  <name>Attributes to Ignore</name>
+                </value>
+              </entry>
+              <entry>
+                <key>attributes-to-ignore-regex</key>
+                <value>
+                  <name>attributes-to-ignore-regex</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Log prefix</key>
+                <value>
+                  <name>Log prefix</name>
+                </value>
+              </entry>
+              <entry>
+                <key>character-set</key>
+                <value>
+                  <name>character-set</name>
+                </value>
+              </entry>
+            </descriptors>
+            <executionNode>ALL</executionNode>
+            <lossTolerant>false</lossTolerant>
+            <penaltyDuration>30 sec</penaltyDuration>
+            <properties>
+              <entry>
+                <key>Log Level</key>
+                <value>info</value>
+              </entry>
+              <entry>
+                <key>Log Payload</key>
+                <value>false</value>
+              </entry>
+              <entry>
+                <key>Attributes to Log</key>
+              </entry>
+              <entry>
+                <key>attributes-to-log-regex</key>
+                <value>.*</value>
+              </entry>
+              <entry>
+                <key>Attributes to Ignore</key>
+              </entry>
+              <entry>
+                <key>attributes-to-ignore-regex</key>
+              </entry>
+              <entry>
+                <key>Log prefix</key>
+              </entry>
+              <entry>
+                <key>character-set</key>
+                <value>UTF-8</value>
+              </entry>
+            </properties>
+            <runDurationMillis>0</runDurationMillis>
+            <schedulingPeriod>0 sec</schedulingPeriod>
+            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+            <yieldDuration>1 sec</yieldDuration>
+          </config>
+          <name>LogAttribute</name>
+          <relationships>
+            <autoTerminate>false</autoTerminate>
+            <name>success</name>
+          </relationships>
+          <state>STOPPED</state>
+          <style></style>
+          <type>org.apache.nifi.processors.standard.LogAttribute</type>
+        </processors>
+        <processors>
+          <id>bb530e58-ee14-3cac-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <position>
+            <x>150.99998910053557</x>
+            <y>514.0000048267144</y>
+          </position>
+          <bundle>
+            <artifact>nifi-update-attribute-nar</artifact>
+            <group>org.apache.nifi</group>
+            <version>1.5.0-SNAPSHOT</version>
+          </bundle>
+          <config>
+            <bulletinLevel>WARN</bulletinLevel>
+            <comments></comments>
+            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+            <descriptors>
+              <entry>
+                <key>Delete Attributes Expression</key>
+                <value>
+                  <name>Delete Attributes Expression</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Store State</key>
+                <value>
+                  <name>Store State</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Stateful Variables Initial Value</key>
+                <value>
+                  <name>Stateful Variables Initial Value</name>
+                </value>
+              </entry>
+            </descriptors>
+            <executionNode>ALL</executionNode>
+            <lossTolerant>false</lossTolerant>
+            <penaltyDuration>30 sec</penaltyDuration>
+            <properties>
+              <entry>
+                <key>Delete Attributes Expression</key>
+              </entry>
+              <entry>
+                <key>Store State</key>
+                <value>Do not store state</value>
+              </entry>
+              <entry>
+                <key>Stateful Variables Initial Value</key>
+              </entry>
+            </properties>
+            <runDurationMillis>0</runDurationMillis>
+            <schedulingPeriod>0 sec</schedulingPeriod>
+            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+            <yieldDuration>1 sec</yieldDuration>
+          </config>
+          <name>UpdateAttribute</name>
+          <relationships>
+            <autoTerminate>false</autoTerminate>
+            <name>success</name>
+          </relationships>
+          <state>STOPPED</state>
+          <style></style>
+          <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
+        </processors>
+        <processors>
+          <id>4f3bfa4c-6427-3aac-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <position>
+            <x>810.9999891005356</x>
+            <y>524.0000048267144</y>
+          </position>
+          <bundle>
+            <artifact>nifi-standard-nar</artifact>
+            <group>org.apache.nifi</group>
+            <version>1.5.0-SNAPSHOT</version>
+          </bundle>
+          <config>
+            <bulletinLevel>WARN</bulletinLevel>
+            <comments></comments>
+            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+            <descriptors>
+              <entry>
+                <key>Directory</key>
+                <value>
+                  <name>Directory</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Conflict Resolution Strategy</key>
+                <value>
+                  <name>Conflict Resolution Strategy</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Create Missing Directories</key>
+                <value>
+                  <name>Create Missing Directories</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Maximum File Count</key>
+                <value>
+                  <name>Maximum File Count</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Last Modified Time</key>
+                <value>
+                  <name>Last Modified Time</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Permissions</key>
+                <value>
+                  <name>Permissions</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Owner</key>
+                <value>
+                  <name>Owner</name>
+                </value>
+              </entry>
+              <entry>
+                <key>Group</key>
+                <value>
+                  <name>Group</name>
+                </value>
+              </entry>
+            </descriptors>
+            <executionNode>ALL</executionNode>
+            <lossTolerant>false</lossTolerant>
+            <penaltyDuration>30 sec</penaltyDuration>
+            <properties>
+              <entry>
+                <key>Directory</key>
+              </entry>
+              <entry>
+                <key>Conflict Resolution Strategy</key>
+                <value>fail</value>
+              </entry>
+              <entry>
+                <key>Create Missing Directories</key>
+                <value>true</value>
+              </entry>
+              <entry>
+                <key>Maximum File Count</key>
+              </entry>
+              <entry>
+                <key>Last Modified Time</key>
+              </entry>
+              <entry>
+                <key>Permissions</key>
+              </entry>
+              <entry>
+                <key>Owner</key>
+              </entry>
+              <entry>
+                <key>Group</key>
+              </entry>
+            </properties>
+            <runDurationMillis>0</runDurationMillis>
+            <schedulingPeriod>0 sec</schedulingPeriod>
+            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+            <yieldDuration>1 sec</yieldDuration>
+          </config>
+          <name>PutFile</name>
+          <relationships>
+            <autoTerminate>false</autoTerminate>
+            <name>failure</name>
+          </relationships>
+          <relationships>
+            <autoTerminate>false</autoTerminate>
+            <name>success</name>
+          </relationships>
+          <state>STOPPED</state>
+          <style></style>
+          <type>org.apache.nifi.processors.standard.PutFile</type>
+        </processors>
+        <remoteProcessGroups>
+          <id>c6acb687-616a-3d36-0000-000000000000</id>
+          <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+          <position>
+            <x>451.4000360293711</x>
+            <y>231.5999721410119</y>
+          </position>
+          <communicationsTimeout>30 sec</communicationsTimeout>
+          <contents>
+            <inputPorts>
+              <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+              <connected>false</connected>
+              <exists>true</exists>
+              <id>015f101e-dcd7-17bd-8899-1a723733521a</id>
+              <name>input</name>
+              <targetRunning>true</targetRunning>
+              <transmitting>false</transmitting>
+              <useCompression>false</useCompression>
+            </inputPorts>
+            <outputPorts>
+              <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+              <connected>true</connected>
+              <exists>true</exists>
+              <id>7375f8f6-4604-468d-0000-000000000000</id>
+              <targetId>392e7343-3950-329b-0000-000000000000</targetId>
+              <name>output</name>
+              <targetRunning>true</targetRunning>
+              <transmitting>true</transmitting>
+              <useCompression>false</useCompression>
+            </outputPorts>
+          </contents>
+          <proxyHost></proxyHost>
+          <proxyUser></proxyUser>
+          <targetUri>http://localhost:8080/nifi</targetUri>
+          <targetUris>http://localhost:8080/nifi</targetUris>
+          <transportProtocol>RAW</transportProtocol>
+          <yieldDuration>10 sec</yieldDuration>
+        </remoteProcessGroups>
+      </contents>
+      <name>S2SGetRAW</name>
+    </processGroups>
+  </snippet>
+  <timestamp>10/20/2017 13:03:49 JST</timestamp>
+</template>
\ No newline at end of file