You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/30 18:08:03 UTC
[nifi] branch support/nifi-1.11.x updated: NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.11.x by this push:
new add2146 NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close
add2146 is described below
commit add2146992e91d50a71eb67f41d336272825430e
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Jan 30 10:50:29 2020 -0500
NIFI-7073: This closes #4025. Route to failure when error on PutHDFS file system close
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../org/apache/nifi/processors/hadoop/PutHDFS.java | 6 +--
.../apache/nifi/processors/hadoop/PutHDFSTest.java | 48 +++++++++++++++++++++-
2 files changed, 49 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 37c306c..b6c4328 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -353,7 +352,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
if (fos != null) {
fos.close();
}
- } catch (RemoteException re) {
+ } catch (Throwable t) {
// when talking to remote HDFS clusters, we don't notice problems until fos.close()
if (createdFile != null) {
try {
@@ -361,8 +360,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
} catch (Throwable ignore) {
}
}
- throw re;
- } catch (Throwable ignore) {
+ throw t;
}
fos = null;
}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 9c51f34..9413018 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -432,6 +432,33 @@ public class PutHDFSTest {
fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
}
+ @Test
+ public void testPutFileWithCloseException() throws IOException {
+ mockFileSystem = new MockFileSystem(true);
+ String dirName = "target/testPutFileCloseException";
+ File file = new File(dirName);
+ file.mkdirs();
+ Path p = new Path(dirName).makeQualified(mockFileSystem.getUri(), mockFileSystem.getWorkingDirectory());
+
+ TestRunner runner = TestRunners.newTestRunner(new TestablePutHDFS(kerberosProperties, mockFileSystem));
+ runner.setProperty(PutHDFS.DIRECTORY, dirName);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+
+ try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+ runner.enqueue(fis, attributes);
+ runner.run();
+ }
+
+ List<MockFlowFile> failedFlowFiles = runner
+ .getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+ assertFalse(failedFlowFiles.isEmpty());
+ assertTrue(failedFlowFiles.get(0).isPenalized());
+
+ mockFileSystem.delete(p, true);
+ }
+
private class TestablePutHDFS extends PutHDFS {
private KerberosProperties testKerberosProperties;
@@ -461,6 +488,15 @@ public class PutHDFSTest {
private class MockFileSystem extends FileSystem {
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+ private final boolean failOnClose;
+
+ public MockFileSystem() {
+ failOnClose = false;
+ }
+
+ public MockFileSystem(boolean failOnClose) {
+ this.failOnClose = failOnClose;
+ }
@Override
public URI getUri() {
@@ -476,7 +512,17 @@ public class PutHDFSTest {
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) {
pathToStatus.put(f, newFile(f, permission));
- return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
+ if(failOnClose) {
+ return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ throw new IOException("Fail on close");
+ }
+ };
+ } else {
+ return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
+ }
}
@Override