You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/09/13 16:08:44 UTC
[nifi] branch master updated: NIFI-6480: PutORC/PutParquet can't overwrite file even if set 'Overwrite Files' to true
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 8a8852e NIFI-6480: PutORC/PutParquet can't overwrite file even if set 'Overwrite Files' to true
8a8852e is described below
commit 8a8852e73dbcc9653ac49662df44576ec338ebb1
Author: archon <qq...@live.com>
AuthorDate: Sat Aug 10 13:56:25 2019 +0800
NIFI-6480: PutORC/PutParquet can't overwrite file even if set 'Overwrite Files' to true
This closes #3599.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../processors/hadoop/AbstractPutHDFSRecord.java | 14 ++++++++--
.../org/apache/nifi/processors/orc/PutORCTest.java | 30 +++++++++++++++++++++-
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index 62e7231..5ee54e3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -283,11 +283,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
final Path tempFile = new Path(directoryPath, "." + filenameValue);
final Path destFile = new Path(directoryPath, filenameValue);
- final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
+ final boolean destinationOrTempExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
// if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
- if (destinationExists && !shouldOverwrite) {
+ if (destinationOrTempExists && !shouldOverwrite) {
session.transfer(session.penalize(putFlowFile), REL_FAILURE);
getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
return null;
@@ -339,6 +339,16 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
throw exceptionHolder.get();
}
+ final boolean destinationExists = fileSystem.exists(destFile);
+
+ // If destination file already exists, resolve that based on processor configuration
+ if (destinationExists && shouldOverwrite) {
+ if (fileSystem.delete(destFile, false)) {
+ getLogger().info("deleted {} in order to replace with the contents of {}",
+ new Object[]{destFile, putFlowFile});
+ }
+ }
+
// Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
rename(fileSystem, tempFile, destFile);
changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index 20e30cc..c063ea1 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -141,6 +141,34 @@ public class PutORCTest {
}
@Test
+ public void testOverwriteFile() throws InitializationException {
+ configure(proc, 1);
+
+ final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
+
+ final Map<String, String> flowFileAttributes = new HashMap<>();
+ flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+ testRunner.setProperty(PutORC.OVERWRITE, "true");
+
+ testRunner.enqueue("trigger", flowFileAttributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+
+ MockRecordParser readerFactory = (MockRecordParser) testRunner.getControllerService("mock-reader-factory");
+ readerFactory.addRecord("name", 1, "blue", 10.0);
+ testRunner.enqueue("trigger", flowFileAttributes);
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 2);
+
+ testRunner.setProperty(PutORC.OVERWRITE, "false");
+ readerFactory.addRecord("name", 1, "blue", 10.0);
+ testRunner.enqueue("trigger", flowFileAttributes);
+ testRunner.run();
+ testRunner.assertTransferCount(PutORC.REL_FAILURE, 1);
+ }
+
+ @Test
public void testWriteORCWithDefaults() throws IOException, InitializationException {
configure(proc, 100);
@@ -454,4 +482,4 @@ public class PutORCTest {
assertEquals(numExpectedUsers, currUser);
}
-}
\ No newline at end of file
+}