Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/14 17:15:30 UTC

[nifi] 12/15: NIFI-9260 Making the 'write and rename' behaviour optional for PutHDFS

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 534e6eafe7b8622e6d141802a40d7c6e01a09361
Author: Bence Simon <si...@gmail.com>
AuthorDate: Thu Sep 30 15:57:41 2021 +0200

    NIFI-9260 Making the 'write and rename' behaviour optional for PutHDFS
    
    This closes #5423.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../org/apache/nifi/processors/hadoop/PutHDFS.java | 35 ++++++++--
 .../apache/nifi/processors/hadoop/PutHDFSTest.java | 74 +++++++++++++++++-----
 2 files changed, 90 insertions(+), 19 deletions(-)

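The two strategies in the diff below differ in whether the data is published
atomically. Here is a minimal sketch of both write paths against the Hadoop
FileSystem API (illustrative only, not code from this commit; the class name
and variables are hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WriteStrategySketch {

        // "Write and rename": data lands in a hidden dot-file first and an
        // atomic rename then publishes it, so readers never observe a
        // partially written file under the final name.
        static void writeAndRename(final FileSystem fs, final Path target, final byte[] data) throws IOException {
            final Path temp = new Path(target.getParent(), "." + target.getName());
            try (final FSDataOutputStream out = fs.create(temp)) {
                out.write(data);
            }
            if (!fs.rename(temp, target)) {
                throw new IOException("Could not rename " + temp + " to " + target);
            }
        }

        // "Simple write": stream straight to the final name; one fewer
        // namespace operation, but a concurrent reader may see partial data.
        static void simpleWrite(final FileSystem fs, final Path target, final byte[] data) throws IOException {
            try (final FSDataOutputStream out = fs.create(target)) {
                out.write(data);
            }
        }
    }
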
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 62b7996..d5d85fc 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -125,6 +125,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
     protected static final String FAIL_RESOLUTION = "fail";
     protected static final String APPEND_RESOLUTION = "append";
 
+    protected static final String WRITE_AND_RENAME = "writeAndRename";
+    protected static final String SIMPLE_WRITE = "simpleWrite";
+
     protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
             REPLACE_RESOLUTION, "Replaces the existing file if any.");
     protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
@@ -134,6 +137,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
     protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
             "Appends to the existing file if any, creates a new file otherwise.");
 
+    protected static final AllowableValue WRITE_AND_RENAME_AV = new AllowableValue(WRITE_AND_RENAME, "Write and rename",
+            "The processor writes FlowFile data into a temporary file and renames it after completion. This prevents other processes from reading partially written files.");
+    protected static final AllowableValue SIMPLE_WRITE_AV = new AllowableValue(SIMPLE_WRITE, "Simple write",
+            "The processor writes FlowFile data directly to the destination file. In some cases this might cause reading partially written files.");
+
     protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
             .name("Conflict Resolution Strategy")
             .description("Indicates what should happen when a file with the same name already exists in the output directory")
@@ -142,6 +150,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
             .build();
 
+    protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("writing-strategy")
+            .displayName("Writing Strategy")
+            .description("Defines the approach for writing the FlowFile data.")
+            .required(true)
+            .defaultValue(WRITE_AND_RENAME_AV.getValue())
+            .allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
+            .build();
+
     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
             .name("Block Size")
             .description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@@ -219,6 +236,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 .description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.")
                 .build());
         props.add(CONFLICT_RESOLUTION);
+        props.add(WRITING_STRATEGY);
         props.add(BLOCK_SIZE);
         props.add(BUFFER_SIZE);
         props.add(REPLICATION_FACTOR);
@@ -280,6 +298,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 Path tempDotCopyFile = null;
                 FlowFile putFlowFile = flowFile;
                 try {
+                    final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue();
                     final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
                     final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
                     final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
@@ -295,6 +314,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     final Path tempCopyFile = new Path(dirPath, "." + filename);
                     final Path copyFile = new Path(dirPath, filename);
 
+                    // Depending on the writing strategy, we might need a temporary file
+                    final Path actualCopyFile = (writingStrategy.equals(WRITE_AND_RENAME))
+                            ? tempCopyFile
+                            : copyFile;
+
                     // Create destination directory if it does not exist
                     boolean targetDirCreated = false;
                     try {
@@ -361,7 +385,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                         cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                     }
 
-                                    fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
                                             FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
                                             null, null);
                                 }
@@ -369,7 +393,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                 if (codec != null) {
                                     fos = codec.createOutputStream(fos);
                                 }
-                                createdFile = tempCopyFile;
+                                createdFile = actualCopyFile;
                                 BufferedInputStream bis = new BufferedInputStream(in);
                                 StreamUtils.copy(bis, fos);
                                 bis = null;
@@ -399,9 +423,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                     tempDotCopyFile = tempCopyFile;
 
-                    if (!conflictResponse.equals(APPEND_RESOLUTION)
-                            || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
+                    if (
+                        writingStrategy.equals(WRITE_AND_RENAME)
+                        && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
+                    ) {
                         boolean renamed = false;
+
                         for (int i = 0; i < 10; i++) { // try to rename multiple times.
                             if (hdfs.rename(tempCopyFile, copyFile)) {
                                 renamed = true;
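
The rename guard in the last hunk above takes a moment to parse; it is
logically equivalent to this shorter form (an equivalent reformulation for
readability, not code from the commit):

    // Rename only when the write-and-rename strategy is active and the data
    // was not appended to an already existing file (appends are written
    // directly to the destination, so there is nothing to rename).
    final boolean shouldRename = writingStrategy.equals(WRITE_AND_RENAME)
            && !(conflictResponse.equals(APPEND_RESOLUTION) && destinationExists);
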
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index c41114b..22b3ec2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -71,6 +71,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class PutHDFSTest {
+    private static final String TARGET_DIRECTORY = "target/test-classes";
+    private static final String SOURCE_DIRECTORY = "src/test/resources/testdata";
+    private static final String FILE_NAME = "randombytes-1";
 
     private KerberosProperties kerberosProperties;
     private FileSystem mockFileSystem;
@@ -197,27 +200,32 @@ public class PutHDFSTest {
 
     @Test
     public void testPutFile() throws IOException {
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
-        TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
-        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
-        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
-            Map<String, String> attributes = new HashMap<>();
-            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+        // given
+        final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+        final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+
+        // when
+        try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
             runner.enqueue(fis, attributes);
             runner.run();
         }
 
-        List<MockFlowFile> failedFlowFiles = runner
-                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        // then
+        final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
         assertTrue(failedFlowFiles.isEmpty());
 
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
-        MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
-        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
-        assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+
+        final MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+        assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
         assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
 
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -225,7 +233,43 @@ public class PutHDFSTest {
         final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
-        assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
+        assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
+
+        Mockito.verify(spyFileSystem, Mockito.times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class));
+    }
+
+    @Test
+    public void testPutFileWithSimpleWrite() throws IOException {
+        // given
+        final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+        final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+        runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE);
+
+        // when
+        try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        // then
+        final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+        assertTrue(failedFlowFiles.isEmpty());
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+        assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+
+        Mockito.verify(spyFileSystem, Mockito.never()).rename(Mockito.any(Path.class), Mockito.any(Path.class));
     }
 
     @Test