You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/07/10 15:48:46 UTC
[2/2] nifi git commit: NIFI-4654: Support reporting RAW S2S lineage
to Atlas
NIFI-4654: Support reporting RAW S2S lineage to Atlas
- Added 's2s.port.id' FlowFile attribute to track target remote Port id
- Use 's2s.port.id' to analyze RAW S2S provenance events
- This closes #2863
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e94f0757
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e94f0757
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e94f0757
Branch: refs/heads/master
Commit: e94f0757db617c649ae054d7c914feade9e1185e
Parents: b279624
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Jul 9 11:28:17 2018 +0900
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jul 10 11:44:00 2018 -0400
----------------------------------------------------------------------
.../attributes/SiteToSiteAttributes.java | 4 +-
.../provenance/analyzer/NiFiRemotePort.java | 17 +-
.../provenance/analyzer/NiFiRootGroupPort.java | 9 +-
.../nifi/atlas/provenance/analyzer/NiFiS2S.java | 35 +-
.../additionalDetails.html | 5 +-
.../provenance/analyzer/TestNiFiRemotePort.java | 85 +-
.../analyzer/TestNiFiRootGroupPort.java | 203 +++++
.../atlas/reporting/ITReportLineageToAtlas.java | 280 +++++++
.../test/resources/flow-templates/S2SGetRAW.xml | 443 ++++++++++
.../resources/flow-templates/S2SSendRAW.xml | 822 +++++++++++++++++++
.../nifi/remote/StandardRemoteGroupPort.java | 2 +
.../remote/TestStandardRemoteGroupPort.java | 4 +
12 files changed, 1882 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
index ed6437e..7af0f0b 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/flowfile/attributes/SiteToSiteAttributes.java
@@ -23,7 +23,9 @@ public enum SiteToSiteAttributes implements FlowFileAttributeKey {
S2S_HOST("s2s.host"),
- S2S_ADDRESS("s2s.address");
+ S2S_ADDRESS("s2s.address"),
+
+ S2S_PORT_ID("s2s.port.id");
private final String key;
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
index e2118f7..69b2d47 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
@@ -20,6 +20,7 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.slf4j.Logger;
@@ -53,14 +54,13 @@ public class NiFiRemotePort extends NiFiS2S {
final boolean isRemoteInputPort = event.getComponentType().equals("Remote Input Port");
final String type = isRemoteInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
- final String remotePortId = event.getComponentId();
-
- final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());
+ final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
// Find connections that connects to/from the remote port.
+ final String componentId = event.getComponentId();
final List<ConnectionStatus> connections = isRemoteInputPort
- ? context.findConnectionTo(remotePortId)
- : context.findConnectionFrom(remotePortId);
+ ? context.findConnectionTo(componentId)
+ : context.findConnectionFrom(componentId);
if (connections == null || connections.isEmpty()) {
logger.warn("Connection was not found: {}", new Object[]{event});
return null;
@@ -70,7 +70,7 @@ public class NiFiRemotePort extends NiFiS2S {
final ConnectionStatus connection = connections.get(0);
final Referenceable ref = new Referenceable(type);
ref.set(ATTR_NAME, isRemoteInputPort ? connection.getDestinationName() : connection.getSourceName());
- ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, s2sUrl.targetPortId));
+ ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, s2SPort.targetPortId));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
}
@@ -79,4 +79,9 @@ public class NiFiRemotePort extends NiFiS2S {
public String targetComponentTypePattern() {
return "^Remote (In|Out)put Port$";
}
+
+ @Override
+ protected String getRawProtocolPortId(ProvenanceEventRecord event) {
+ return event.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key());
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
index 4f66025..e791c62 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
@@ -54,7 +54,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
final String type = isInputPort ? TYPE_NIFI_INPUT_PORT : TYPE_NIFI_OUTPUT_PORT;
final String rootPortId = event.getComponentId();
- final S2STransitUrl s2sUrl = parseTransitURL(event.getTransitUri(), context.getClusterResolver());
+ final S2SPort s2SPort = analyzeS2SPort(event, context.getClusterResolver());
// Find connections connecting to/from the remote port.
final List<ConnectionStatus> connections = isInputPort
@@ -69,7 +69,7 @@ public class NiFiRootGroupPort extends NiFiS2S {
final ConnectionStatus connection = connections.get(0);
final Referenceable ref = new Referenceable(type);
ref.set(ATTR_NAME, isInputPort ? connection.getSourceName() : connection.getDestinationName());
- ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2sUrl.clusterName, rootPortId));
+ ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(s2SPort.clusterName, rootPortId));
return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
}
@@ -78,4 +78,9 @@ public class NiFiRootGroupPort extends NiFiS2S {
public String targetComponentTypePattern() {
return "^(In|Out)put Port$";
}
+
+ @Override
+ protected String getRawProtocolPortId(ProvenanceEventRecord event) {
+ return event.getComponentId();
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
index 762a1aa..d205b3e 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiS2S.java
@@ -18,10 +18,10 @@ package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -29,27 +29,30 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
private static final Logger logger = LoggerFactory.getLogger(NiFiS2S.class);
- private static final Pattern RAW_URL_REGEX = Pattern.compile("([0-9a-zA-Z\\-]+)");
+ private static final Pattern RAW_URL_REGEX = Pattern.compile("nifi://([^:/]+):\\d+/([0-9a-zA-Z\\-]+)");
private static final Pattern HTTP_URL_REGEX = Pattern.compile(".*/nifi-api/data-transfer/(in|out)put-ports/([[0-9a-zA-Z\\-]]+)/transactions/.*");
- protected S2STransitUrl parseTransitURL(String transitUri, ClusterResolver clusterResolver) {
- final URL url = parseUrl(transitUri);
+ protected S2SPort analyzeS2SPort(ProvenanceEventRecord event, ClusterResolver clusterResolver) {
+ final String transitUri = event.getTransitUri();
+ final int protocolIndex = transitUri.indexOf(':');
+ final String protocol = transitUri.substring(0, protocolIndex).toLowerCase();
- final String clusterName = clusterResolver.fromHostNames(url.getHost());
+ final String targetHostname;
final String targetPortId;
- final String protocol = url.getProtocol().toLowerCase();
switch (protocol) {
case "http":
case "https": {
- final Matcher uriMatcher = matchUrl(url, HTTP_URL_REGEX);
+ final Matcher uriMatcher = matchUrl(transitUri, HTTP_URL_REGEX);
+ targetHostname = parseUri(transitUri).getHost();
targetPortId = uriMatcher.group(2);
}
break;
case "nifi": {
- final Matcher uriMatcher = matchUrl(url, RAW_URL_REGEX);
- targetPortId = uriMatcher.group(1);
+ final Matcher uriMatcher = matchUrl(transitUri, RAW_URL_REGEX);
+ targetHostname = uriMatcher.group(1);
+ targetPortId = getRawProtocolPortId(event);
}
break;
@@ -58,23 +61,25 @@ public abstract class NiFiS2S extends AbstractNiFiProvenanceEventAnalyzer {
}
- return new S2STransitUrl(clusterName, targetPortId);
-
+ final String clusterName = clusterResolver.fromHostNames(targetHostname);
+ return new S2SPort(clusterName, targetPortId);
}
- private Matcher matchUrl(URL url, Pattern pattern) {
- final Matcher uriMatcher = pattern.matcher(url.getPath());
+ abstract protected String getRawProtocolPortId(ProvenanceEventRecord event);
+
+ private Matcher matchUrl(String url, Pattern pattern) {
+ final Matcher uriMatcher = pattern.matcher(url);
if (!uriMatcher.matches()) {
throw new IllegalArgumentException("Unexpected transit URI: " + url);
}
return uriMatcher;
}
- protected static class S2STransitUrl {
+ protected static class S2SPort {
final String clusterName;
final String targetPortId;
- public S2STransitUrl(String clusterName, String targetPortId) {
+ public S2SPort(String clusterName, String targetPortId) {
this.clusterName = clusterName;
this.targetPortId = targetPortId;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
index cd82079..b94a915 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
@@ -329,7 +329,7 @@ Processor 3</pre>
</td>
<td>rootGroupPortGUID@clusterName
(e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
- <td rowspan="4"><strong>NOTE:</strong>Only HTTP S2S protocol is supported. RAW support may be added in the future as it needs NiFi code modification. See <a href="https://issues.apache.org/jira/browse/NIFI-4654">NIFI-4654</a> for detail.</td>
+ <td></td>
</tr>
<tr>
<td>
@@ -345,6 +345,7 @@ upstream (nifi_flow_path)
</td>
<td>remoteInputPortGUID@clusterName<br/>(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@cl1)
<p>NOTE: The remoteInputPortGUID is the client side component ID and different from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.</p></td>
+ <td></td>
</tr>
<tr>
<td rowspan="2">
@@ -364,6 +365,7 @@ upstream (nifi_flow_path)
</td>
<td>rootGroupPortGUID@clusterName
(e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@cl1)</td>
+ <td></td>
</tr>
<tr>
<td>
@@ -385,6 +387,7 @@ remote target port
<li>For 'nifi_queue': downstreamPathGUID@clusterName<br/>(e.g. bb530e58-ee14-3cac-144c-a8d71255027d@cl1)</li>
</ul>
</td>
+ <td></td>
</tr>
<tr>
<td>NiFiRootGroupPort</td>
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
index 3040d50..b0a6654 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
@@ -24,6 +24,7 @@ import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
import org.apache.nifi.atlas.resolver.ClusterResolvers;
import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.junit.Test;
@@ -49,7 +50,7 @@ import static org.mockito.Mockito.when;
public class TestNiFiRemotePort {
@Test
- public void testRemoteInputPort() {
+ public void testRemoteInputPortHTTP() {
final String componentType = "Remote Input Port";
final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";
final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
@@ -89,7 +90,7 @@ public class TestNiFiRemotePort {
}
@Test
- public void testRemoteOutputPort() {
+ public void testRemoteOutputPortHTTP() {
final String componentType = "Remote Output Port";
final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
@@ -123,5 +124,85 @@ public class TestNiFiRemotePort {
assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}
+ @Test
+ public void testRemoteInputPortRAW() {
+ final String componentType = "Remote Input Port";
+ // The UUID in a Transit Uri is a FlowFile UUID
+ final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82";
+ final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
+ when(sendEvent.getEventId()).thenReturn(123L);
+ // Component Id is an UUID of the RemoteGroupPort instance acting as a S2S client.
+ when(sendEvent.getComponentId()).thenReturn("s2s-client-component-guid");
+ when(sendEvent.getComponentType()).thenReturn(componentType);
+ when(sendEvent.getTransitUri()).thenReturn(transitUri);
+ when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
+ when(sendEvent.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
+
+ final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+ when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+ final List<ConnectionStatus> connections = new ArrayList<>();
+ final ConnectionStatus connection = new ConnectionStatus();
+ connection.setDestinationId("s2s-client-component-guid");
+ connection.setDestinationName("inputPortA");
+ connections.add(connection);
+
+ final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+ when(context.getClusterResolver()).thenReturn(clusterResolvers);
+ when(context.findConnectionTo(matches("s2s-client-component-guid"))).thenReturn(connections);
+
+ final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
+ assertNotNull(analyzer);
+
+ final DataSetRefs refs = analyzer.analyze(context, sendEvent);
+ assertEquals(0, refs.getInputs().size());
+ assertEquals(1, refs.getOutputs().size());
+ assertEquals(1, refs.getComponentIds().size());
+ // Should report connected componentId.
+ assertTrue(refs.getComponentIds().contains("s2s-client-component-guid"));
+
+ Referenceable ref = refs.getOutputs().iterator().next();
+ assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
+ assertEquals("inputPortA", ref.get(ATTR_NAME));
+ assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+ }
+
+ @Test
+ public void testRemoteOutputPortRAW() {
+ final String componentType = "Remote Output Port";
+ // The UUID in a Transit Uri is a FlowFile UUID
+ final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c";
+ final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+ // Component Id is an UUID of the RemoteGroupPort instance acting as a S2S client.
+ when(record.getComponentId()).thenReturn("s2s-client-component-guid");
+ when(record.getComponentType()).thenReturn(componentType);
+ when(record.getTransitUri()).thenReturn(transitUri);
+ when(record.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+ when(record.getAttribute(SiteToSiteAttributes.S2S_PORT_ID.key())).thenReturn("remote-port-guid");
+
+ final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+ when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+ final List<ConnectionStatus> connections = new ArrayList<>();
+ final ConnectionStatus connection = new ConnectionStatus();
+ connection.setSourceId("s2s-client-component-guid");
+ connection.setSourceName("outputPortA");
+ connections.add(connection);
+
+ final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+ when(context.getClusterResolver()).thenReturn(clusterResolvers);
+ when(context.findConnectionFrom(matches("s2s-client-component-guid"))).thenReturn(connections);
+
+ final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, record.getEventType());
+ assertNotNull(analyzer);
+
+ final DataSetRefs refs = analyzer.analyze(context, record);
+ assertEquals(1, refs.getInputs().size());
+ assertEquals(0, refs.getOutputs().size());
+ Referenceable ref = refs.getInputs().iterator().next();
+ assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
+ assertEquals("outputPortA", ref.get(ATTR_NAME));
+ assertEquals("remote-port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
new file mode 100644
index 0000000..61d59da
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.reporting.ITReportLineageToAtlas;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link NiFiRootGroupPort}, covering root group Input/Output Port events over HTTP and RAW S2S.
+ * More complex and detailed tests are available at {@link ITReportLineageToAtlas}.
+ */
+public class TestNiFiRootGroupPort {
+
+ @Test
+ public void testInputPortHTTP() {
+ final String componentType = "Input Port";
+ final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/input-ports/port-guid/transactions/tx-guid/flow-files";
+ final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class);
+ when(receiveEvent.getEventId()).thenReturn(123L);
+ when(receiveEvent.getComponentId()).thenReturn("port-guid");
+ when(receiveEvent.getComponentType()).thenReturn(componentType);
+ when(receiveEvent.getTransitUri()).thenReturn(transitUri);
+ when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+
+ final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+ when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+ final List<ConnectionStatus> connections = new ArrayList<>();
+ final ConnectionStatus connection = new ConnectionStatus();
+ connection.setSourceId("port-guid");
+ connection.setSourceName("inputPortA");
+ connections.add(connection);
+
+ final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+ when(context.getClusterResolver()).thenReturn(clusterResolvers);
+ when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
+
+ final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
+ assertNotNull(analyzer);
+
+ final DataSetRefs refs = analyzer.analyze(context, receiveEvent);
+ assertEquals(1, refs.getInputs().size());
+ assertEquals(0, refs.getOutputs().size());
+ assertEquals(1, refs.getComponentIds().size());
+ // Should report connected componentId.
+ assertTrue(refs.getComponentIds().contains("port-guid"));
+
+ Referenceable ref = refs.getInputs().iterator().next();
+ assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
+ assertEquals("inputPortA", ref.get(ATTR_NAME));
+ assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+ }
+
+ @Test
+ public void testOutputPortHTTP() {
+ final String componentType = "Output Port";
+ final String transitUri = "http://0.example.com:8080/nifi-api/data-transfer/output-ports/port-guid/transactions/tx-guid/flow-files";
+ final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
+ when(sendEvent.getComponentId()).thenReturn("port-guid");
+ when(sendEvent.getComponentType()).thenReturn(componentType);
+ when(sendEvent.getTransitUri()).thenReturn(transitUri);
+ when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+ final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+ when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+ final List<ConnectionStatus> connections = new ArrayList<>();
+ final ConnectionStatus connection = new ConnectionStatus();
+ connection.setDestinationId("port-guid");
+ connection.setDestinationName("outputPortA");
+ connections.add(connection);
+
+ final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+ when(context.getClusterResolver()).thenReturn(clusterResolvers);
+ when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
+
+ final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
+ assertNotNull(analyzer);
+
+ final DataSetRefs refs = analyzer.analyze(context, sendEvent);
+ assertEquals(0, refs.getInputs().size());
+ assertEquals(1, refs.getOutputs().size());
+ Referenceable ref = refs.getOutputs().iterator().next();
+ assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
+ assertEquals("outputPortA", ref.get(ATTR_NAME));
+ assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+ }
+
+ @Test
+ public void testInputPortRAW() {
+ final String componentType = "Input Port";
+ // With RAW, the UUID in the transit URI is a FlowFile UUID, not the port id
+ final String transitUri = "nifi://0.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82";
+ final ProvenanceEventRecord receiveEvent = Mockito.mock(ProvenanceEventRecord.class);
+ when(receiveEvent.getEventId()).thenReturn(123L);
+ when(receiveEvent.getComponentId()).thenReturn("port-guid");
+ when(receiveEvent.getComponentType()).thenReturn(componentType);
+ when(receiveEvent.getTransitUri()).thenReturn(transitUri);
+ when(receiveEvent.getEventType()).thenReturn(ProvenanceEventType.RECEIVE);
+
+ final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+ when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+ final List<ConnectionStatus> connections = new ArrayList<>();
+ final ConnectionStatus connection = new ConnectionStatus();
+ connection.setSourceId("port-guid");
+ connection.setSourceName("inputPortA");
+ connections.add(connection);
+
+ final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+ when(context.getClusterResolver()).thenReturn(clusterResolvers);
+ when(context.findConnectionFrom(matches("port-guid"))).thenReturn(connections);
+
+ final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, receiveEvent.getEventType());
+ assertNotNull(analyzer);
+
+ final DataSetRefs refs = analyzer.analyze(context, receiveEvent);
+ assertEquals(1, refs.getInputs().size());
+ assertEquals(0, refs.getOutputs().size());
+ assertEquals(1, refs.getComponentIds().size());
+ // Should report connected componentId.
+ assertTrue(refs.getComponentIds().contains("port-guid"));
+
+ Referenceable ref = refs.getInputs().iterator().next();
+ assertEquals(TYPE_NIFI_INPUT_PORT, ref.getTypeName());
+ assertEquals("inputPortA", ref.get(ATTR_NAME));
+ assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+ }
+
+ @Test
+ public void testOutputPortRAW() {
+ final String componentType = "Output Port";
+ // With RAW, the UUID in the transit URI is a FlowFile UUID, not the port id
+ final String transitUri = "nifi://0.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c";
+ final ProvenanceEventRecord sendEvent = Mockito.mock(ProvenanceEventRecord.class);
+ when(sendEvent.getComponentId()).thenReturn("port-guid");
+ when(sendEvent.getComponentType()).thenReturn(componentType);
+ when(sendEvent.getTransitUri()).thenReturn(transitUri);
+ when(sendEvent.getEventType()).thenReturn(ProvenanceEventType.SEND);
+
+ final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+ when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+
+ final List<ConnectionStatus> connections = new ArrayList<>();
+ final ConnectionStatus connection = new ConnectionStatus();
+ connection.setDestinationId("port-guid");
+ connection.setDestinationName("outputPortA");
+ connections.add(connection);
+
+ final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+ when(context.getClusterResolver()).thenReturn(clusterResolvers);
+ when(context.findConnectionTo(matches("port-guid"))).thenReturn(connections);
+
+ final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(componentType, transitUri, sendEvent.getEventType());
+ assertNotNull(analyzer);
+
+ final DataSetRefs refs = analyzer.analyze(context, sendEvent);
+ assertEquals(0, refs.getInputs().size());
+ assertEquals(1, refs.getOutputs().size());
+ Referenceable ref = refs.getOutputs().iterator().next();
+ assertEquals(TYPE_NIFI_OUTPUT_PORT, ref.getTypeName());
+ assertEquals("outputPortA", ref.get(ATTR_NAME));
+ assertEquals("port-guid@cluster1", ref.get(ATTR_QUALIFIED_NAME));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
index b16e088..370025b 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -26,6 +26,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.lineage.ComputeLineageResult;
@@ -652,6 +653,64 @@ public class ITReportLineageToAtlas {
}
+ /**
+ * Simulates a client NiFi that sends FlowFiles to a remote NiFi using the RAW Site-to-Site protocol.
+ */
+ private void testS2SSendRAW(TestConfiguration tc) throws Exception {
+ final ProvenanceRecords records = tc.provenanceRecords;
+ records.add(pr("ca71e4d9-2a4f-3970", "Generate A", CREATE)); // #0
+ records.add(pr("c439cdca-e989-3491", "Generate C", CREATE)); // #1
+ records.add(pr("b775b657-5a5b-3708", "GetTwitter", CREATE)); // #2
+
+ // S2S_PORT_ID holds the id of the target port on the remote NiFi, which differs from the local Remote Input Port id.
+ final SimpleProvenanceRecord sendFromC = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+ "nifi://nifi.example.com:8081/d668805a-0ad0-44d1-aa65-ac362bf06e10");
+ sendFromC.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000");
+ records.add(sendFromC); // #3
+
+ final SimpleProvenanceRecord sendFromTwitter = pr("f31a6b53-3077-4c59", "Remote Input Port", SEND,
+ "nifi://nifi.example.com:8081/d4ec2459-d903-4a73-a09e-853f9997d3fb");
+ sendFromTwitter.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "77919f59-533e-35a3-0000-000000000000");
+ records.add(sendFromTwitter); // #4
+
+ records.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // #5: Generate C's FlowFile
+ records.add(pr("f31a6b53-3077-4c59", "Remote Input Port", DROP)); // #6: GetTwitter's FlowFile
+
+ // Generate C created a FlowFile (#1), which was sent via S2S (#3) and then dropped (#5).
+ tc.addLineage(createLineage(records, 1, 3, 5));
+ // GetTwitter created a FlowFile (#2), which was sent via S2S (#4) and then dropped (#6).
+ tc.addLineage(createLineage(records, 2, 4, 6));
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage atlasLineage = getLineage();
+
+ final Node flowNode = atlasLineage.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example");
+ final Node generateAPath = atlasLineage.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+ final Node generateBPath = atlasLineage.findNode("nifi_flow_path", "Generate B", "333255b6-eb02-3056");
+ final Node generateCPath = atlasLineage.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+ final Node twitterPath = atlasLineage.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+ final Node inactivePath = atlasLineage.findNode("nifi_flow_path", "InactiveProcessor", "7033f311-ac68-3cab");
+ // UpdateAttribute has several incoming paths, so a queue is generated to receive them.
+ final Node updateQueue = atlasLineage.findNode("nifi_queue", "queue", "c5392447-e9f1-33ad");
+ final Node updatePath = atlasLineage.findNode("nifi_flow_path", "UpdateAttribute", "c5392447-e9f1-33ad");
+
+ // Each of these paths starts directly from the flow.
+ atlasLineage.assertLink(flowNode, generateAPath);
+ atlasLineage.assertLink(flowNode, generateBPath);
+ atlasLineage.assertLink(flowNode, generateCPath);
+ atlasLineage.assertLink(flowNode, twitterPath);
+ atlasLineage.assertLink(flowNode, inactivePath);
+
+ // Generate B and Generate C both feed the same UpdateAttribute path through its queue.
+ atlasLineage.assertLink(generateBPath, updateQueue);
+ atlasLineage.assertLink(generateCPath, updateQueue);
+ atlasLineage.assertLink(updateQueue, updatePath);
+
+ }
+
@Test
public void testS2SSendSimple() throws Exception {
final TestConfiguration tc = new TestConfiguration("S2SSend");
@@ -711,6 +770,65 @@ public class ITReportLineageToAtlas {
lineage.assertLink(genT, pathT);
}
+ @Test
+ public void testS2SSendSimpleRAW() throws Exception {
+ final TestConfiguration config = new TestConfiguration("S2SSendRAW");
+ testS2SSendRAW(config);
+ final Lineage result = getLineage();
+
+ // The FlowFile created by Generate A has not been finished by a DROP event,
+ // but the SIMPLE_PATH strategy can still report it.
+ final Node generateAPath = result.findNode("nifi_flow_path", "Generate A", "ca71e4d9-2a4f-3970");
+ final Node generateAData = result.findNode("nifi_data", "Generate A", "ca71e4d9-2a4f-3970");
+ result.assertLink(generateAData, generateAPath);
+
+ final Node generateCPath = result.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+ final Node twitterPath = result.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+ // Generate C and GetTwitter report proper SEND lineage to the remote input port,
+ // passing through the Remote Input Port's queue and flow path.
+ final Node targetInputPort = result.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+ final Node remotePortPath = result.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+ final Node remotePortQueue = result.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+ result.assertLink(generateCPath, remotePortQueue);
+ result.assertLink(twitterPath, remotePortQueue);
+ result.assertLink(remotePortQueue, remotePortPath);
+ result.assertLink(remotePortPath, targetInputPort);
+
+ // A nifi_data entity is created for each obscure input processor.
+ final Node generateCData = result.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+ final Node twitterData = result.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+ result.assertLink(generateCData, generateCPath);
+ result.assertLink(twitterData, twitterPath);
+ }
+
+ @Test
+ public void testS2SSendCompleteRAW() throws Exception {
+ final TestConfiguration config = new TestConfiguration("S2SSendRAW");
+ config.properties.put(NIFI_LINEAGE_STRATEGY, LINEAGE_STRATEGY_COMPLETE_PATH.getValue());
+
+ testS2SSendRAW(config);
+ final Lineage result = getLineage();
+
+ // With COMPLETE_PATH, a single flow path covers both the source processor and the Remote Input Port,
+ // and its qualified name includes a hash.
+ final Node generateCToPort = result.findNode("nifi_flow_path", "Generate C, Remote Input Port",
+ "c439cdca-e989-3491-0000-000000000000::1605753423@example");
+ final Node twitterToPort = result.findNode("nifi_flow_path", "GetTwitter, Remote Input Port",
+ "b775b657-5a5b-3708-0000-000000000000::3843156947@example");
+
+ // Both complete paths report SEND lineage directly to the remote input port.
+ final Node targetInputPort = result.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+ result.assertLink(generateCToPort, targetInputPort);
+ result.assertLink(twitterToPort, targetInputPort);
+
+ // A nifi_data entity is created for each obscure input processor.
+ final Node generateCData = result.findNode("nifi_data", "Generate C", "c439cdca-e989-3491");
+ final Node twitterData = result.findNode("nifi_data", "GetTwitter", "b775b657-5a5b-3708");
+ result.assertLink(generateCData, generateCToPort);
+ result.assertLink(twitterData, twitterToPort);
+ }
+
/**
* A client NiFi gets FlowFiles from a remote NiFi.
*/
@@ -755,6 +873,50 @@ public class ITReportLineageToAtlas {
}
/**
+ * A client NiFi gets FlowFiles from a remote NiFi using RAW protocol.
+ */
+ @Test
+ public void testS2SGetRAW() throws Exception {
+ final TestConfiguration tc = new TestConfiguration("S2SGetRAW");
+ final ProvenanceRecords prs = tc.provenanceRecords;
+ // The remote port GUID is different than the Remote Output Ports.
+ final SimpleProvenanceRecord receiveEvent = pr("7375f8f6-4604-468d", "Remote Output Port", RECEIVE,
+ "nifi://nifi.example.com:8081/7f1a5d65-65bb-4473-b1a4-4a742d9af4a7");
+ receiveEvent.getAttributes().put(SiteToSiteAttributes.S2S_PORT_ID.key(), "392e7343-3950-329b-0000-000000000000");
+ prs.add(receiveEvent);
+
+ test(tc);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage lineage = getLineage();
+
+ final Node flow = lineage.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example");
+ final Node pathL = lineage.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+ final Node pathP = lineage.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+ final Node pathU = lineage.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+ // These entities should be created by notification.
+ final Node remoteOutputPortDataSet = lineage.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+ final Node remoteOutputPortProcess = lineage.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+ final Node queueL = lineage.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+ final Node queueP = lineage.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+ final Node queueU = lineage.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+
+ lineage.assertLink(remoteOutputPortDataSet, remoteOutputPortProcess);
+
+ lineage.assertLink(flow, remoteOutputPortProcess);
+ lineage.assertLink(remoteOutputPortProcess, queueL);
+ lineage.assertLink(remoteOutputPortProcess, queueP);
+ lineage.assertLink(remoteOutputPortProcess, queueU);
+
+ lineage.assertLink(queueL, pathL);
+ lineage.assertLink(queueP, pathP);
+ lineage.assertLink(queueU, pathU);
+
+ }
+
+ /**
* A remote NiFi transfers FlowFiles to remote client NiFis.
* This NiFi instance owns RootProcessGroup output port.
*/
@@ -810,6 +972,60 @@ public class ITReportLineageToAtlas {
lineage.assertLink(inputPort, path);
}
+ /**
+ * A remote NiFi transfers FlowFiles to client NiFis over the RAW Site-to-Site protocol.
+ * This NiFi instance owns the root process group's output port.
+ */
+ @Test
+ public void testS2STransferRAW() throws Exception {
+ final TestConfiguration config = new TestConfiguration("S2STransfer");
+ // SEND event reported by the root group output port.
+ final SimpleProvenanceRecord sendEvent = pr("392e7343-3950-329b", "Output Port", SEND,
+ "nifi://nifi.example.com:8081/580b7989-a80b-4089-b25b-3f5e0103af82");
+ config.provenanceRecords.add(sendEvent);
+
+ test(config);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage result = getLineage();
+
+ final Node flowNode = result.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+ final Node generatePath = result.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+ final Node rootOutputPort = result.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+ result.assertLink(flowNode, generatePath);
+ result.assertLink(generatePath, rootOutputPort);
+ }
+
+ /**
+ * A remote NiFi receives FlowFiles sent by client NiFis over the RAW Site-to-Site protocol.
+ * This NiFi instance owns the root process group's input port.
+ */
+ @Test
+ public void testS2SReceiveRAW() throws Exception {
+ final TestConfiguration config = new TestConfiguration("S2SReceive");
+ // RECEIVE event reported by the root group input port.
+ final SimpleProvenanceRecord receiveEvent = pr("77919f59-533e-35a3", "Input Port", RECEIVE,
+ "nifi://nifi.example.com:8081/232018cc-a147-40c6-b148-21f9f814e93c");
+ config.provenanceRecords.add(receiveEvent);
+
+ test(config);
+
+ waitNotificationsGetDelivered();
+
+ final Lineage result = getLineage();
+
+ final Node flowNode = result.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+ final Node rootInputPort = result.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+ final Node portPath = result.findNode("nifi_flow_path", "input, UpdateAttribute", "77919f59-533e-35a3");
+
+ // The flow owns both the input port and the path it feeds.
+ result.assertLink(flowNode, portPath);
+ result.assertLink(flowNode, rootInputPort);
+ result.assertLink(rootInputPort, portPath);
+ }
+
@Test
public void testS2SReceiveAndSendCombination() throws Exception {
testS2SReceive();
@@ -874,6 +1090,70 @@ public class ITReportLineageToAtlas {
}
+ @Test
+ public void testS2SReceiveAndSendCombinationRAW() throws Exception {
+ // Report the receiving (remote) flow first, then the sending (local) flow.
+ testS2SReceiveRAW();
+ testS2SSendSimpleRAW();
+
+ final Lineage result = getLineage();
+
+ final Node receiverFlow = result.findNode("nifi_flow", "S2SReceive", "S2SReceive@example");
+ final Node senderFlow = result.findNode("nifi_flow", "S2SSendRAW", "S2SSendRAW@example");
+ final Node rootInputPort = result.findNode("nifi_input_port", "input", "77919f59-533e-35a3");
+ final Node remotePortQueue = result.findNode("nifi_queue", "queue", "f31a6b53-3077-4c59");
+ final Node remotePortPath = result.findNode("nifi_flow_path", "Remote Input Port", "f31a6b53-3077-4c59");
+ final Node generateCPath = result.findNode("nifi_flow_path", "Generate C", "c439cdca-e989-3491");
+ final Node twitterPath = result.findNode("nifi_flow_path", "GetTwitter", "b775b657-5a5b-3708");
+
+ // The receiving flow owns the input port.
+ result.assertLink(receiverFlow, rootInputPort);
+
+ // Paths in the sending flow send data to the receiving flow through the Remote Input Port.
+ result.assertLink(senderFlow, generateCPath);
+ result.assertLink(senderFlow, twitterPath);
+ result.assertLink(generateCPath, remotePortQueue);
+ result.assertLink(twitterPath, remotePortQueue);
+ result.assertLink(remotePortQueue, remotePortPath);
+ result.assertLink(remotePortPath, rootInputPort);
+ }
+
+ @Test
+ public void testS2STransferAndGetCombinationRAW() throws Exception {
+ testS2STransferRAW();
+ testS2SGetRAW();
+
+ final Lineage result = getLineage();
+ // Nodes on the serving (remote) side.
+ final Node serverFlow = result.findNode("nifi_flow", "S2STransfer", "S2STransfer@example");
+ final Node generatePath = result.findNode("nifi_flow_path", "GenerateFlowFile, output", "1b9f81db-a0fd-389a");
+ final Node rootOutputPort = result.findNode("nifi_output_port", "output", "392e7343-3950-329b");
+
+ // Nodes on the pulling (local) side.
+ final Node clientFlow = result.findNode("nifi_flow", "S2SGetRAW", "S2SGetRAW@example");
+ final Node remotePortPath = result.findNode("nifi_flow_path", "Remote Output Port", "7375f8f6-4604-468d");
+ final Node logQueue = result.findNode("nifi_queue", "queue", "97cc5b27-22f3-3c3b");
+ final Node putFileQueue = result.findNode("nifi_queue", "queue", "4f3bfa4c-6427-3aac");
+ final Node updateQueue = result.findNode("nifi_queue", "queue", "bb530e58-ee14-3cac");
+ final Node logPath = result.findNode("nifi_flow_path", "LogAttribute", "97cc5b27-22f3-3c3b");
+ final Node putFilePath = result.findNode("nifi_flow_path", "PutFile", "4f3bfa4c-6427-3aac");
+ final Node updatePath = result.findNode("nifi_flow_path", "UpdateAttribute", "bb530e58-ee14-3cac");
+
+ // The serving flow owns the output port and transfers data generated by GenerateFlowFile.
+ result.assertLink(serverFlow, generatePath);
+ result.assertLink(generatePath, rootOutputPort);
+
+ // The Remote Output Port path in the pulling flow gets that data and passes it to each downstream queue.
+ result.assertLink(clientFlow, remotePortPath);
+ result.assertLink(rootOutputPort, remotePortPath);
+ result.assertLink(remotePortPath, logQueue);
+ result.assertLink(remotePortPath, putFileQueue);
+ result.assertLink(remotePortPath, updateQueue);
+ result.assertLink(logQueue, logPath);
+ result.assertLink(putFileQueue, putFilePath);
+ result.assertLink(updateQueue, updatePath);
+ }
+
/**
* A client NiFi gets FlowFiles from a remote output port and sends it to a remote input port without doing anything.
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/e94f0757/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml
new file mode 100644
index 0000000..20a759a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/flow-templates/S2SGetRAW.xml
@@ -0,0 +1,443 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<template encoding-version="1.1">
+ <description></description>
+ <groupId>27b7b6b8-015f-1000-0d31-197ae42bab34</groupId>
+ <name>S2SGetRAW</name>
+ <snippet>
+ <processGroups>
+ <id>9fc65d0a-ff54-3c07-0000-000000000000</id>
+ <parentGroupId>c81f8a46-4aa3-313e-0000-000000000000</parentGroupId>
+ <position>
+ <x>0.0</x>
+ <y>0.0</y>
+ </position>
+ <comments></comments>
+ <contents>
+ <connections>
+ <id>8812c40b-5c71-369f-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
+ <backPressureObjectThreshold>10000</backPressureObjectThreshold>
+ <destination>
+ <groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
+ <id>bb530e58-ee14-3cac-0000-000000000000</id>
+ <type>PROCESSOR</type>
+ </destination>
+ <flowFileExpiration>0 sec</flowFileExpiration>
+ <labelIndex>1</labelIndex>
+ <name></name>
+ <source>
+ <groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
+ <id>7375f8f6-4604-468d-0000-000000000000</id>
+ <type>REMOTE_OUTPUT_PORT</type>
+ </source>
+ <zIndex>0</zIndex>
+ </connections>
+ <connections>
+ <id>9df33c4b-8c26-33e5-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
+ <backPressureObjectThreshold>10000</backPressureObjectThreshold>
+ <destination>
+ <groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
+ <id>97cc5b27-22f3-3c3b-0000-000000000000</id>
+ <type>PROCESSOR</type>
+ </destination>
+ <flowFileExpiration>0 sec</flowFileExpiration>
+ <labelIndex>1</labelIndex>
+ <name></name>
+ <source>
+ <groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
+ <id>7375f8f6-4604-468d-0000-000000000000</id>
+ <type>REMOTE_OUTPUT_PORT</type>
+ </source>
+ <zIndex>0</zIndex>
+ </connections>
+ <connections>
+ <id>1ddd5163-7815-3117-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
+ <backPressureObjectThreshold>10000</backPressureObjectThreshold>
+ <destination>
+ <groupId>9fc65d0a-ff54-3c07-0000-000000000000</groupId>
+ <id>4f3bfa4c-6427-3aac-0000-000000000000</id>
+ <type>PROCESSOR</type>
+ </destination>
+ <flowFileExpiration>0 sec</flowFileExpiration>
+ <labelIndex>1</labelIndex>
+ <name></name>
+ <source>
+ <groupId>c6acb687-616a-3d36-0000-000000000000</groupId>
+ <id>7375f8f6-4604-468d-0000-000000000000</id>
+ <type>REMOTE_OUTPUT_PORT</type>
+ </source>
+ <zIndex>0</zIndex>
+ </connections>
+ <labels>
+ <id>12073df1-f38b-3cad-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <position>
+ <x>872.9999891005355</x>
+ <y>296.0000048267144</y>
+ </position>
+ <height>68.00000762939453</height>
+ <label>A FlowFile is passed to every downstream process path.</label>
+ <style>
+ <entry>
+ <key>font-size</key>
+ <value>12px</value>
+ </entry>
+ </style>
+ <width>338.0</width>
+ </labels>
+ <processors>
+ <id>97cc5b27-22f3-3c3b-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <position>
+ <x>470.9999891005356</x>
+ <y>679.0000048267144</y>
+ </position>
+ <bundle>
+ <artifact>nifi-standard-nar</artifact>
+ <group>org.apache.nifi</group>
+ <version>1.5.0-SNAPSHOT</version>
+ </bundle>
+ <config>
+ <bulletinLevel>WARN</bulletinLevel>
+ <comments></comments>
+ <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+ <descriptors>
+ <entry>
+ <key>Log Level</key>
+ <value>
+ <name>Log Level</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Log Payload</key>
+ <value>
+ <name>Log Payload</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Attributes to Log</key>
+ <value>
+ <name>Attributes to Log</name>
+ </value>
+ </entry>
+ <entry>
+ <key>attributes-to-log-regex</key>
+ <value>
+ <name>attributes-to-log-regex</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Attributes to Ignore</key>
+ <value>
+ <name>Attributes to Ignore</name>
+ </value>
+ </entry>
+ <entry>
+ <key>attributes-to-ignore-regex</key>
+ <value>
+ <name>attributes-to-ignore-regex</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Log prefix</key>
+ <value>
+ <name>Log prefix</name>
+ </value>
+ </entry>
+ <entry>
+ <key>character-set</key>
+ <value>
+ <name>character-set</name>
+ </value>
+ </entry>
+ </descriptors>
+ <executionNode>ALL</executionNode>
+ <lossTolerant>false</lossTolerant>
+ <penaltyDuration>30 sec</penaltyDuration>
+ <properties>
+ <entry>
+ <key>Log Level</key>
+ <value>info</value>
+ </entry>
+ <entry>
+ <key>Log Payload</key>
+ <value>false</value>
+ </entry>
+ <entry>
+ <key>Attributes to Log</key>
+ </entry>
+ <entry>
+ <key>attributes-to-log-regex</key>
+ <value>.*</value>
+ </entry>
+ <entry>
+ <key>Attributes to Ignore</key>
+ </entry>
+ <entry>
+ <key>attributes-to-ignore-regex</key>
+ </entry>
+ <entry>
+ <key>Log prefix</key>
+ </entry>
+ <entry>
+ <key>character-set</key>
+ <value>UTF-8</value>
+ </entry>
+ </properties>
+ <runDurationMillis>0</runDurationMillis>
+ <schedulingPeriod>0 sec</schedulingPeriod>
+ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+ <yieldDuration>1 sec</yieldDuration>
+ </config>
+ <name>LogAttribute</name>
+ <relationships>
+ <autoTerminate>false</autoTerminate>
+ <name>success</name>
+ </relationships>
+ <state>STOPPED</state>
+ <style></style>
+ <type>org.apache.nifi.processors.standard.LogAttribute</type>
+ </processors>
+ <processors>
+ <id>bb530e58-ee14-3cac-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <position>
+ <x>150.99998910053557</x>
+ <y>514.0000048267144</y>
+ </position>
+ <bundle>
+ <artifact>nifi-update-attribute-nar</artifact>
+ <group>org.apache.nifi</group>
+ <version>1.5.0-SNAPSHOT</version>
+ </bundle>
+ <config>
+ <bulletinLevel>WARN</bulletinLevel>
+ <comments></comments>
+ <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+ <descriptors>
+ <entry>
+ <key>Delete Attributes Expression</key>
+ <value>
+ <name>Delete Attributes Expression</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Store State</key>
+ <value>
+ <name>Store State</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Stateful Variables Initial Value</key>
+ <value>
+ <name>Stateful Variables Initial Value</name>
+ </value>
+ </entry>
+ </descriptors>
+ <executionNode>ALL</executionNode>
+ <lossTolerant>false</lossTolerant>
+ <penaltyDuration>30 sec</penaltyDuration>
+ <properties>
+ <entry>
+ <key>Delete Attributes Expression</key>
+ </entry>
+ <entry>
+ <key>Store State</key>
+ <value>Do not store state</value>
+ </entry>
+ <entry>
+ <key>Stateful Variables Initial Value</key>
+ </entry>
+ </properties>
+ <runDurationMillis>0</runDurationMillis>
+ <schedulingPeriod>0 sec</schedulingPeriod>
+ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+ <yieldDuration>1 sec</yieldDuration>
+ </config>
+ <name>UpdateAttribute</name>
+ <relationships>
+ <autoTerminate>false</autoTerminate>
+ <name>success</name>
+ </relationships>
+ <state>STOPPED</state>
+ <style></style>
+ <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
+ </processors>
+ <processors>
+ <id>4f3bfa4c-6427-3aac-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <position>
+ <x>810.9999891005356</x>
+ <y>524.0000048267144</y>
+ </position>
+ <bundle>
+ <artifact>nifi-standard-nar</artifact>
+ <group>org.apache.nifi</group>
+ <version>1.5.0-SNAPSHOT</version>
+ </bundle>
+ <config>
+ <bulletinLevel>WARN</bulletinLevel>
+ <comments></comments>
+ <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+ <descriptors>
+ <entry>
+ <key>Directory</key>
+ <value>
+ <name>Directory</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Conflict Resolution Strategy</key>
+ <value>
+ <name>Conflict Resolution Strategy</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Create Missing Directories</key>
+ <value>
+ <name>Create Missing Directories</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Maximum File Count</key>
+ <value>
+ <name>Maximum File Count</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Last Modified Time</key>
+ <value>
+ <name>Last Modified Time</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Permissions</key>
+ <value>
+ <name>Permissions</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Owner</key>
+ <value>
+ <name>Owner</name>
+ </value>
+ </entry>
+ <entry>
+ <key>Group</key>
+ <value>
+ <name>Group</name>
+ </value>
+ </entry>
+ </descriptors>
+ <executionNode>ALL</executionNode>
+ <lossTolerant>false</lossTolerant>
+ <penaltyDuration>30 sec</penaltyDuration>
+ <properties>
+ <entry>
+ <key>Directory</key>
+ </entry>
+ <entry>
+ <key>Conflict Resolution Strategy</key>
+ <value>fail</value>
+ </entry>
+ <entry>
+ <key>Create Missing Directories</key>
+ <value>true</value>
+ </entry>
+ <entry>
+ <key>Maximum File Count</key>
+ </entry>
+ <entry>
+ <key>Last Modified Time</key>
+ </entry>
+ <entry>
+ <key>Permissions</key>
+ </entry>
+ <entry>
+ <key>Owner</key>
+ </entry>
+ <entry>
+ <key>Group</key>
+ </entry>
+ </properties>
+ <runDurationMillis>0</runDurationMillis>
+ <schedulingPeriod>0 sec</schedulingPeriod>
+ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+ <yieldDuration>1 sec</yieldDuration>
+ </config>
+ <name>PutFile</name>
+ <relationships>
+ <autoTerminate>false</autoTerminate>
+ <name>failure</name>
+ </relationships>
+ <relationships>
+ <autoTerminate>false</autoTerminate>
+ <name>success</name>
+ </relationships>
+ <state>STOPPED</state>
+ <style></style>
+ <type>org.apache.nifi.processors.standard.PutFile</type>
+ </processors>
+ <remoteProcessGroups>
+ <id>c6acb687-616a-3d36-0000-000000000000</id>
+ <parentGroupId>9fc65d0a-ff54-3c07-0000-000000000000</parentGroupId>
+ <position>
+ <x>451.4000360293711</x>
+ <y>231.5999721410119</y>
+ </position>
+ <communicationsTimeout>30 sec</communicationsTimeout>
+ <contents>
+ <inputPorts>
+ <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+ <connected>false</connected>
+ <exists>true</exists>
+ <id>015f101e-dcd7-17bd-8899-1a723733521a</id>
+ <name>input</name>
+ <targetRunning>true</targetRunning>
+ <transmitting>false</transmitting>
+ <useCompression>false</useCompression>
+ </inputPorts>
+ <outputPorts>
+ <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
+ <connected>true</connected>
+ <exists>true</exists>
+ <id>7375f8f6-4604-468d-0000-000000000000</id>
+ <targetId>392e7343-3950-329b-0000-000000000000</targetId>
+ <name>output</name>
+ <targetRunning>true</targetRunning>
+ <transmitting>true</transmitting>
+ <useCompression>false</useCompression>
+ </outputPorts>
+ </contents>
+ <proxyHost></proxyHost>
+ <proxyUser></proxyUser>
+ <targetUri>http://localhost:8080/nifi</targetUri>
+ <targetUris>http://localhost:8080/nifi</targetUris>
+ <transportProtocol>RAW</transportProtocol>
+ <yieldDuration>10 sec</yieldDuration>
+ </remoteProcessGroups>
+ </contents>
+ <name>S2SGetRAW</name>
+ </processGroups>
+ </snippet>
+ <timestamp>10/20/2017 13:03:49 JST</timestamp>
+</template>
\ No newline at end of file