You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/05/29 12:23:08 UTC

[nifi] branch master updated: NIFI-7422: Support aws_s3_pseudo_dir in Atlas reporting task

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 06864c8  NIFI-7422: Support aws_s3_pseudo_dir in Atlas reporting task
06864c8 is described below

commit 06864c830cfd67b28b447085fdec59db2934d70b
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Fri May 22 19:09:04 2020 +0200

    NIFI-7422: Support aws_s3_pseudo_dir in Atlas reporting task
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4292.
---
 .../atlas/provenance/analyzer/AwsS3Directory.java  |  93 ++++++++++++++
 ...fi.atlas.provenance.NiFiProvenanceEventAnalyzer |   1 +
 .../additionalDetails.html                         |  31 ++++-
 .../provenance/analyzer/TestAwsS3Directory.java    | 142 +++++++++++++++++++++
 4 files changed, 266 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
new file mode 100644
index 0000000..af683fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AwsS3Directory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+
+import java.net.URI;
+import java.util.Locale;
+
+import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+/**
+ * Analyzes a transit URI as an AWS S3 directory, skipping the object name.
+ * <p>
+ * The resulting {@code aws_s3_pseudo_dir} entity carries:
+ * <ul>
+ * <li>qualifiedName=s3a://bucket/path@namespace (example: s3a://mybucket/mydir@ns1)</li>
+ * <li>name=/path (example: /mydir)</li>
+ * <li>objectPrefix=/path (same value as name)</li>
+ * <li>bucket=a nested {@code aws_s3_bucket} reference built from the URI's scheme and authority</li>
+ * </ul>
+ * The directory URI and path are lower-cased with {@link Locale#ROOT}, so the generated
+ * names do not depend on the default locale of the JVM.
+ */
+public class AwsS3Directory extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final String TYPE_DIRECTORY = "aws_s3_pseudo_dir";
+    private static final String TYPE_BUCKET = "aws_s3_bucket";
+
+    public static final String ATTR_OBJECT_PREFIX = "objectPrefix";
+    public static final String ATTR_BUCKET = "bucket";
+
+    /**
+     * Builds a reference to the S3 pseudo directory that contains the object named by the event's transit URI.
+     *
+     * @param context supplies the namespace resolver, which is queried with the host of the directory URI
+     * @param event the provenance event whose transit URI, component id and event type are read
+     * @return the result of {@code singleDataSetRef} for the directory reference,
+     *         or {@code null} when the event has no transit URI
+     */
+    @Override
+    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
+        final String transitUri = event.getTransitUri();
+        if (transitUri == null) {
+            return null;
+        }
+
+        // "s3a://bucket/file" contains exactly three slashes; any further slash means there is a directory part.
+        final String directoryUri;
+        if (StringUtils.countMatches(transitUri, '/') > 3) {
+            // directory exists => drop last '/' and the file name
+            directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/'));
+        } else {
+            // no directory => keep last '/', drop only the file name
+            directoryUri = transitUri.substring(0, transitUri.lastIndexOf('/') + 1);
+        }
+        final URI uri = parseUri(directoryUri);
+
+        final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
+
+        final Referenceable ref = createDirectoryRef(uri, namespace);
+
+        return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
+    }
+
+    /**
+     * @return a regular expression matching {@code s3a://} URIs that have at least one '/' followed by
+     *         further characters after the scheme (i.e. a bucket and an object key)
+     */
+    @Override
+    public String targetTransitUriPattern() {
+        return "^s3a://.+/.+$";
+    }
+
+    /**
+     * Creates the {@code aws_s3_pseudo_dir} reference for the given directory URI.
+     *
+     * @param uri the transit URI with the object name already removed
+     * @param namespace the namespace used to build the qualified names
+     * @return the directory reference, including its nested bucket reference
+     */
+    private Referenceable createDirectoryRef(URI uri, String namespace) {
+        final Referenceable ref = new Referenceable(TYPE_DIRECTORY);
+
+        // Locale.ROOT keeps the lower-casing stable regardless of the JVM default locale
+        // (e.g. a Turkish default locale would otherwise map 'I' to a dotless 'i').
+        final String directoryUri = uri.toString().toLowerCase(Locale.ROOT);
+        final String directoryPath = uri.getPath().toLowerCase(Locale.ROOT);
+
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, directoryUri));
+        ref.set(ATTR_NAME, directoryPath);
+        ref.set(ATTR_OBJECT_PREFIX, directoryPath);
+        ref.set(ATTR_BUCKET, createBucketRef(uri, namespace));
+
+        return ref;
+    }
+
+    /**
+     * Creates the {@code aws_s3_bucket} reference for the bucket addressed by the given URI.
+     * The scheme and authority are used as they appear in the URI; no case conversion is applied here.
+     *
+     * @param uri the directory URI; only its scheme and authority are read
+     * @param namespace the namespace used to build the qualified name
+     * @return the bucket reference
+     */
+    private Referenceable createBucketRef(URI uri, String namespace) {
+        final Referenceable ref = new Referenceable(TYPE_BUCKET);
+
+        ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, String.format("%s://%s", uri.getScheme(), uri.getAuthority())));
+        ref.set(ATTR_NAME, uri.getAuthority());
+
+        return ref;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
index 014ec9e..a2e6071 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
@@ -24,6 +24,7 @@ org.apache.nifi.atlas.provenance.analyzer.Hive2JDBC
 org.apache.nifi.atlas.provenance.analyzer.HDFSPath
 org.apache.nifi.atlas.provenance.analyzer.HBaseTable
 org.apache.nifi.atlas.provenance.analyzer.FilePath
+org.apache.nifi.atlas.provenance.analyzer.AwsS3Directory
 
 # By event type, if none of above analyzers matches
 org.apache.nifi.atlas.provenance.analyzer.unknown.Create
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
index 2df6f54..98adc3c 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.ReportLineageToAtlas/additionalDetails.html
@@ -471,8 +471,9 @@ remote target port
                     FetchHDFS<br/>
                     FetchParquet<br/>
                     GetHDFS<br/>
-                    GetHDFSSequenceFIle<br/>
+                    GetHDFSSequenceFile<br/>
                     PutHDFS<br/>
+                    PutORC<br/>
                     PutParquet<br/>
                 </td>
                 <td>
@@ -483,6 +484,7 @@ remote target port
                     RECEIVE<br/>
                     SEND<br/>
                     SEND<br/>
+                    SEND<br/>
                 </td>
                 <td>hdfs://nn.example.com:8020/user/nifi/5262553828219</td>
                 <td>hdfs_path</td>
@@ -490,6 +492,33 @@ remote target port
                 <td></td>
             </tr>
             <tr>
+                <td>AwsS3Directory</td>
+                <td>
+                    DeleteHDFS<br/>
+                    FetchHDFS<br/>
+                    FetchParquet<br/>
+                    GetHDFS<br/>
+                    GetHDFSSequenceFile<br/>
+                    PutHDFS<br/>
+                    PutORC<br/>
+                    PutParquet<br/>
+                </td>
+                <td>
+                    REMOTE_INVOCATION<br/>
+                    FETCH<br/>
+                    FETCH<br/>
+                    RECEIVE<br/>
+                    RECEIVE<br/>
+                    SEND<br/>
+                    SEND<br/>
+                    SEND<br/>
+                </td>
+                <td>s3a://mybucket/mydir</td>
+                <td>aws_s3_pseudo_dir</td>
+                <td>s3UrlWithoutObjectName@namespace<br/>(e.g. s3a://mybucket/mydir@ns1)</td>
+                <td></td>
+            </tr>
+            <tr>
                 <td>HBaseTable</td>
                 <td>
                     FetchHBaseRow<br/>
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java
new file mode 100644
index 0000000..d934328
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3Directory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
+import org.apache.nifi.atlas.resolver.NamespaceResolver;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+public class TestAwsS3Directory {
+
+    private static final ProvenanceEventType PROVENANCE_EVENT_TYPE = ProvenanceEventType.SEND;
+    private static final String ATLAS_NAMESPACE = "namespace1";
+    private static final String AWS_BUCKET = "bucket1";
+    private static final String AWS_FILENAME = "file1";
+
+    @Test
+    public void testSimpleDirectory() {
+        String processorName = "PutHDFS";
+        String directory = "/dir1";
+
+        executeTest(processorName, directory);
+    }
+
+    @Test
+    public void testCompoundDirectory() {
+        String processorName = "PutHDFS";
+        String directory = "/dir1/dir2/dir3/dir4/dir5";
+
+        executeTest(processorName, directory);
+    }
+
+    @Test
+    public void testRootDirectory() {
+        String processorName = "PutHDFS";
+        String directory = "/";
+
+        executeTest(processorName, directory);
+    }
+
+    @Test
+    public void testWithPutORC() {
+        String processorName = "PutORC";
+        String directory = "/dir1";
+
+        executeTest(processorName, directory);
+    }
+
+    public void executeTest(String processorName, String directory) {
+        String transitUri = createTransitUri(directory);
+
+        ProvenanceEventRecord provenanceEvent = mockProvenanceEvent(processorName, transitUri);
+        AnalysisContext analysisContext = mockAnalysisContext();
+
+        NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, PROVENANCE_EVENT_TYPE);
+        assertAnalyzer(analyzer);
+
+        DataSetRefs refs = analyzer.analyze(analysisContext, provenanceEvent);
+        assertAnalysisResult(refs, directory);
+    }
+
+    private String createTransitUri(String directory) {
+        if (directory.equals("/")) {
+            return String.format("s3a://%s/%s", AWS_BUCKET, AWS_FILENAME);
+        } else {
+            return String.format("s3a://%s%s/%s", AWS_BUCKET, directory, AWS_FILENAME);
+        }
+    }
+
+    private ProvenanceEventRecord mockProvenanceEvent(String processorName, String transitUri) {
+        ProvenanceEventRecord provenanceEvent = Mockito.mock(ProvenanceEventRecord.class);
+
+        when(provenanceEvent.getComponentType()).thenReturn(processorName);
+        when(provenanceEvent.getTransitUri()).thenReturn(transitUri);
+        when(provenanceEvent.getEventType()).thenReturn(PROVENANCE_EVENT_TYPE);
+
+        return provenanceEvent;
+    }
+
+    private AnalysisContext mockAnalysisContext() {
+        NamespaceResolver namespaceResolver = Mockito.mock(NamespaceResolver.class);
+        when(namespaceResolver.fromHostNames(any())).thenReturn(ATLAS_NAMESPACE);
+
+        AnalysisContext analysisContext = Mockito.mock(AnalysisContext.class);
+        when(analysisContext.getNamespaceResolver()).thenReturn(namespaceResolver);
+
+        return analysisContext;
+    }
+
+    private void assertAnalyzer(NiFiProvenanceEventAnalyzer analyzer) {
+        assertNotNull(analyzer);
+        assertEquals(AwsS3Directory.class, analyzer.getClass());
+    }
+
+    private void assertAnalysisResult(DataSetRefs refs, String directory) {
+        String expectedDirectoryQualifiedName = String.format("s3a://%s%s@%s", AWS_BUCKET, directory, ATLAS_NAMESPACE);
+        String expectedBucketQualifiedName = String.format("s3a://%s@%s", AWS_BUCKET, ATLAS_NAMESPACE);
+
+        assertEquals(0, refs.getInputs().size());
+        assertEquals(1, refs.getOutputs().size());
+
+        Referenceable directoryRef = refs.getOutputs().iterator().next();
+
+        assertEquals("aws_s3_pseudo_dir", directoryRef.getTypeName());
+        assertEquals(expectedDirectoryQualifiedName, directoryRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(directory, directoryRef.get(ATTR_NAME));
+        assertEquals(directory, directoryRef.get("objectPrefix"));
+
+        Referenceable bucketRef = (Referenceable) directoryRef.get("bucket");
+        assertNotNull(bucketRef);
+        assertEquals("aws_s3_bucket", bucketRef.getTypeName());
+        assertEquals(expectedBucketQualifiedName, bucketRef.get(ATTR_QUALIFIED_NAME));
+        assertEquals(AWS_BUCKET, bucketRef.get(ATTR_NAME));
+    }
+}