You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/06/10 06:48:00 UTC

[nifi] branch master updated: NIFI-6361: Add fix to PutFile processor

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1975101  NIFI-6361: Add fix to PutFile processor
1975101 is described below

commit 1975101292fcd0fceb0d30594da54e5257bc7700
Author: Andres Garagiola <an...@gmail.com>
AuthorDate: Fri Jun 7 09:54:44 2019 -0300

    NIFI-6361: Add fix to PutFile processor
    
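    When PutFile is configured with the 'replace' conflict resolution strategy and a maximum number of files, there is an issue when the destination folder already contains X files and the limit is also X: the processor fails instead of replacing the existing file, even though the replacement would still leave X files. This commit fixes that issue.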
    
    This closes #3524.
    
    Signed-off-by: Andres Garagiola  <an...@gmail.com>
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../apache/nifi/processors/standard/PutFile.java   |  15 +-
 .../nifi/processors/standard/TestPutFile.java      | 210 +++++++++++++++++++++
 2 files changed, 222 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
index 0c95c75..8784c75 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
@@ -49,6 +49,7 @@ import java.nio.file.attribute.PosixFilePermissions;
 import java.nio.file.attribute.UserPrincipalLookupService;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -203,8 +204,9 @@ public class PutFile extends AbstractProcessor {
         Path tempDotCopyFile = null;
         try {
             final Path rootDirPath = configuredRootDirPath;
-            final Path tempCopyFile = rootDirPath.resolve("." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
-            final Path copyFile = rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+            String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+            final Path tempCopyFile = rootDirPath.resolve("." + filename);
+            final Path copyFile = rootDirPath.resolve(filename);
 
             if (!Files.exists(rootDirPath)) {
                 if (context.getProperty(CREATE_DIRS).asBoolean()) {
@@ -224,7 +226,7 @@ public class PutFile extends AbstractProcessor {
 
             final Path finalCopyFileDir = finalCopyFile.getParent();
             if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already
-                final int numFiles = finalCopyFileDir.toFile().list().length;
+                final long numFiles = getFilesNumberInFolder(finalCopyFileDir, filename);
 
                 if (numFiles >= maxDestinationFiles) {
                     flowFile = session.penalize(flowFile);
@@ -336,6 +338,13 @@ public class PutFile extends AbstractProcessor {
         }
     }
 
+    private long getFilesNumberInFolder(Path folder, String filename) {
+        String[] filesInFolder = folder.toFile().list();
+        return Arrays.stream(filesInFolder)
+                .filter(eachFilename -> !eachFilename.equals(filename))
+                .count();
+    }
+
     protected String stringPermissions(String perms) {
         String permissions = "";
         final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java
new file mode 100644
index 0000000..51ad7c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPutFile {
+
+    public static final String TARGET_DIRECTORY = "target/put-file";
+    private File targetDir;
+
+    @Before
+    public void prepDestDirectory() throws IOException {
+        targetDir = new File(TARGET_DIRECTORY);
+        if (!targetDir.exists()) {
+            Files.createDirectories(targetDir.toPath());
+            return;
+        }
+
+        targetDir.setReadable(true);
+
+        deleteDirectoryContent(targetDir);
+    }
+
+    private void deleteDirectoryContent(File directory) throws IOException {
+        for (final File file : directory.listFiles()) {
+            if (file.isDirectory()) {
+                deleteDirectoryContent(file);
+            }
+            Files.delete(file.toPath());
+        }
+    }
+
+    @Test
+    public void testCreateDirectory() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+        String newDir = targetDir.getAbsolutePath()+"/new-folder";
+        runner.setProperty(PutFile.DIRECTORY, newDir);
+        runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Hello world!!".getBytes(), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        Path targetPath = Paths.get(TARGET_DIRECTORY + "/new-folder/targetFile.txt");
+        byte[] content = Files.readAllBytes(targetPath);
+        assertEquals("Hello world!!", new String(content));
+    }
+
+    @Test
+    public void testReplaceConflictResolution() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+        runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+        runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+
+        Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Hello world!!".getBytes(), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        byte[] content = Files.readAllBytes(targetPath);
+        assertEquals("Hello world!!", new String(content));
+
+        //Second file
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Another file".getBytes(), attributes);
+        runner.run();
+        runner.assertTransferCount(FetchFile.REL_SUCCESS, 2);
+        File dir = new File(TARGET_DIRECTORY);
+        assertEquals(1, dir.list().length);
+        targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        content = Files.readAllBytes(targetPath);
+        assertEquals("Another file", new String(content));
+    }
+
+    @Test
+    public void testIgnoreConflictResolution() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+        runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+        runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.IGNORE_RESOLUTION);
+
+        Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Hello world!!".getBytes(), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        byte[] content = Files.readAllBytes(targetPath);
+        assertEquals("Hello world!!", new String(content));
+
+        //Second file
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Another file".getBytes(), attributes);
+        runner.run();
+        runner.assertTransferCount(FetchFile.REL_SUCCESS, 2);
+        File dir = new File(TARGET_DIRECTORY);
+        assertEquals(1, dir.list().length);
+        targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        content = Files.readAllBytes(targetPath);
+        assertEquals("Hello world!!", new String(content));
+    }
+
+    @Test
+    public void testFailConflictResolution() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+        runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+        runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.FAIL_RESOLUTION);
+
+        Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Hello world!!".getBytes(), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        byte[] content = Files.readAllBytes(targetPath);
+        assertEquals("Hello world!!", new String(content));
+
+        //Second file
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Another file".getBytes(), attributes);
+        runner.run();
+        runner.assertTransferCount(PutFile.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutFile.REL_FAILURE, 1);
+        runner.assertPenalizeCount(1);
+    }
+
+    @Test
+    public void testMaxFileLimitReach() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+        runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+        runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+        runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1");
+
+        Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Hello world!!".getBytes(), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        byte[] content = Files.readAllBytes(targetPath);
+        assertEquals("Hello world!!", new String(content));
+
+        //Second file
+        attributes.put(CoreAttributes.FILENAME.key(), "secondFile.txt");
+        runner.enqueue("Hello world!!".getBytes(), attributes);
+        runner.run();
+        runner.assertTransferCount(PutFile.REL_FAILURE, 1);
+        runner.assertPenalizeCount(1);
+    }
+
+    @Test
+    public void testReplaceAndMaxFileLimitReach() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+        runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+        runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+        runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1");
+
+        Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Hello world!!".getBytes(), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+        Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        byte[] content = Files.readAllBytes(targetPath);
+        assertEquals("Hello world!!", new String(content));
+
+        //Second file
+        attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+        runner.enqueue("Another file".getBytes(), attributes);
+        runner.run();
+        runner.assertTransferCount(FetchFile.REL_SUCCESS, 2);
+        File dir = new File(TARGET_DIRECTORY);
+        assertEquals(1, dir.list().length);
+        targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+        content = Files.readAllBytes(targetPath);
+        assertEquals("Another file", new String(content));
+    }
+
+}