You are viewing a plain-text version of this content. It has a canonical link, but that hyperlink was not preserved in this plain-text rendering.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/11/02 09:11:08 UTC

nifi git commit: NIFI-4544: Improve HDFS processors provenance transit URL

Repository: nifi
Updated Branches:
  refs/heads/master 641333791 -> 77a51e1a9


NIFI-4544: Improve HDFS processors provenance transit URL

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2238.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/77a51e1a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/77a51e1a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/77a51e1a

Branch: refs/heads/master
Commit: 77a51e1a9ebee2162af48c5bdd5b79703e65b4e2
Parents: 6413337
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Oct 30 11:35:13 2017 +0900
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Nov 2 10:10:03 2017 +0100

----------------------------------------------------------------------
 .../nifi/processors/hadoop/AbstractFetchHDFSRecord.java  |  8 ++++----
 .../nifi/processors/hadoop/AbstractPutHDFSRecord.java    |  5 ++---
 .../org/apache/nifi/processors/hadoop/FetchHDFS.java     | 11 +++++------
 .../java/org/apache/nifi/processors/hadoop/GetHDFS.java  |  3 +--
 .../java/org/apache/nifi/processors/hadoop/PutHDFS.java  |  5 ++---
 .../org/apache/nifi/processors/hadoop/GetHDFSTest.java   |  8 ++++++++
 .../org/apache/nifi/processors/hadoop/PutHDFSTest.java   |  9 +++++++++
 .../org/apache/nifi/processors/hadoop/TestFetchHDFS.java |  8 ++++++++
 .../apache/nifi/processors/parquet/PutParquetTest.java   |  3 ++-
 9 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index d6a374f..96631ef 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -20,7 +20,6 @@ import java.io.BufferedOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URI;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -232,9 +231,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
                 attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                 successFlowFile = session.putAllAttributes(successFlowFile, attributes);
 
-                final URI uri = path.toUri();
-                getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {uri, successFlowFile, stopWatch.getDuration()});
-                session.getProvenanceReporter().fetch(successFlowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+
+                final Path qualifiedPath = path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                getLogger().info("Successfully received content from {} for {} in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, stopWatch.getDuration()});
+                session.getProvenanceReporter().fetch(successFlowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                 session.transfer(successFlowFile, REL_SUCCESS);
                 session.remove(originalFlowFile);
                 return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index e08b4fb..22f61b2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -349,7 +349,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
 
                 putFlowFile = postProcess(context, session, putFlowFile, destFile);
 
-                final String outputPath = destFile.toString();
                 final String newFilename = destFile.getName();
                 final String hdfsPath = destFile.getParent().toString();
 
@@ -361,8 +360,8 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                 putFlowFile = session.putAllAttributes(putFlowFile, attributes);
 
                 // Send a provenance event and transfer to success
-                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
-                session.getProvenanceReporter().send(putFlowFile, transitUri);
+                final Path qualifiedPath = destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
                 session.transfer(putFlowFile, REL_SUCCESS);
 
             } catch (IOException | FlowFileAccessException e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 08a8ce2..4237503 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -46,7 +46,6 @@ import org.apache.nifi.util.StopWatch;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -128,7 +127,6 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final URI uri = path.toUri();
         final StopWatch stopWatch = new StopWatch(true);
         final FlowFile finalFlowFile = flowFile;
 
@@ -149,6 +147,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
                 }
 
                 FlowFile flowFile = finalFlowFile;
+                final Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
                 try {
                     final String outputFilename;
                     final String originalFilename = path.getName();
@@ -166,16 +165,16 @@ public class FetchHDFS extends AbstractHadoopProcessor {
                     flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
 
                     stopWatch.stop();
-                    getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
-                    session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                    getLogger().info("Successfully received content from {} for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
+                    session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                     session.transfer(flowFile, REL_SUCCESS);
                 } catch (final FileNotFoundException | AccessControlException e) {
-                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {uri, flowFile, e});
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {qualifiedPath, flowFile, e});
                     flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
                     flowFile = session.penalize(flowFile);
                     session.transfer(flowFile, REL_FAILURE);
                 } catch (final IOException e) {
-                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
+                    getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {qualifiedPath, flowFile, e});
                     flowFile = session.penalize(flowFile);
                     session.transfer(flowFile, REL_COMMS_FAILURE);
                 } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 7357d35..64730c8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -381,8 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     continue;
                 }
 
-                final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
-                session.getProvenanceReporter().receive(flowFile, transitUri);
+                session.getProvenanceReporter().receive(flowFile, file.toString());
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
                         new Object[]{flowFile, file, millis, dataRate});

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 4a9b2c1..fe62702 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -357,13 +357,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                             new Object[]{putFlowFile, copyFile, millis, dataRate});
 
-                    final String outputPath = copyFile.toString();
                     final String newFilename = copyFile.getName();
                     final String hdfsPath = copyFile.getParent().toString();
                     putFlowFile = session.putAttribute(putFlowFile, CoreAttributes.FILENAME.key(), newFilename);
                     putFlowFile = session.putAttribute(putFlowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
-                    final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
-                    session.getProvenanceReporter().send(putFlowFile, transitUri);
+                    final Path qualifiedPath = copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    session.getProvenanceReporter().send(putFlowFile, qualifiedPath.toString());
 
                     session.transfer(putFlowFile, REL_SUCCESS);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 325ba3a..40666d9 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -21,6 +21,8 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
@@ -214,6 +216,12 @@ public class GetHDFSTest {
         assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
         InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
         flowFile.assertContentEquals(expected);
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord receiveEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, receiveEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        assertTrue(receiveEvent.getTransitUri().endsWith("13545423550275052.zip"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 2d3ad79..32569ac 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -27,6 +27,8 @@ import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
@@ -220,6 +222,13 @@ public class PutHDFSTest {
         assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
         assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 8b8b568..74c48a6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.hadoop;
 
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
@@ -61,6 +63,12 @@ public class TestFetchHDFS {
         runner.enqueue(new String("trigger flow file"));
         runner.run();
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord fetchEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        assertTrue(fetchEvent.getTransitUri().endsWith(file));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a51e1a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index e634e2e..9e7943e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -143,7 +143,8 @@ public class PutParquetTest {
         // verify it was a SEND event with the correct URI
         final ProvenanceEventRecord provEvent = provEvents.get(0);
         Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
-        Assert.assertEquals("hdfs://" + avroParquetFile.toString(), provEvent.getTransitUri());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
 
         // verify the content of the parquet file by reading it back in
         verifyAvroParquetUsers(avroParquetFile, 100);