You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/12 15:14:05 UTC

incubator-nifi git commit: NIFI-246: fixed provenance transit uris to include hdfs:// prefix

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 78c069fb5 -> f36eea370


NIFI-246: Fixed provenance transit URIs to include the hdfs:// prefix


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f36eea37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f36eea37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f36eea37

Branch: refs/heads/develop
Commit: f36eea3701598be05f09e7757d06748bda776ed6
Parents: 78c069f
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 12 09:09:13 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 12 09:09:13 2015 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/processors/hadoop/GetHDFS.java    | 3 ++-
 .../main/java/org/apache/nifi/processors/hadoop/PutHDFS.java    | 5 +++--
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f36eea37/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 2cab573..20ac738 100644
--- a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -363,7 +363,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     continue;
                 }
 
-                session.getProvenanceReporter().receive(flowFile, file.toString());
+                final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+                session.getProvenanceReporter().receive(flowFile, transitUri);
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
                         new Object[]{flowFile, file, millis, dataRate});

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f36eea37/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 5768da0..e84b575 100644
--- a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -47,7 +47,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.Tuple;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -322,7 +321,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
             getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                     new Object[]{flowFile, copyFile, millis, dataRate});
 
-            session.getProvenanceReporter().send(flowFile, copyFile.toString());
+            final String filename = copyFile.toString();
+            final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+            session.getProvenanceReporter().send(flowFile, transitUri);
             session.transfer(flowFile, REL_SUCCESS);
 
         } catch (final Throwable t) {