Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/14 17:15:30 UTC
[nifi] 12/15: NIFI-9260 Making the 'write and rename' behaviour optional for PutHDFS
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 534e6eafe7b8622e6d141802a40d7c6e01a09361
Author: Bence Simon <si...@gmail.com>
AuthorDate: Thu Sep 30 15:57:41 2021 +0200
NIFI-9260 Making the 'write and rename' behaviour optional for PutHDFS
This closes #5423.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../org/apache/nifi/processors/hadoop/PutHDFS.java | 35 ++++++++--
.../apache/nifi/processors/hadoop/PutHDFSTest.java | 74 +++++++++++++++++-----
2 files changed, 90 insertions(+), 19 deletions(-)
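For quick reference before the patch, a minimal configuration sketch showing how a test would opt into the new simple-write behaviour. This is not part of the commit: it assumes it lives in the org.apache.nifi.processors.hadoop test package (like PutHDFSTest below) so the protected constants are visible, and it uses a plain PutHDFS rather than the test's TestablePutHDFS wrapper, so it only demonstrates property configuration, not execution against a mocked FileSystem.

    package org.apache.nifi.processors.hadoop;  // same package as PutHDFSTest, so protected constants are accessible

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class WritingStrategyConfigSketch {
        // Hypothetical helper (illustration only, not part of this commit): configures PutHDFS
        // with the Writing Strategy property introduced by NIFI-9260. The property defaults to
        // "writeAndRename", which keeps the pre-existing temp-dot-file-then-rename behaviour;
        // "simpleWrite" writes FlowFile content directly to the destination path.
        static TestRunner configureSimpleWriteRunner() {
            final TestRunner runner = TestRunners.newTestRunner(new PutHDFS());
            runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");               // directory used in PutHDFSTest
            runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
            runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE);         // new in this commit
            return runner;
        }
    }
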
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 62b7996..d5d85fc 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -125,6 +125,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
protected static final String FAIL_RESOLUTION = "fail";
protected static final String APPEND_RESOLUTION = "append";
+ protected static final String WRITE_AND_RENAME = "writeAndRename";
+ protected static final String SIMPLE_WRITE = "simpleWrite";
+
protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
REPLACE_RESOLUTION, "Replaces the existing file if any.");
protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
@@ -134,6 +137,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
"Appends to the existing file if any, creates a new file otherwise.");
+ protected static final AllowableValue WRITE_AND_RENAME_AV = new AllowableValue(WRITE_AND_RENAME, "Write and rename",
+ "The processor writes FlowFile data into a temporary file and renames it after completion. This prevents other processes from reading partially written files.");
+ protected static final AllowableValue SIMPLE_WRITE_AV = new AllowableValue(SIMPLE_WRITE, "Simple write",
+ "The processor writes FlowFile data directly to the destination file. In some cases this might cause reading partially written files.");
+
protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution Strategy")
.description("Indicates what should happen when a file with the same name already exists in the output directory")
@@ -142,6 +150,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
.allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
.build();
+ protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
+ .name("writing-strategy")
+ .displayName("Writing Strategy")
+ .description("Defines the approach for writing the FlowFile data.")
+ .required(true)
+ .defaultValue(WRITE_AND_RENAME_AV.getValue())
+ .allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
+ .build();
+
public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This overrides the Hadoop Configuration")
@@ -219,6 +236,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
.description("The parent HDFS directory to which files should be written. The directory will be created if it doesn't exist.")
.build());
props.add(CONFLICT_RESOLUTION);
+ props.add(WRITING_STRATEGY);
props.add(BLOCK_SIZE);
props.add(BUFFER_SIZE);
props.add(REPLICATION_FACTOR);
@@ -280,6 +298,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
Path tempDotCopyFile = null;
FlowFile putFlowFile = flowFile;
try {
+ final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue();
final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
@@ -295,6 +314,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
final Path tempCopyFile = new Path(dirPath, "." + filename);
final Path copyFile = new Path(dirPath, filename);
+ // Depending on the writing strategy, we might need a temporary file
+ final Path actualCopyFile = (writingStrategy.equals(WRITE_AND_RENAME))
+ ? tempCopyFile
+ : copyFile;
+
// Create destination directory if it does not exist
boolean targetDirCreated = false;
try {
@@ -361,7 +385,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
}
- fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+ fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
null, null);
}
@@ -369,7 +393,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
if (codec != null) {
fos = codec.createOutputStream(fos);
}
- createdFile = tempCopyFile;
+ createdFile = actualCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);
bis = null;
@@ -399,9 +423,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
tempDotCopyFile = tempCopyFile;
- if (!conflictResponse.equals(APPEND_RESOLUTION)
- || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
+ if (
+ writingStrategy.equals(WRITE_AND_RENAME)
+ && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
+ ) {
boolean renamed = false;
+
for (int i = 0; i < 10; i++) { // try to rename multiple times.
if (hdfs.rename(tempCopyFile, copyFile)) {
renamed = true;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index c41114b..22b3ec2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -71,6 +71,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class PutHDFSTest {
+ private final static String TARGET_DIRECTORY = "target/test-classes";
+ private final static String SOURCE_DIRECTORY = "src/test/resources/testdata";
+ private final static String FILE_NAME = "randombytes-1";
private KerberosProperties kerberosProperties;
private FileSystem mockFileSystem;
@@ -197,27 +200,32 @@ public class PutHDFSTest {
@Test
public void testPutFile() throws IOException {
- PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
- TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
- runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
- try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+ // given
+ final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+ final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+
+ // when
+ try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
runner.enqueue(fis, attributes);
runner.run();
}
- List<MockFlowFile> failedFlowFiles = runner
- .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+ // then
+ final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
assertTrue(failedFlowFiles.isEmpty());
- List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
- MockFlowFile flowFile = flowFiles.get(0);
- assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
- assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
- assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+
+ final MockFlowFile flowFile = flowFiles.get(0);
+ assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+ assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -225,7 +233,43 @@ public class PutHDFSTest {
final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
// If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
- assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
+ assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
+
+ Mockito.verify(spyFileSystem, Mockito.times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class));
+ }
+
+ @Test
+ public void testPutFileWithSimpleWrite() throws IOException {
+ // given
+ final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+ final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+ runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE);
+
+ // when
+ try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
+ runner.enqueue(fis, attributes);
+ runner.run();
+ }
+
+ // then
+ final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+ assertTrue(failedFlowFiles.isEmpty());
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile flowFile = flowFiles.get(0);
+ assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+ assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+ assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+
+ Mockito.verify(spyFileSystem, Mockito.never()).rename(Mockito.any(Path.class), Mockito.any(Path.class));
}
@Test