You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/05/29 12:23:08 UTC
[nifi] branch master updated: NIFI-7422: Support aws_s3_pseudo_dir
in Atlas reporting task
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 06864c8 NIFI-7422: Support aws_s3_pseudo_dir in Atlas reporting task
06864c8 is described below
commit 06864c830cfd67b28b447085fdec59db2934d70b
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Fri May 22 19:09:04 2020 +0200
NIFI-7422: Support aws_s3_pseudo_dir in Atlas reporting task
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4292.
---
.../atlas/provenance/analyzer/AwsS3Directory.java | 93 ++++++++++++++
...fi.atlas.provenance.NiFiProvenanceEventAnalyzer | 1 +
.../additionalDetails.html | 31 ++++-
.../provenance/analyzer/TestAwsS3Directory.java | 142 +++++++++++++++++++++
4 files changed, 266 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
new file mode 100644
index 0000000..af683fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+import java.net.URI;
+import java.util.Locale;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+/**
+ * Analyze a transit URI as an AWS S3 directory (skipping the object name).
+ * <ul>
+ * <li>qualifiedName=s3a://bucket/path@namespace (example: s3a://mybucket/mydir@ns1)</li>
+ * <li>name=/path (example: /mydir)</li>
+ * </ul>
+ */
+public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final String TYPE_DIRECTORY = "aws_s3_pseudo_dir";
+    private static final String TYPE_BUCKET = "aws_s3_bucket";
+
+    public static final String ATTR_OBJECT_PREFIX = "objectPrefix";
+    public static final String ATTR_BUCKET = "bucket";
+
+    /**
+     * Derives the directory part of the event's transit URI and wraps it in a
+     * {@code aws_s3_pseudo_dir} reference that points to its {@code aws_s3_bucket}.
+     *
+     * @param context supplies the namespace resolver used to map the URI host to a namespace
+     * @param event provenance event whose transit URI is analyzed
+     * @return the data set refs built for the directory, or {@code null} when the event has no transit URI
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+        final String transitUri = event.getTransitUri();
+        if (transitUri == null) {
+            return null;
+        }
+
+        // "s3a://bucket/file" contains exactly three slashes; any further slash means
+        // that at least one directory level sits between the bucket and the object name.
+        final int lastSlash = transitUri.lastIndexOf('/');
+        final String directoryUri;
+        if (StringUtils.countMatches(transitUri, '/') > 3) {
+            // directory exists => drop last '/' and the file name
+            directoryUri = transitUri.substring(0, lastSlash);
+        } else {
+            // no directory => keep last '/', drop only the file name
+            directoryUri = transitUri.substring(0, lastSlash + 1);
+        }
+        final URI uri = parseUri(directoryUri);
+
+        final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
+
+        final Referenceable ref = createDirectoryRef(uri, namespace);
+
+        return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
+    }
+
+    /**
+     * @return regular expression matching {@code s3a://} transit URIs that have
+     *         something after the first slash following the bucket
+     */
+    @Override
+    public String targetTransitUriPattern() {
+        return "^s3a://.+/.+$";
+    }
+
+    /**
+     * Creates the {@code aws_s3_pseudo_dir} reference for the given directory URI.
+     * The qualified name, name and object prefix are lower-cased with
+     * {@link Locale#ROOT} so that the result does not depend on the JVM default locale.
+     */
+    private Referenceable createDirectoryRef(URI uri, String namespace) {
+        final Referenceable ref = new Referenceable(TYPE_DIRECTORY);
+        final String path = uri.getPath().toLowerCase(Locale.ROOT);
+
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, uri.toString().toLowerCase(Locale.ROOT)));
+        ref.set(ATTR_NAME, path);
+        ref.set(ATTR_OBJECT_PREFIX, path);
+        ref.set(ATTR_BUCKET, createBucketRef(uri, namespace));
+
+        return ref;
+    }
+
+    /**
+     * Creates the {@code aws_s3_bucket} reference from the scheme and authority of the given URI.
+     */
+    private Referenceable createBucketRef(URI uri, String namespace) {
+        final Referenceable ref = new Referenceable(TYPE_BUCKET);
+
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, String.format("%s://%s", uri.getScheme(), uri.getAuthority())));
+        ref.set(ATTR_NAME, uri.getAuthority());
+
+        return ref;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
index 014ec9e..a2e6071 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
@@ -24,6 +24,7 @@ org.apache.nifi.atlas.provenance.analyzer.Hive2JDBC
org.apache.nifi.atlas.provenance.analyzer.HDFSPath
org.apache.nifi.atlas.provenance.analyzer.HBaseTable
org.apache.nifi.atlas.provenance.analyzer.FilePath
+org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory
# By event type, if none of above analyzers matches
org.apache.nifi.atlas.provenance.analyzer.unknown.Create
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
index 2df6f54..98adc3c 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
@@ -471,8 +471,9 @@ remote target port
FetchHDFS<br/>
FetchParquet<br/>
GetHDFS<br/>
- GetHDFSSequenceFIle<br/>
+ GetHDFSSequenceFile<br/>
PutHDFS<br/>
+ PutORC<br/>
PutParquet<br/>
</td>
<td>
@@ -483,6 +484,7 @@ remote target port
RECEIVE<br/>
SEND<br/>
SEND<br/>
+ SEND<br/>
</td>
<td>hdfs://nn.example.com:8020/user/nifi/5262553828219</td>
<td>hdfs_path</td>
@@ -490,6 +492,33 @@ remote target port
<td></td>
</tr>
<tr>
+ <td>AwsS3Directory</td>
+ <td>
+ DeleteHDFS<br/>
+ FetchHDFS<br/>
+ FetchParquet<br/>
+ GetHDFS<br/>
+ GetHDFSSequenceFile<br/>
+ PutHDFS<br/>
+ PutORC<br/>
+ PutParquet<br/>
+ </td>
+ <td>
+ REMOTE_INVOCATION<br/>
+ FETCH<br/>
+ FETCH<br/>
+ RECEIVE<br/>
+ RECEIVE<br/>
+ SEND<br/>
+ SEND<br/>
+ SEND<br/>
+ </td>
+ <td>s3a://mybucket/mydir</td>
+ <td>aws_s3_pseudo_dir</td>
+ <td>s3UrlWithoutObjectName@namespace<br/>(e.g. s3a://mybucket/mydir@ns1)</td>
+ <td></td>
+ </tr>
+ <tr>
<td>HBaseTable</td>
<td>
FetchHBaseRow<br/>
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java
new file mode 100644
index 0000000..d934328
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.resolver.NamespaceResolver;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+public class TestAwsS3Directory {
+
+    private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND;
+    private static final String ATLAS_NAMESPACE = "namespace1";
+    private static final String AWS_BUCKET = "bucket1";
+    private static final String AWS_FILENAME = "file1";
+
+    /** Object stored one directory level below the bucket. */
+    @Test
+    public void testSimpleDirectory() {
+        executeTest("PutHDFS", "/dir1");
+    }
+
+    /** Object stored several directory levels below the bucket. */
+    @Test
+    public void testCompoundDirectory() {
+        executeTest("PutHDFS", "/dir1/dir2/dir3/dir4/dir5");
+    }
+
+    /** Object stored directly under the bucket. */
+    @Test
+    public void testRootDirectory() {
+        executeTest("PutHDFS", "/");
+    }
+
+    /** Same as the simple directory case, but reported by the PutORC processor. */
+    @Test
+    public void testWithPutORC() {
+        executeTest("PutORC", "/dir1");
+    }
+
+    /**
+     * Looks up the analyzer for the processor and transit URI, runs it against
+     * mocked inputs and verifies the resulting references.
+     */
+    public void executeTest(String processorName, String directory) {
+        final String uri = createTransitUri(directory);
+
+        final ProvenanceEventRecord event = mockProvenanceEvent(processorName, uri);
+        final AnalysisContext context = mockAnalysisContext();
+
+        final NiFiProvenanceEventAnalyzer analyzer =
+                NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, uri, PROVENANCE_EVENT_TYPE);
+        assertAnalyzer(analyzer);
+
+        assertAnalysisResult(analyzer.analyze(context, event), directory);
+    }
+
+    /** Builds the s3a transit URI of the test object inside the given directory. */
+    private String createTransitUri(String directory) {
+        // The root directory already is the single slash between bucket and object name.
+        return directory.equals("/")
+                ? String.format("s3a://%s/%s", AWS_BUCKET, AWS_FILENAME)
+                : String.format("s3a://%s%s/%s", AWS_BUCKET, directory, AWS_FILENAME);
+    }
+
+    /** Mocks a provenance event exposing component type, transit URI and event type. */
+    private ProvenanceEventRecord mockProvenanceEvent(String processorName, String transitUri) {
+        final ProvenanceEventRecord event = Mockito.mock(ProvenanceEventRecord.class);
+
+        when(event.getComponentType()).thenReturn(processorName);
+        when(event.getTransitUri()).thenReturn(transitUri);
+        when(event.getEventType()).thenReturn(PROVENANCE_EVENT_TYPE);
+
+        return event;
+    }
+
+    /** Mocks an analysis context whose namespace resolver always answers with the test namespace. */
+    private AnalysisContext mockAnalysisContext() {
+        final NamespaceResolver resolver = Mockito.mock(NamespaceResolver.class);
+        when(resolver.fromHostNames(any())).thenReturn(ATLAS_NAMESPACE);
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getNamespaceResolver()).thenReturn(resolver);
+
+        return context;
+    }
+
+    /** The factory must have selected the S3 directory analyzer. */
+    private void assertAnalyzer(NiFiProvenanceEventAnalyzer analyzer) {
+        assertNotNull(analyzer);
+        assertEquals(AwsS3Directory.class, analyzer.getClass());
+    }
+
+    /** Verifies the single output directory reference and the bucket reference attached to it. */
+    private void assertAnalysisResult(DataSetRefs refs, String directory) {
+        final String directoryQualifiedName = String.format("s3a://%s%s@%s", AWS_BUCKET, directory, ATLAS_NAMESPACE);
+        final String bucketQualifiedName = String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE);
+
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+
+        final Referenceable dir = refs.getOutputs().iterator().next();
+
+        assertEquals("aws_s3_pseudo_dir", dir.getTypeName());
+        assertEquals(directoryQualifiedName, dir.get(ATTR_QUALIFIED_NAME));
+        assertEquals(directory, dir.get(ATTR_NAME));
+        assertEquals(directory, dir.get("objectPrefix"));
+
+        final Referenceable bucket = (Referenceable) dir.get("bucket");
+        assertNotNull(bucket);
+        assertEquals("aws_s3_bucket", bucket.getTypeName());
+        assertEquals(bucketQualifiedName, bucket.get(ATTR_QUALIFIED_NAME));
+        assertEquals(AWS_BUCKET, bucket.get(ATTR_NAME));
+    }
+}