You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/12 15:14:05 UTC
incubator-nifi git commit: NIFI-246: fixed provenance transit uris to
include hdfs:// prefix
Repository: incubator-nifi
Updated Branches:
refs/heads/develop 78c069fb5 -> f36eea370
NIFI-246: fixed provenance transit uris to include hdfs:// prefix
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f36eea37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f36eea37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f36eea37
Branch: refs/heads/develop
Commit: f36eea3701598be05f09e7757d06748bda776ed6
Parents: 78c069f
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 12 09:09:13 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 12 09:09:13 2015 -0500
----------------------------------------------------------------------
.../main/java/org/apache/nifi/processors/hadoop/GetHDFS.java | 3 ++-
.../main/java/org/apache/nifi/processors/hadoop/PutHDFS.java | 5 +++--
2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f36eea37/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 2cab573..20ac738 100644
--- a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -363,7 +363,8 @@ public class GetHDFS extends AbstractHadoopProcessor {
continue;
}
- session.getProvenanceReporter().receive(flowFile, file.toString());
+ final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+ session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, file, millis, dataRate});
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f36eea37/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 5768da0..e84b575 100644
--- a/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nar-bundles/hadoop-bundle/hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -47,7 +47,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -322,7 +321,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, copyFile, millis, dataRate});
- session.getProvenanceReporter().send(flowFile, copyFile.toString());
+ final String filename = copyFile.toString();
+ final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+ session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
} catch (final Throwable t) {