You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/02/25 14:37:34 UTC

[nifi] branch main updated: NIFI-8030: Added property to ReportLineageToAtlas to configure file/directory level logging of [hd]fs_path entities to Atlas

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 93b1a05  NIFI-8030: Added property to ReportLineageToAtlas to configure file/directory level logging of [hd]fs_path entities to Atlas
93b1a05 is described below

commit 93b1a05dc3e98e2271ddc65af4b7ea969abd1462
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Mon Feb 8 20:29:10 2021 +0100

    NIFI-8030: Added property to ReportLineageToAtlas to configure file/directory level logging of [hd]fs_path entities to Atlas
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4813.
---
 .../nifi/atlas/provenance/AnalysisContext.java     |  1 +
 ...lysisContext.java => FilesystemPathsLevel.java} | 25 +++++------
 .../atlas/provenance/StandardAnalysisContext.java  |  9 +++-
 .../analyzer/AbstractFileSystemPathAnalyzer.java   | 42 +++++++++++++++++
 .../nifi/atlas/provenance/analyzer/FilePath.java   | 18 ++++----
 .../nifi/atlas/provenance/analyzer/HDFSPath.java   | 13 +++---
 .../nifi/atlas/reporting/ReportLineageToAtlas.java | 22 ++++++++-
 .../{TestHDFSPath.java => TestFilePath.java}       | 52 +++++++++++++++++-----
 .../atlas/provenance/analyzer/TestHDFSPath.java    | 41 +++++++++++++++--
 9 files changed, 178 insertions(+), 45 deletions(-)

diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
index f592710..2b1e8cc 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
@@ -32,4 +32,5 @@ public interface AnalysisContext {
     ComputeLineageResult findParents(long eventId);
     ProvenanceEventRecord getProvenanceEvent(long eventId);
     String getAwsS3ModelVersion();
+    FilesystemPathsLevel getFilesystemPathsLevel();
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/FilesystemPathsLevel.java
similarity index 54%
copy from nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
copy to nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/FilesystemPathsLevel.java
index f592710..82842aa 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/FilesystemPathsLevel.java
@@ -16,20 +16,17 @@
  */
 package org.apache.nifi.atlas.provenance;
 
-import org.apache.nifi.atlas.resolver.NamespaceResolver;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+public enum FilesystemPathsLevel {
+    FILE("File"),
+    DIRECTORY("Directory");
 
-import java.util.List;
+    private final String displayName;
 
-public interface AnalysisContext {
-    String getNiFiNamespace();
-    NamespaceResolver getNamespaceResolver();
-    List<ConnectionStatus> findConnectionTo(String componentId);
-    List<ConnectionStatus> findConnectionFrom(String componentId);
-    ComputeLineageResult queryLineage(long eventId);
-    ComputeLineageResult findParents(long eventId);
-    ProvenanceEventRecord getProvenanceEvent(long eventId);
-    String getAwsS3ModelVersion();
+    FilesystemPathsLevel(String displayName) {
+        this.displayName = displayName;
+    }
+
+    public String getDisplayName() {
+        return displayName;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
index f889650..57a04c8 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
@@ -37,13 +37,15 @@ public class StandardAnalysisContext implements AnalysisContext {
     private final NamespaceResolver namespaceResolver;
     private final ProvenanceRepository provenanceRepository;
     private final String awsS3ModelVersion;
+    private final FilesystemPathsLevel filesystemPathsLevel;
 
     public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver namespaceResolver,
-                                   ProvenanceRepository provenanceRepository, String awsS3ModelVersion) {
+                                   ProvenanceRepository provenanceRepository, String awsS3ModelVersion, FilesystemPathsLevel filesystemPathsLevel) {
         this.nifiFlow = nifiFlow;
         this.namespaceResolver = namespaceResolver;
         this.provenanceRepository = provenanceRepository;
         this.awsS3ModelVersion = awsS3ModelVersion;
+        this.filesystemPathsLevel = filesystemPathsLevel;
     }
 
     @Override
@@ -107,4 +109,9 @@ public class StandardAnalysisContext implements AnalysisContext {
     public String getAwsS3ModelVersion() {
         return awsS3ModelVersion;
     }
+
+    @Override
+    public FilesystemPathsLevel getFilesystemPathsLevel() {
+        return filesystemPathsLevel;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractFileSystemPathAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractFileSystemPathAnalyzer.java
new file mode 100644
index 0000000..3cee08f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractFileSystemPathAnalyzer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
+
+import java.net.URI;
+
+/** Common base for the analyzers that report a transit URI as a file or directory level path. */
+public abstract class AbstractFileSystemPathAnalyzer extends AbstractNiFiProvenanceEventAnalyzer {
+
+    private static final String PATH_SEPARATOR = "/";
+
+    /** Returns the full URI path, or only its parent directory ("/" for the root) at DIRECTORY level. */
+    public String getPath(AnalysisContext context, URI uri) {
+        if (context.getFilesystemPathsLevel() != FilesystemPathsLevel.DIRECTORY) {
+            return uri.getPath();
+        }
+
+        // Keep only what precedes the last separator; an empty remainder means the root directory.
+        final String parentDir = StringUtils.substringBeforeLast(uri.getPath(), PATH_SEPARATOR);
+
+        return parentDir.isEmpty() ? PATH_SEPARATOR : parentDir;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
index 59cb2d9..dae33df 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
@@ -17,11 +17,10 @@
 package org.apache.nifi.atlas.provenance.analyzer;
 
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,13 +34,13 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 
 /**
- * Analyze a transit URI as a file system path.
- * <li>qualifiedName=/path/fileName@hostname (example: /tmp/dir/filename.txt@host.example.com)
- * <li>name=/path/fileName (example: /tmp/dir/filename.txt)
+ * Analyze a transit URI as a file system path. Return file or directory path depending on FilesystemPathsLevel setting.
+ * <li>qualifiedName=/path[/fileName]@namespace (example: /tmp/dir[/filename.txt]@ns1)
+ * <li>name=/path[/fileName] (example: /tmp/dir[/filename.txt])
  */
-public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
+public class FilePath extends AbstractFileSystemPathAnalyzer {
 
-    private static final Logger logger = LoggerFactory.getLogger(FilePath.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(FilePath.class);
 
     private static final String TYPE = "fs_path";
 
@@ -56,11 +55,12 @@ public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
             final String hostname = StringUtils.isEmpty(uriHost) ? InetAddress.getLocalHost().getHostName() : uriHost;
             namespace = context.getNamespaceResolver().fromHostNames(hostname);
         } catch (UnknownHostException e) {
-            logger.warn("Failed to get localhost name due to " + e, e);
+            LOGGER.warn("Failed to get localhost name due to " + e, e);
             return null;
         }
 
-        final String path = uri.getPath();
+        final String path = getPath(context, uri);
+
         ref.set(ATTR_NAME, path);
         ref.set(ATTR_PATH, path);
         ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path));
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
index 6b5cf0b..1ced52b 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.atlas.provenance.analyzer;
 
 import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -31,11 +30,11 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 
 /**
- * Analyze a transit URI as a HDFS path.
- * <li>qualifiedName=/path/fileName@namespace (example: /app/warehouse/hive/db/default@ns1)
- * <li>name=/path/fileName (example: /app/warehouse/hive/db/default)
+ * Analyze a transit URI as a HDFS path. Return file or directory path depending on FilesystemPathsLevel setting.
+ * <li>qualifiedName=/path[/fileName]@namespace (example: /app/warehouse/hive/db/default[/datafile]@ns1)
+ * <li>name=/path[/fileName] (example: /app/warehouse/hive/db/default[/datafile])
  */
-public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
+public class HDFSPath extends AbstractFileSystemPathAnalyzer {
 
     private static final String TYPE = "hdfs_path";
 
@@ -44,7 +43,9 @@ public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
         final Referenceable ref = new Referenceable(TYPE);
         final URI uri = parseUri(event.getTransitUri());
         final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
-        final String path = uri.getPath();
+
+        final String path = getPath(context, uri);
+
         ref.set(ATTR_NAME, path);
         ref.set(ATTR_PATH, path);
         // The attribute 'clusterName' is in the 'hdfs_path' Atlas entity so it cannot be changed.
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index c565386..72c7599 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -41,6 +41,7 @@ import org.apache.nifi.atlas.NiFiFlow;
 import org.apache.nifi.atlas.NiFiFlowAnalyzer;
 import org.apache.nifi.atlas.hook.NiFiAtlasHook;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
 import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
 import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
 import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
@@ -343,6 +344,23 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
             .defaultValue(AWS_S3_MODEL_VERSION_V2.getValue())
             .build();
 
+    static final AllowableValue FILESYSTEM_PATHS_LEVEL_FILE = new AllowableValue(FilesystemPathsLevel.FILE.name(), FilesystemPathsLevel.FILE.getDisplayName(),
+            "Creates File level paths.");
+    static final AllowableValue FILESYSTEM_PATHS_LEVEL_DIRECTORY = new AllowableValue(FilesystemPathsLevel.DIRECTORY.name(), FilesystemPathsLevel.DIRECTORY.getDisplayName(),
+            "Creates Directory level paths.");
+
+    static final PropertyDescriptor FILESYSTEM_PATHS_LEVEL = new PropertyDescriptor.Builder()
+            .name("filesystem-paths-level")
+            .displayName("Filesystem Path Entities Level")
+            .description("Specifies how the filesystem path entities (fs_path and hdfs_path) will be logged in Atlas: File or Directory level. In case of File level, each individual file entity " +
+                    "will be sent to Atlas as a separate entity with the full path including the filename. Directory level only logs the path of the parent directory without the filename. " +
+                    "This setting affects processors working with files, like GetFile or PutHDFS. NOTE: Although the default value is File level for backward compatibility reasons, " +
+                    "it is highly recommended to set it to Directory level because File level logging can generate a huge number of entities in Atlas.")
+            .required(true)
+            .allowableValues(FILESYSTEM_PATHS_LEVEL_FILE, FILESYSTEM_PATHS_LEVEL_DIRECTORY)
+            .defaultValue(FILESYSTEM_PATHS_LEVEL_FILE.getValue())
+            .build();
+
     private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
     private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs";
     private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs";
@@ -405,6 +423,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
 
         // Provenance event analyzer specific properties
         properties.add(AWS_S3_MODEL_VERSION);
+        properties.add(FILESYSTEM_PATHS_LEVEL);
 
         return properties;
     }
@@ -868,9 +887,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
     private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) {
         final EventAccess eventAccess = context.getEventAccess();
         final String awsS3ModelVersion = context.getProperty(AWS_S3_MODEL_VERSION).getValue();
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.valueOf(context.getProperty(FILESYSTEM_PATHS_LEVEL).getValue());
         final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, namespaceResolvers,
                 // FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
-                (ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion);
+                (ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion, filesystemPathsLevel);
         consumer.consumeEvents(context, (componentMapHolder, events) -> {
             for (ProvenanceEventRecord event : events) {
                 try {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestFilePath.java
similarity index 56%
copy from nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
copy to nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestFilePath.java
index 6edc09f..485f761 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestFilePath.java
@@ -19,6 +19,7 @@ package org.apache.nifi.atlas.provenance.analyzer;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
 import org.apache.nifi.atlas.resolver.NamespaceResolvers;
@@ -31,36 +32,67 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.when;
 
-public class TestHDFSPath {
+public class TestFilePath {
 
     @Test
-    public void testHDFSPath() {
-        final String processorName = "PutHDFS";
-        // TODO: what if with HA namenode?
-        final String transitUri = "hdfs://0.example.com:8020/user/nifi/fileA";
+    public void testFilePathWithFileLevel() {
+        final String transitUri = "file:/user/nifi/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+        final String expectedPath = "/user/nifi/fileA";
+        testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    @Test
+    public void testFilePathWithDirectoryLevel() {
+        final String transitUri = "file:/user/nifi/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+        final String expectedPath = "/user/nifi";
+        testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    @Test
+    public void testFilePathRootDirWithFileLevel() {
+        final String transitUri = "file:/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+        final String expectedPath = "/fileA";
+        testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    @Test
+    public void testFilePathRootDirWithDirectoryLevel() {
+        final String transitUri = "file:/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+        final String expectedPath = "/";
+        testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    private void testFilePath(String transitUri, FilesystemPathsLevel filesystemPathsLevel, String expectedPath) {
+        final String processorName = "PutFile";
         final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
         when(record.getComponentType()).thenReturn(processorName);
         when(record.getTransitUri()).thenReturn(transitUri);
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
 
         final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
-        when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
+        when(namespaceResolvers.fromHostNames(anyString())).thenReturn("namespace1");
 
         final AnalysisContext context = Mockito.mock(AnalysisContext.class);
         when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
+        when(context.getFilesystemPathsLevel()).thenReturn(filesystemPathsLevel);
 
         final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
         assertNotNull(analyzer);
+        assertEquals(FilePath.class, analyzer.getClass());
 
         final DataSetRefs refs = analyzer.analyze(context, record);
         assertEquals(0, refs.getInputs().size());
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
-        assertEquals("hdfs_path", ref.getTypeName());
-        assertEquals("/user/nifi/fileA", ref.get(ATTR_NAME));
-        assertEquals("/user/nifi/fileA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals("fs_path", ref.getTypeName());
+        assertEquals(expectedPath, ref.get(ATTR_NAME));
+        assertEquals(expectedPath + "@namespace1", ref.get(ATTR_QUALIFIED_NAME));
     }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
index 6edc09f..d8ae13d 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
@@ -19,6 +19,7 @@ package org.apache.nifi.atlas.provenance.analyzer;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
 import org.apache.nifi.atlas.resolver.NamespaceResolvers;
@@ -37,10 +38,40 @@ import static org.mockito.Mockito.when;
 public class TestHDFSPath {
 
     @Test
-    public void testHDFSPath() {
-        final String processorName = "PutHDFS";
+    public void testHDFSPathWithFileLevel() {
         // TODO: what if with HA namenode?
         final String transitUri = "hdfs://0.example.com:8020/user/nifi/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+        final String expectedPath = "/user/nifi/fileA";
+        testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    @Test
+    public void testHDFSPathWithDirectoryLevel() {
+        final String transitUri = "hdfs://0.example.com:8020/user/nifi/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+        final String expectedPath = "/user/nifi";
+        testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    @Test
+    public void testHDFSPathRootDirWithFileLevel() {
+        final String transitUri = "hdfs://0.example.com:8020/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+        final String expectedPath = "/fileA";
+        testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    @Test
+    public void testHDFSPathRootDirWithDirectoryLevel() {
+        final String transitUri = "hdfs://0.example.com:8020/fileA";
+        final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+        final String expectedPath = "/";
+        testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+    }
+
+    private void testHDFSPath(String transitUri, FilesystemPathsLevel filesystemPathsLevel, String expectedPath) {
+        final String processorName = "PutHDFS";
         final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
         when(record.getComponentType()).thenReturn(processorName);
         when(record.getTransitUri()).thenReturn(transitUri);
@@ -51,16 +82,18 @@ public class TestHDFSPath {
 
         final AnalysisContext context = Mockito.mock(AnalysisContext.class);
         when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
+        when(context.getFilesystemPathsLevel()).thenReturn(filesystemPathsLevel);
 
         final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
         assertNotNull(analyzer);
+        assertEquals(HDFSPath.class, analyzer.getClass());
 
         final DataSetRefs refs = analyzer.analyze(context, record);
         assertEquals(0, refs.getInputs().size());
         assertEquals(1, refs.getOutputs().size());
         Referenceable ref = refs.getOutputs().iterator().next();
         assertEquals("hdfs_path", ref.getTypeName());
-        assertEquals("/user/nifi/fileA", ref.get(ATTR_NAME));
-        assertEquals("/user/nifi/fileA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
+        assertEquals(expectedPath, ref.get(ATTR_NAME));
+        assertEquals(expectedPath + "@namespace1", ref.get(ATTR_QUALIFIED_NAME));
     }
 }