You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/11/02 09:11:08 UTC
nifi git commit: NIFI-4544: Improve HDFS processors provenance transit URL
Repository: nifi
Updated Branches:
refs/heads/master 641333791 -> 77a51e1a9
NIFI-4544: Improve HDFS processors provenance transit URL
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2238.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/77a51e1a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/77a51e1a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/77a51e1a
Branch: refs/heads/master
Commit: 77a51e1a9ebee2162af48c5bdd5b79703e65b4e2
Parents: 6413337
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Oct 30 11:35:13 2017 +0900
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Nov 2 10:10:03 2017 +0100
----------------------------------------------------------------------
.../nifi/processors/hadoop/AbstractFetchHDFSRecord.java | 8 ++++----
.../nifi/processors/hadoop/AbstractPutHDFSRecord.java | 5 ++---
.../org/apache/nifi/processors/hadoop/FetchHDFS.java | 11 +++++------
.../java/org/apache/nifi/processors/hadoop/GetHDFS.java | 3 +--
.../java/org/apache/nifi/processors/hadoop/PutHDFS.java | 5 ++---
.../org/apache/nifi/processors/hadoop/GetHDFSTest.java | 8 ++++++++
.../org/apache/nifi/processors/hadoop/PutHDFSTest.java | 9 +++++++++
.../org/apache/nifi/processors/hadoop/TestFetchHDFS.java | 8 ++++++++
.../apache/nifi/processors/parquet/PutParquetTest.java | 3 ++-
9 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index d6a374f..96631ef 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -20,7 +20,6 @@ import java.io.BufferedOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
-import java.net.URI;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
@@ -232,9 +231,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
successFlowFile = session.putAllAttributes(successFlowFile, attributes);
- final URI uri = path.toUri();
- getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
- session.getProvenanceReporter().fetch(successFlowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+
+ final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+ getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, stopWatch.getDuration()});
+ session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(successFlowFile, REL_SUCCESS);
session.remove(originalFlowFile);
return null;
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index e08b4fb..22f61b2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -349,7 +349,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
putFlowFile = postProcess(context, session, putFlowFile, destFile);
- final String outputPath = destFile.toString();
final String newFilename = destFile.getName();
final String hdfsPath = destFile.getParent().toString();
@@ -361,8 +360,8 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
putFlowFile = session.putAllAttributes(putFlowFile, attributes);
// Send a provenance event and transfer to success
- final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
- session.getProvenanceReporter().send(putFlowFile, transitUri);
+ final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+ session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
session.transfer(putFlowFile, REL_SUCCESS);
} catch (IOException | FlowFileAccessException e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 08a8ce2..4237503 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -46,7 +46,6 @@ import org.apache.nifi.util.StopWatch;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URI;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashSet;
@@ -128,7 +127,6 @@ public class FetchHDFS extends AbstractHadoopProcessor {
return;
}
- final URI uri = path.toUri();
final StopWatch stopWatch = new StopWatch(true);
final FlowFile finalFlowFile = flowFile;
@@ -149,6 +147,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
}
FlowFile flowFile = finalFlowFile;
+ final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
try {
final String outputFilename;
final String originalFilename = path.getName();
@@ -166,16 +165,16 @@ public class FetchHDFS extends AbstractHadoopProcessor {
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
stopWatch.stop();
- getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
- session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+ getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
+ session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (final FileNotFoundException | AccessControlException e) {
- getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
+ getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} catch (final IOException e) {
- getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+ getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_COMMS_FAILURE);
} finally {
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 7357d35..64730c8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -381,8 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
continue;
}
- final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
- session.getProvenanceReporter().receive(flowFile, transitUri);
+ session.getProvenanceReporter().receive(flowFile, file.toString());
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, file, millis, dataRate});
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 4a9b2c1..fe62702 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -357,13 +357,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{putFlowFile, copyFile, millis, dataRate});
- final String outputPath = copyFile.toString();
final String newFilename = copyFile.getName();
final String hdfsPath = copyFile.getParent().toString();
putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
- final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
- session.getProvenanceReporter().send(putFlowFile, transitUri);
+ final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+ session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
session.transfer(putFlowFile, REL_SUCCESS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 325ba3a..40666d9 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -21,6 +21,8 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NiFiProperties;
@@ -214,6 +216,12 @@ public class GetHDFSTest {
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
flowFile.assertContentEquals(expected);
+ final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord receiveEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.RECEIVE, receiveEvent.getEventType());
+ // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+ assertTrue(receiveEvent.getTransitUri().endsWith("13545423550275052.zip"));
}
@Test
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 2d3ad79..32569ac 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -27,6 +27,8 @@ import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NiFiProperties;
@@ -220,6 +222,13 @@ public class PutHDFSTest {
assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+
+ final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+ // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+ assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
}
@Test
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 8b8b568..74c48a6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.hadoop;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.TestRunner;
@@ -61,6 +63,12 @@ public class TestFetchHDFS {
runner.enqueue(new String("trigger flow file"));
runner.run();
runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+ final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord fetchEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
+ // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+ assertTrue(fetchEvent.getTransitUri().endsWith(file));
}
@Test
http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index e634e2e..9e7943e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -143,7 +143,8 @@ public class PutParquetTest {
// verify it was a SEND event with the correct URI
final ProvenanceEventRecord provEvent = provEvents.get(0);
Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
- Assert.assertEquals("hdfs://" + avroParquetFile.toString(), provEvent.getTransitUri());
+ // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+ Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
// verify the content of the parquet file by reading it back in
verifyAvroParquetUsers(avroParquetFile, 100);