You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/09/13 16:08:44 UTC

[nifi] branch master updated: NIFI-6480: PutORC/PutParquet can't overwrite file even if set 'Overwrite Files' to true

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a8852e  NIFI-6480: PutORC/PutParquet can't overwrite file even if set 'Overwrite Files' to true
8a8852e is described below

commit 8a8852e73dbcc9653ac49662df44576ec338ebb1
Author: archon <qq...@live.com>
AuthorDate: Sat Aug 10 13:56:25 2019 +0800

    NIFI-6480: PutORC/PutParquet can't overwrite file even if set 'Overwrite Files' to true
    
    This closes #3599.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../processors/hadoop/AbstractPutHDFSRecord.java   | 14 ++++++++--
 .../org/apache/nifi/processors/orc/PutORCTest.java | 30 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index 62e7231..5ee54e3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -283,11 +283,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                 final Path tempFile = new Path(directoryPath, "." + filenameValue);
                 final Path destFile = new Path(directoryPath, filenameValue);
 
-                final boolean destinationExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
+                final boolean destinationOrTempExists = fileSystem.exists(destFile) || fileSystem.exists(tempFile);
                 final boolean shouldOverwrite = context.getProperty(OVERWRITE).asBoolean();
 
                 // if the tempFile or destFile already exist, and overwrite is set to false, then transfer to failure
-                if (destinationExists && !shouldOverwrite) {
+                if (destinationOrTempExists && !shouldOverwrite) {
                     session.transfer(session.penalize(putFlowFile), REL_FAILURE);
                     getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{putFlowFile});
                     return null;
@@ -339,6 +339,16 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
                     throw exceptionHolder.get();
                 }
 
+                final boolean destinationExists = fileSystem.exists(destFile);
+
+                // If destination file already exists, resolve that based on processor configuration
+                if (destinationExists && shouldOverwrite) {
+                    if (fileSystem.delete(destFile, false)) {
+                        getLogger().info("deleted {} in order to replace with the contents of {}",
+                                new Object[]{destFile, putFlowFile});
+                    }
+                }
+
                 // Attempt to rename from the tempFile to destFile, and change owner if successfully renamed
                 rename(fileSystem, tempFile, destFile);
                 changeOwner(fileSystem, destFile, remoteOwner, remoteGroup);
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index 20e30cc..c063ea1 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -141,6 +141,34 @@ public class PutORCTest {
     }
 
     @Test
+    public void testOverwriteFile() throws InitializationException {
+        configure(proc, 1);
+
+        final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.setProperty(PutORC.OVERWRITE, "true");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+
+        MockRecordParser readerFactory = (MockRecordParser) testRunner.getControllerService("mock-reader-factory");
+        readerFactory.addRecord("name", 1, "blue", 10.0);
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 2);
+
+        testRunner.setProperty(PutORC.OVERWRITE, "false");
+        readerFactory.addRecord("name", 1, "blue", 10.0);
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertTransferCount(PutORC.REL_FAILURE, 1);
+    }
+
+    @Test
     public void testWriteORCWithDefaults() throws IOException, InitializationException {
         configure(proc, 100);
 
@@ -454,4 +482,4 @@ public class PutORCTest {
         assertEquals(numExpectedUsers, currUser);
     }
 
-}
\ No newline at end of file
+}