You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/30 18:08:03 UTC

[nifi] branch support/nifi-1.11.x updated: NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.11.x by this push:
     new add2146  NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close
add2146 is described below

commit add2146992e91d50a71eb67f41d336272825430e
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Jan 30 10:50:29 2020 -0500

    NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../org/apache/nifi/processors/hadoop/PutHDFS.java |  6 +--
 .../apache/nifi/processors/hadoop/PutHDFSTest.java | 48 +++++++++++++++++++++-
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 37c306c..b6c4328 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -353,7 +352,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                     if (fos != null) {
                                         fos.close();
                                     }
-                                } catch (RemoteException re) {
+                                } catch (Throwable t) {
                                     // when talking to remote HDFS clusters, we don't notice problems until fos.close()
                                     if (createdFile != null) {
                                         try {
@@ -361,8 +360,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                         } catch (Throwable ignore) {
                                         }
                                     }
-                                    throw re;
-                                } catch (Throwable ignore) {
+                                    throw t;
                                 }
                                 fos = null;
                             }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 9c51f34..9413018 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -432,6 +432,33 @@ public class PutHDFSTest {
             fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
     }
 
+    @Test
+    public void testPutFileWithCloseException() throws IOException {
+        mockFileSystem = new MockFileSystem(true);
+        String dirName = "target/testPutFileCloseException";
+        File file = new File(dirName);
+        file.mkdirs();
+        Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
+
+        TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem));
+        runner.setProperty(PutHDFS.DIRECTORY, dirName);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+        assertFalse(failedFlowFiles.isEmpty());
+        assertTrue(failedFlowFiles.get(0).isPenalized());
+
+        mockFileSystem.delete(p, true);
+    }
+
     private class TestablePutHDFS extends PutHDFS {
 
         private KerberosProperties testKerberosProperties;
@@ -461,6 +488,15 @@ public class PutHDFSTest {
 
     private class MockFileSystem extends FileSystem {
         private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+        private final boolean failOnClose;
+
+        public MockFileSystem() {
+            failOnClose = false;
+        }
+
+        public MockFileSystem(boolean failOnClose) {
+            this.failOnClose = failOnClose;
+        }
 
         @Override
         public URI getUri() {
@@ -476,7 +512,17 @@ public class PutHDFSTest {
         public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
                                          final long blockSize, final Progressable progress) {
             pathToStatus.put(f, newFile(f, permission));
-            return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
+            if(failOnClose) {
+                return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
+                    @Override
+                    public void close() throws IOException {
+                        super.close();
+                        throw new IOException("Fail on close");
+                    }
+                };
+            } else {
+                return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
+            }
         }
 
         @Override