You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/02/25 14:37:34 UTC
[nifi] branch main updated: NIFI-8030: Added property to ReportLineageToAtlas to configure file/directory level logging of [hd]fs_path entities to Atlas
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 93b1a05 NIFI-8030: Added property to ReportLineageToAtlas to configure file/directory level logging of [hd]fs_path entities to Atlas
93b1a05 is described below
commit 93b1a05dc3e98e2271ddc65af4b7ea969abd1462
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Mon Feb 8 20:29:10 2021 +0100
NIFI-8030: Added property to ReportLineageToAtlas to configure file/directory level logging of [hd]fs_path entities to Atlas
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4813.
---
.../nifi/atlas/provenance/AnalysisContext.java | 1 +
...lysisContext.java => FilesystemPathsLevel.java} | 25 +++++------
.../atlas/provenance/StandardAnalysisContext.java | 9 +++-
.../analyzer/AbstractFileSystemPathAnalyzer.java | 42 +++++++++++++++++
.../nifi/atlas/provenance/analyzer/FilePath.java | 18 ++++----
.../nifi/atlas/provenance/analyzer/HDFSPath.java | 13 +++---
.../nifi/atlas/reporting/ReportLineageToAtlas.java | 22 ++++++++-
.../{TestHDFSPath.java => TestFilePath.java} | 52 +++++++++++++++++-----
.../atlas/provenance/analyzer/TestHDFSPath.java | 41 +++++++++++++++--
9 files changed, 178 insertions(+), 45 deletions(-)
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
index f592710..2b1e8cc 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
@@ -32,4 +32,5 @@ public interface AnalysisContext {
ComputeLineageResult findParents(long eventId);
ProvenanceEventRecord getProvenanceEvent(long eventId);
String getAwsS3ModelVersion();
+ FilesystemPathsLevel getFilesystemPathsLevel();
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/FilesystemPathsLevel.java
similarity index 54%
copy from nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
copy to nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/FilesystemPathsLevel.java
index f592710..82842aa 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AnalysisContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/FilesystemPathsLevel.java
@@ -16,20 +16,17 @@
*/
package org.apache.nifi.atlas.provenance;
-import org.apache.nifi.atlas.resolver.NamespaceResolver;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+public enum FilesystemPathsLevel {
+ FILE("File"),
+ DIRECTORY("Directory");
-import java.util.List;
+ private final String displayName;
-public interface AnalysisContext {
- String getNiFiNamespace();
- NamespaceResolver getNamespaceResolver();
- List<ConnectionStatus> findConnectionTo(String componentId);
- List<ConnectionStatus> findConnectionFrom(String componentId);
- ComputeLineageResult queryLineage(long eventId);
- ComputeLineageResult findParents(long eventId);
- ProvenanceEventRecord getProvenanceEvent(long eventId);
- String getAwsS3ModelVersion();
+ FilesystemPathsLevel(String displayName) {
+ this.displayName = displayName;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
index f889650..57a04c8 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
@@ -37,13 +37,15 @@ public class StandardAnalysisContext implements AnalysisContext {
private final NamespaceResolver namespaceResolver;
private final ProvenanceRepository provenanceRepository;
private final String awsS3ModelVersion;
+ private final FilesystemPathsLevel filesystemPathsLevel;
public StandardAnalysisContext(NiFiFlow nifiFlow, NamespaceResolver namespaceResolver,
- ProvenanceRepository provenanceRepository, String awsS3ModelVersion) {
+ ProvenanceRepository provenanceRepository, String awsS3ModelVersion, FilesystemPathsLevel filesystemPathsLevel) {
this.nifiFlow = nifiFlow;
this.namespaceResolver = namespaceResolver;
this.provenanceRepository = provenanceRepository;
this.awsS3ModelVersion = awsS3ModelVersion;
+ this.filesystemPathsLevel = filesystemPathsLevel;
}
@Override
@@ -107,4 +109,9 @@ public class StandardAnalysisContext implements AnalysisContext {
public String getAwsS3ModelVersion() {
return awsS3ModelVersion;
}
+
+ @Override
+ public FilesystemPathsLevel getFilesystemPathsLevel() {
+ return filesystemPathsLevel;
+ }
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractFileSystemPathAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractFileSystemPathAnalyzer.java
new file mode 100644
index 0000000..3cee08f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractFileSystemPathAnalyzer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.analyzer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
+
+import java.net.URI;
+
+public abstract class AbstractFileSystemPathAnalyzer extends AbstractNiFiProvenanceEventAnalyzer {
+
+ private static final String PATH_SEPARATOR = "/";
+
+ public String getPath(AnalysisContext context, URI uri) {
+ String path;
+
+ if (context.getFilesystemPathsLevel() == FilesystemPathsLevel.DIRECTORY) {
+ final String dirPath = StringUtils.substringBeforeLast(uri.getPath(), PATH_SEPARATOR);
+ path = dirPath.isEmpty() ? PATH_SEPARATOR : dirPath;
+ } else {
+ path = uri.getPath();
+ }
+
+ return path;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
index 59cb2d9..dae33df 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
@@ -17,11 +17,10 @@
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,13 +34,13 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
/**
- * Analyze a transit URI as a file system path.
- * <li>qualifiedName=/path/fileName@hostname (example: /tmp/dir/filename.txt@host.example.com)
- * <li>name=/path/fileName (example: /tmp/dir/filename.txt)
+ * Analyze a transit URI as a file system path. Return file or directory path depending on FilesystemPathsLevel setting.
+ * <li>qualifiedName=/path[/fileName]@namespace (example: /tmp/dir[/filename.txt]@ns1)
+ * <li>name=/path[/fileName] (example: /tmp/dir[/filename.txt])
*/
-public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
+public class FilePath extends AbstractFileSystemPathAnalyzer {
- private static final Logger logger = LoggerFactory.getLogger(FilePath.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FilePath.class);
private static final String TYPE = "fs_path";
@@ -56,11 +55,12 @@ public class FilePath extends AbstractNiFiProvenanceEventAnalyzer {
final String hostname = StringUtils.isEmpty(uriHost) ? InetAddress.getLocalHost().getHostName() : uriHost;
namespace = context.getNamespaceResolver().fromHostNames(hostname);
} catch (UnknownHostException e) {
- logger.warn("Failed to get localhost name due to " + e, e);
+ LOGGER.warn("Failed to get localhost name due to " + e, e);
return null;
}
- final String path = uri.getPath();
+ final String path = getPath(context, uri);
+
ref.set(ATTR_NAME, path);
ref.set(ATTR_PATH, path);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, path));
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
index 6b5cf0b..1ced52b 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
@@ -17,7 +17,6 @@
package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -31,11 +30,11 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_PATH;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
/**
- * Analyze a transit URI as a HDFS path.
- * <li>qualifiedName=/path/fileName@namespace (example: /app/warehouse/hive/db/default@ns1)
- * <li>name=/path/fileName (example: /app/warehouse/hive/db/default)
+ * Analyze a transit URI as a HDFS path. Return file or directory path depending on FilesystemPathsLevel setting.
+ * <li>qualifiedName=/path[/fileName]@namespace (example: /app/warehouse/hive/db/default[/datafile]@ns1)
+ * <li>name=/path[/fileName] (example: /app/warehouse/hive/db/default[/datafile])
*/
-public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
+public class HDFSPath extends AbstractFileSystemPathAnalyzer {
private static final String TYPE = "hdfs_path";
@@ -44,7 +43,9 @@ public class HDFSPath extends AbstractNiFiProvenanceEventAnalyzer {
final Referenceable ref = new Referenceable(TYPE);
final URI uri = parseUri(event.getTransitUri());
final String namespace = context.getNamespaceResolver().fromHostNames(uri.getHost());
- final String path = uri.getPath();
+
+ final String path = getPath(context, uri);
+
ref.set(ATTR_NAME, path);
ref.set(ATTR_PATH, path);
// The attribute 'clusterName' is in the 'hdfs_path' Atlas entity so it cannot be changed.
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index c565386..72c7599 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -41,6 +41,7 @@ import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
@@ -343,6 +344,23 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.defaultValue(AWS_S3_MODEL_VERSION_V2.getValue())
.build();
+ static final AllowableValue FILESYSTEM_PATHS_LEVEL_FILE = new AllowableValue(FilesystemPathsLevel.FILE.name(), FilesystemPathsLevel.FILE.getDisplayName(),
+ "Creates File level paths.");
+ static final AllowableValue FILESYSTEM_PATHS_LEVEL_DIRECTORY = new AllowableValue(FilesystemPathsLevel.DIRECTORY.name(), FilesystemPathsLevel.DIRECTORY.getDisplayName(),
+ "Creates Directory level paths.");
+
+ static final PropertyDescriptor FILESYSTEM_PATHS_LEVEL = new PropertyDescriptor.Builder()
+ .name("filesystem-paths-level")
+ .displayName("Filesystem Path Entities Level")
+ .description("Specifies how the filesystem path entities (fs_path and hdfs_path) will be logged in Atlas: File or Directory level. In case of File level, each individual file entity " +
+ "will be sent to Atlas as a separate entity with the full path including the filename. Directory level only logs the path of the parent directory without the filename. " +
+ "This setting affects processors working with files, like GetFile or PutHDFS. NOTE: Although the default value is File level for backward compatibility reasons, " +
+ "it is highly recommended to set it to Directory level because File level logging can generate a huge number of entities in Atlas.")
+ .required(true)
+ .allowableValues(FILESYSTEM_PATHS_LEVEL_FILE, FILESYSTEM_PATHS_LEVEL_DIRECTORY)
+ .defaultValue(FILESYSTEM_PATHS_LEVEL_FILE.getValue())
+ .build();
+
private static final String ATLAS_PROPERTIES_FILENAME = "atlas-application.properties";
private static final String ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS = "atlas.client.connectTimeoutMSecs";
private static final String ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS = "atlas.client.readTimeoutMSecs";
@@ -405,6 +423,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// Provenance event analyzer specific properties
properties.add(AWS_S3_MODEL_VERSION);
+ properties.add(FILESYSTEM_PATHS_LEVEL);
return properties;
}
@@ -868,9 +887,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private void consumeNiFiProvenanceEvents(ReportingContext context, NiFiFlow nifiFlow) {
final EventAccess eventAccess = context.getEventAccess();
final String awsS3ModelVersion = context.getProperty(AWS_S3_MODEL_VERSION).getValue();
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.valueOf(context.getProperty(FILESYSTEM_PATHS_LEVEL).getValue());
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, namespaceResolvers,
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
- (ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion);
+ (ProvenanceRepository)eventAccess.getProvenanceRepository(), awsS3ModelVersion, filesystemPathsLevel);
consumer.consumeEvents(context, (componentMapHolder, events) -> {
for (ProvenanceEventRecord event : events) {
try {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestFilePath.java
similarity index 56%
copy from nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
copy to nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestFilePath.java
index 6edc09f..485f761 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestFilePath.java
@@ -19,6 +19,7 @@ package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
@@ -31,36 +32,67 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
-public class TestHDFSPath {
+public class TestFilePath {
@Test
- public void testHDFSPath() {
- final String processorName = "PutHDFS";
- // TODO: what if with HA namenode?
- final String transitUri = "hdfs://0.example.com:8020/user/nifi/fileA";
+ public void testFilePathWithFileLevel() {
+ final String transitUri = "file:/user/nifi/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+ final String expectedPath = "/user/nifi/fileA";
+ testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ @Test
+ public void testFilePathWithDirectoryLevel() {
+ final String transitUri = "file:/user/nifi/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+ final String expectedPath = "/user/nifi";
+ testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ @Test
+ public void testFilePathRootDirWithFileLevel() {
+ final String transitUri = "file:/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+ final String expectedPath = "/fileA";
+ testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ @Test
+ public void testFilePathRootDirWithDirectoryLevel() {
+ final String transitUri = "file:/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+ final String expectedPath = "/";
+ testFilePath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ private void testFilePath(String transitUri, FilesystemPathsLevel filesystemPathsLevel, String expectedPath) {
+ final String processorName = "PutFile";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
when(record.getComponentType()).thenReturn(processorName);
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
- when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");
+ when(namespaceResolvers.fromHostNames(anyString())).thenReturn("namespace1");
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
+ when(context.getFilesystemPathsLevel()).thenReturn(filesystemPathsLevel);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
+ assertEquals(FilePath.class, analyzer.getClass());
final DataSetRefs refs = analyzer.analyze(context, record);
assertEquals(0, refs.getInputs().size());
assertEquals(1, refs.getOutputs().size());
Referenceable ref = refs.getOutputs().iterator().next();
- assertEquals("hdfs_path", ref.getTypeName());
- assertEquals("/user/nifi/fileA", ref.get(ATTR_NAME));
- assertEquals("/user/nifi/fileA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
+ assertEquals("fs_path", ref.getTypeName());
+ assertEquals(expectedPath, ref.get(ATTR_NAME));
+ assertEquals(expectedPath + "@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
index 6edc09f..d8ae13d 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
@@ -19,6 +19,7 @@ package org.apache.nifi.atlas.provenance.analyzer;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.atlas.provenance.FilesystemPathsLevel;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzerFactory;
import org.apache.nifi.atlas.resolver.NamespaceResolvers;
@@ -37,10 +38,40 @@ import static org.mockito.Mockito.when;
public class TestHDFSPath {
@Test
- public void testHDFSPath() {
- final String processorName = "PutHDFS";
+ public void testHDFSPathWithFileLevel() {
// TODO: what if with HA namenode?
final String transitUri = "hdfs://0.example.com:8020/user/nifi/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+ final String expectedPath = "/user/nifi/fileA";
+ testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ @Test
+ public void testHDFSPathWithDirectoryLevel() {
+ final String transitUri = "hdfs://0.example.com:8020/user/nifi/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+ final String expectedPath = "/user/nifi";
+ testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ @Test
+ public void testHDFSPathRootDirWithFileLevel() {
+ final String transitUri = "hdfs://0.example.com:8020/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.FILE;
+ final String expectedPath = "/fileA";
+ testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ @Test
+ public void testHDFSPathRootDirWithDirectoryLevel() {
+ final String transitUri = "hdfs://0.example.com:8020/fileA";
+ final FilesystemPathsLevel filesystemPathsLevel = FilesystemPathsLevel.DIRECTORY;
+ final String expectedPath = "/";
+ testHDFSPath(transitUri, filesystemPathsLevel, expectedPath);
+ }
+
+ private void testHDFSPath(String transitUri, FilesystemPathsLevel filesystemPathsLevel, String expectedPath) {
+ final String processorName = "PutHDFS";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
when(record.getComponentType()).thenReturn(processorName);
when(record.getTransitUri()).thenReturn(transitUri);
@@ -51,16 +82,18 @@ public class TestHDFSPath {
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
+ when(context.getFilesystemPathsLevel()).thenReturn(filesystemPathsLevel);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
+ assertEquals(HDFSPath.class, analyzer.getClass());
final DataSetRefs refs = analyzer.analyze(context, record);
assertEquals(0, refs.getInputs().size());
assertEquals(1, refs.getOutputs().size());
Referenceable ref = refs.getOutputs().iterator().next();
assertEquals("hdfs_path", ref.getTypeName());
- assertEquals("/user/nifi/fileA", ref.get(ATTR_NAME));
- assertEquals("/user/nifi/fileA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
+ assertEquals(expectedPath, ref.get(ATTR_NAME));
+ assertEquals(expectedPath + "@namespace1", ref.get(ATTR_QUALIFIED_NAME));
}
}