You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/04 12:24:04 UTC
[1/2] nifi git commit: NIFI-1322 fixed breaking changes introduced in previous commit
Repository: nifi
Updated Branches:
refs/heads/master 50010fb34 -> c13cfa6ea
NIFI-1322 fixed breaking changes introduced in previous commit
This closes #1181
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c13cfa6e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c13cfa6e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c13cfa6e
Branch: refs/heads/master
Commit: c13cfa6ea6009070db74d7dd9be9c66703d56942
Parents: a7d0641
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Nov 4 08:22:24 2016 -0400
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Nov 4 08:23:10 2016 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/hadoop/PutHDFS.java | 38 +++++++++++---------
1 file changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c13cfa6e/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index cb49d59..90b25e0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -75,15 +75,19 @@ import java.util.concurrent.TimeUnit;
@SeeAlso(GetHDFS.class)
public class PutHDFS extends AbstractHadoopProcessor {
- public static final String REPLACE = "replace";
- public static final String IGNORE = "ignore";
- public static final String FAIL = "fail";
- public static final String APPEND = "append";
-
- public static final AllowableValue REPLACE_RESOLUTION = new AllowableValue(REPLACE, REPLACE, "Replaces the existing file if any.");
- public static final AllowableValue IGNORE_RESOLUTION = new AllowableValue(IGNORE, IGNORE, "Ignores the flow file and routes it to success.");
- public static final AllowableValue FAIL_RESOLUTION = new AllowableValue(FAIL, FAIL, "Penalizes the flow file and routes it to failure.");
- public static final AllowableValue APPEND_RESOLUTION = new AllowableValue(APPEND, APPEND, "Appends to the existing file if any, creates a new file otherwise.");
+ public static final String REPLACE_RESOLUTION = "replace";
+ public static final String IGNORE_RESOLUTION = "ignore";
+ public static final String FAIL_RESOLUTION = "fail";
+ public static final String APPEND_RESOLUTION = "append";
+
+ public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
+ REPLACE_RESOLUTION, "Replaces the existing file if any.");
+ public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
+ "Ignores the flow file and routes it to success.");
+ public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
+ "Penalizes the flow file and routes it to failure.");
+ public static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
+ "Appends to the existing file if any, creates a new file otherwise.");
public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
public static final int BUFFER_SIZE_DEFAULT = 4096;
@@ -108,8 +112,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
.name("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
.required(true)
- .defaultValue(FAIL_RESOLUTION.getValue())
- .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION, APPEND_RESOLUTION)
+            .defaultValue(FAIL_RESOLUTION_AV.getValue())
+ .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
.build();
public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
@@ -258,18 +262,18 @@ public class PutHDFS extends AbstractHadoopProcessor {
// If destination file already exists, resolve that based on processor configuration
if (destinationExists) {
switch (conflictResponse) {
- case REPLACE:
+ case REPLACE_RESOLUTION:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{copyFile, flowFile});
}
break;
- case IGNORE:
+ case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("transferring {} to success because file with same name already exists",
new Object[]{flowFile});
return;
- case FAIL:
+ case FAIL_RESOLUTION:
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
@@ -289,7 +293,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
OutputStream fos = null;
Path createdFile = null;
try {
- if(conflictResponse.equals(APPEND_RESOLUTION.getValue()) && destinationExists) {
+ if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
fos = hdfs.append(copyFile, bufferSize);
} else {
fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
@@ -328,8 +332,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;
- if(!conflictResponse.equals(APPEND_RESOLUTION.getValue())
- || (conflictResponse.equals(APPEND_RESOLUTION.getValue()) && !destinationExists)) {
+ if (!conflictResponse.equals(APPEND_RESOLUTION_AV.getValue())
+ || (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && !destinationExists)) {
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
[2/2] nifi git commit: NIFI-1322 - PutHDFS - allow file append resolution
Posted by oz...@apache.org.
NIFI-1322 - PutHDFS - allow file append resolution
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a7d06412
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a7d06412
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a7d06412
Branch: refs/heads/master
Commit: a7d06412f8e4809157bf1074d659b5420546190f
Parents: 50010fb
Author: Pierre Villard <pi...@gmail.com>
Authored: Wed Nov 2 20:38:52 2016 +0100
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Nov 4 08:23:10 2016 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/hadoop/PutHDFS.java | 62 ++++++++++++--------
1 file changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a7d06412/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 3a0cb48..cb49d59 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@@ -74,9 +75,15 @@ import java.util.concurrent.TimeUnit;
@SeeAlso(GetHDFS.class)
public class PutHDFS extends AbstractHadoopProcessor {
- public static final String REPLACE_RESOLUTION = "replace";
- public static final String IGNORE_RESOLUTION = "ignore";
- public static final String FAIL_RESOLUTION = "fail";
+ public static final String REPLACE = "replace";
+ public static final String IGNORE = "ignore";
+ public static final String FAIL = "fail";
+ public static final String APPEND = "append";
+
+ public static final AllowableValue REPLACE_RESOLUTION = new AllowableValue(REPLACE, REPLACE, "Replaces the existing file if any.");
+ public static final AllowableValue IGNORE_RESOLUTION = new AllowableValue(IGNORE, IGNORE, "Ignores the flow file and routes it to success.");
+ public static final AllowableValue FAIL_RESOLUTION = new AllowableValue(FAIL, FAIL, "Penalizes the flow file and routes it to failure.");
+ public static final AllowableValue APPEND_RESOLUTION = new AllowableValue(APPEND, APPEND, "Appends to the existing file if any, creates a new file otherwise.");
public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
public static final int BUFFER_SIZE_DEFAULT = 4096;
@@ -101,8 +108,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
.name("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
.required(true)
- .defaultValue(FAIL_RESOLUTION)
- .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION)
+ .defaultValue(FAIL_RESOLUTION.getValue())
+ .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION, APPEND_RESOLUTION)
.build();
public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
@@ -246,21 +253,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
changeOwner(context, hdfs, configuredRootDirPath);
}
+ final boolean destinationExists = hdfs.exists(copyFile);
+
// If destination file already exists, resolve that based on processor configuration
- if (hdfs.exists(copyFile)) {
+ if (destinationExists) {
switch (conflictResponse) {
- case REPLACE_RESOLUTION:
+ case REPLACE:
if (hdfs.delete(copyFile, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{copyFile, flowFile});
}
break;
- case IGNORE_RESOLUTION:
+ case IGNORE:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("transferring {} to success because file with same name already exists",
new Object[]{flowFile});
return;
- case FAIL_RESOLUTION:
+ case FAIL:
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
@@ -280,7 +289,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
OutputStream fos = null;
Path createdFile = null;
try {
- fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+ if(conflictResponse.equals(APPEND_RESOLUTION.getValue()) && destinationExists) {
+ fos = hdfs.append(copyFile, bufferSize);
+ } else {
+ fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+ }
if (codec != null) {
fos = codec.createOutputStream(fos);
}
@@ -315,21 +328,24 @@ public class PutHDFS extends AbstractHadoopProcessor {
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;
- boolean renamed = false;
- for (int i = 0; i < 10; i++) { // try to rename multiple times.
- if (hdfs.rename(tempCopyFile, copyFile)) {
- renamed = true;
- break;// rename was successful
+ if(!conflictResponse.equals(APPEND_RESOLUTION.getValue())
+ || (conflictResponse.equals(APPEND_RESOLUTION.getValue()) && !destinationExists)) {
+ boolean renamed = false;
+ for (int i = 0; i < 10; i++) { // try to rename multiple times.
+ if (hdfs.rename(tempCopyFile, copyFile)) {
+ renamed = true;
+ break;// rename was successful
+ }
+ Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+ }
+ if (!renamed) {
+ hdfs.delete(tempCopyFile, false);
+ throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+ + " to its final filename");
}
- Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
- }
- if (!renamed) {
- hdfs.delete(tempCopyFile, false);
- throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
- + " to its final filename");
- }
- changeOwner(context, hdfs, copyFile);
+ changeOwner(context, hdfs, copyFile);
+ }
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, copyFile, millis, dataRate});