You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/06/10 06:48:00 UTC
[nifi] branch master updated: NIFI-6361: Add fix to PutFile processor
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 1975101 NIFI-6361: Add fix to PutFile processor
1975101 is described below
commit 1975101292fcd0fceb0d30594da54e5257bc7700
Author: Andres Garagiola <an...@gmail.com>
AuthorDate: Fri Jun 7 09:54:44 2019 -0300
NIFI-6361: Add fix to PutFile processor
When PutFile is configured with the 'replace' conflict resolution strategy and a maximum number of files, it fails if the destination folder already contains X files and the limit is also X, even though replacing an existing file would still leave X files in the folder. This commit fixes that issue.
This closes #3524.
Signed-off-by: Andres Garagiola <an...@gmail.com>
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../apache/nifi/processors/standard/PutFile.java | 15 +-
.../nifi/processors/standard/TestPutFile.java | 210 +++++++++++++++++++++
2 files changed, 222 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
index 0c95c75..8784c75 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
@@ -49,6 +49,7 @@ import java.nio.file.attribute.PosixFilePermissions;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -203,8 +204,9 @@ public class PutFile extends AbstractProcessor {
Path tempDotCopyFile = null;
try {
final Path rootDirPath = configuredRootDirPath;
- final Path tempCopyFile = rootDirPath.resolve("." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
- final Path copyFile = rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final Path tempCopyFile = rootDirPath.resolve("." + filename);
+ final Path copyFile = rootDirPath.resolve(filename);
if (!Files.exists(rootDirPath)) {
if (context.getProperty(CREATE_DIRS).asBoolean()) {
@@ -224,7 +226,7 @@ public class PutFile extends AbstractProcessor {
final Path finalCopyFileDir = finalCopyFile.getParent();
if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already
- final int numFiles = finalCopyFileDir.toFile().list().length;
+ final long numFiles = getFilesNumberInFolder(finalCopyFileDir, filename);
if (numFiles >= maxDestinationFiles) {
flowFile = session.penalize(flowFile);
@@ -336,6 +338,13 @@ public class PutFile extends AbstractProcessor {
}
}
+ private long getFilesNumberInFolder(Path folder, String filename) {
+ String[] filesInFolder = folder.toFile().list();
+ return Arrays.stream(filesInFolder)
+ .filter(eachFilename -> !eachFilename.equals(filename))
+ .count();
+ }
+
protected String stringPermissions(String perms) {
String permissions = "";
final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java
new file mode 100644
index 0000000..51ad7c0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPutFile {
+
+ public static final String TARGET_DIRECTORY = "target/put-file";
+ private File targetDir;
+
+ @Before
+ public void prepDestDirectory() throws IOException {
+ targetDir = new File(TARGET_DIRECTORY);
+ if (!targetDir.exists()) {
+ Files.createDirectories(targetDir.toPath());
+ return;
+ }
+
+ targetDir.setReadable(true);
+
+ deleteDirectoryContent(targetDir);
+ }
+
+ private void deleteDirectoryContent(File directory) throws IOException {
+ for (final File file : directory.listFiles()) {
+ if (file.isDirectory()) {
+ deleteDirectoryContent(file);
+ }
+ Files.delete(file.toPath());
+ }
+ }
+
+ @Test
+ public void testCreateDirectory() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+ String newDir = targetDir.getAbsolutePath()+"/new-folder";
+ runner.setProperty(PutFile.DIRECTORY, newDir);
+ runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Hello world!!".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+ Path targetPath = Paths.get(TARGET_DIRECTORY + "/new-folder/targetFile.txt");
+ byte[] content = Files.readAllBytes(targetPath);
+ assertEquals("Hello world!!", new String(content));
+ }
+
+ @Test
+ public void testReplaceConflictResolution() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+ runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+ runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+
+ Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Hello world!!".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+ Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ byte[] content = Files.readAllBytes(targetPath);
+ assertEquals("Hello world!!", new String(content));
+
+ //Second file
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Another file".getBytes(), attributes);
+ runner.run();
+ runner.assertTransferCount(FetchFile.REL_SUCCESS, 2);
+ File dir = new File(TARGET_DIRECTORY);
+ assertEquals(1, dir.list().length);
+ targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ content = Files.readAllBytes(targetPath);
+ assertEquals("Another file", new String(content));
+ }
+
+ @Test
+ public void testIgnoreConflictResolution() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+ runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+ runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.IGNORE_RESOLUTION);
+
+ Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Hello world!!".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+ Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ byte[] content = Files.readAllBytes(targetPath);
+ assertEquals("Hello world!!", new String(content));
+
+ //Second file
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Another file".getBytes(), attributes);
+ runner.run();
+ runner.assertTransferCount(FetchFile.REL_SUCCESS, 2);
+ File dir = new File(TARGET_DIRECTORY);
+ assertEquals(1, dir.list().length);
+ targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ content = Files.readAllBytes(targetPath);
+ assertEquals("Hello world!!", new String(content));
+ }
+
+ @Test
+ public void testFailConflictResolution() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+ runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+ runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.FAIL_RESOLUTION);
+
+ Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Hello world!!".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+ Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ byte[] content = Files.readAllBytes(targetPath);
+ assertEquals("Hello world!!", new String(content));
+
+ //Second file
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Another file".getBytes(), attributes);
+ runner.run();
+ runner.assertTransferCount(PutFile.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutFile.REL_FAILURE, 1);
+ runner.assertPenalizeCount(1);
+ }
+
+ @Test
+ public void testMaxFileLimitReach() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+ runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+ runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+ runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1");
+
+ Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Hello world!!".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+ Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ byte[] content = Files.readAllBytes(targetPath);
+ assertEquals("Hello world!!", new String(content));
+
+ //Second file
+ attributes.put(CoreAttributes.FILENAME.key(), "secondFile.txt");
+ runner.enqueue("Hello world!!".getBytes(), attributes);
+ runner.run();
+ runner.assertTransferCount(PutFile.REL_FAILURE, 1);
+ runner.assertPenalizeCount(1);
+ }
+
+ @Test
+ public void testReplaceAndMaxFileLimitReach() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutFile());
+ runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath());
+ runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION);
+ runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1");
+
+ Map<String,String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Hello world!!".getBytes(), attributes);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1);
+ Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ byte[] content = Files.readAllBytes(targetPath);
+ assertEquals("Hello world!!", new String(content));
+
+ //Second file
+ attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt");
+ runner.enqueue("Another file".getBytes(), attributes);
+ runner.run();
+ runner.assertTransferCount(FetchFile.REL_SUCCESS, 2);
+ File dir = new File(TARGET_DIRECTORY);
+ assertEquals(1, dir.list().length);
+ targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt");
+ content = Files.readAllBytes(targetPath);
+ assertEquals("Another file", new String(content));
+ }
+
+}