Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/09 05:50:52 UTC

[flink] 01/02: [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d9ed9012286ef2495b1b75277601743e8c9ff6f
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Tue Jun 9 13:47:53 2020 +0800

    [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts
    
    
    This closes #12452
---
 flink-formats/flink-hadoop-bulk/pom.xml            |  37 ++++++
 .../bulk/DefaultHadoopFileCommitterFactory.java    |  16 ++-
 .../formats/hadoop/bulk/HadoopFileCommitter.java   |   2 +-
 .../hadoop/bulk/HadoopFileCommitterFactory.java    |  18 ++-
 .../hadoop/bulk/HadoopPathBasedPartFileWriter.java |  59 ++++++---
 .../bulk/committer/HadoopRenameFileCommitter.java  |  41 +++++--
 ...terTest.java => AbstractFileCommitterTest.java} | 132 +++++++++++++--------
 .../bulk/HadoopPathBasedPartFileWriterTest.java    |   6 +-
 .../committer/HadoopRenameCommitterHDFSTest.java   |  97 +++++++++++++++
 .../HadoopRenameCommitterLocalFSTest.java          |  69 +++++++++++
 .../bulk/committer/cluster/HDFSCluster.java}       |  40 ++++---
 11 files changed, 414 insertions(+), 103 deletions(-)
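
The gist of the fix: the committer previously derived one fixed temp name from the
target file (".<name>.inprogress"), so a restarted attempt writing the same target
collided with the leftover temp file of the earlier attempt. It now probes for a
free UUID-suffixed name instead. A minimal sketch of the new naming logic,
equivalent to the generateTempFilePath() loop in the diff below:

    // Old: every attempt for "part-0-0.txt" shared one temp file:
    //   .part-0-0.txt.inprogress
    // New: each attempt gets its own temp file, e.g.:
    //   .part-0-0.txt.inprogress.<random-uuid>
    Path candidate;
    do {
        candidate = new Path(parent, "." + name + ".inprogress." + UUID.randomUUID());
    } while (fileSystem.exists(candidate));
    return candidate;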

diff --git a/flink-formats/flink-hadoop-bulk/pom.xml b/flink-formats/flink-hadoop-bulk/pom.xml
index 371c15a..051cc53 100644
--- a/flink-formats/flink-hadoop-bulk/pom.xml
+++ b/flink-formats/flink-hadoop-bulk/pom.xml
@@ -80,6 +80,43 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<!-- This dependency is no longer shipped with the JDK since Java 9.-->
+					<groupId>jdk.tools</groupId>
+					<artifactId>jdk.tools</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
index 01ac88c..00f8f58 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+
 /**
  * The default hadoop file committer factory which always uses {@link HadoopRenameFileCommitter}.
  */
@@ -31,7 +33,19 @@ public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFac
 	private static final long serialVersionUID = 1L;
 
 	@Override
-	public HadoopFileCommitter create(Configuration configuration, Path targetFilePath) {
+	public HadoopFileCommitter create(
+		Configuration configuration,
+		Path targetFilePath) throws IOException {
+
 		return new HadoopRenameFileCommitter(configuration, targetFilePath);
 	}
+
+	@Override
+	public HadoopFileCommitter recoverForCommit(
+		Configuration configuration,
+		Path targetFilePath,
+		Path tempFilePath) throws IOException {
+
+		return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
+	}
 }
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
index 7ae0d56..0fdb4f8 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
@@ -43,7 +43,7 @@ public interface HadoopFileCommitter {
 	 *
 	 * @return The path of the intermediate file to commit.
 	 */
-	Path getInProgressFilePath();
+	Path getTempFilePath();
 
 	/**
 	 * Prepares the intermediate file for committing.
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
index ae0495a..8f30c7a 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
@@ -33,13 +33,25 @@ import java.io.Serializable;
 public interface HadoopFileCommitterFactory extends Serializable {
 
 	/**
-	 * Creates the corresponding Hadoop file committer according to the Hadoop
-	 * configuration and the target path.
+	 * Creates a new Hadoop file committer for writing.
 	 *
 	 * @param configuration The hadoop configuration.
-	 * @param targetFilePath The target path to commit.
+	 * @param targetFilePath The target path to commit to.
 	 * @return The corresponding Hadoop file committer.
 	 */
 	HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException;
 
+	/**
+	 * Creates a Hadoop file committer for committing the pending file.
+	 *
+	 * @param configuration The hadoop configuration.
+	 * @param targetFilePath The target path to commit to.
+	 * @param inProgressPath The path of the in-progress file left by the previous execution.
+	 * @return The corresponding Hadoop file committer.
+	 */
+	HadoopFileCommitter recoverForCommit(
+		Configuration configuration,
+		Path targetFilePath,
+		Path inProgressPath) throws IOException;
+
 }
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
index 2703cfe..088e879 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
@@ -103,20 +103,28 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
 
 		public PendingFileRecoverable getRecoverable() {
 			return new HadoopPathBasedPendingFileRecoverable(
-				fileCommitter.getTargetFilePath());
+				fileCommitter.getTargetFilePath(),
+				fileCommitter.getTempFilePath());
 		}
 	}
 
 	@VisibleForTesting
 	static class HadoopPathBasedPendingFileRecoverable implements PendingFileRecoverable {
-		private final Path path;
+		private final Path targetFilePath;
 
-		public HadoopPathBasedPendingFileRecoverable(Path path) {
-			this.path = path;
+		private final Path tempFilePath;
+
+		public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath) {
+			this.targetFilePath = targetFilePath;
+			this.tempFilePath = tempFilePath;
+		}
+
+		public Path getTargetFilePath() {
+			return targetFilePath;
 		}
 
-		public Path getPath() {
-			return path;
+		public Path getTempFilePath() {
+			return tempFilePath;
 		}
 	}
 
@@ -142,14 +150,21 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
 				throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
 			}
 
-			Path path = ((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath();
+			HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+				(HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+			Path path = hadoopRecoverable.getTargetFilePath();
+			Path inProgressPath = hadoopRecoverable.getTempFilePath();
+
 			byte[] pathBytes = path.toUri().toString().getBytes(CHARSET);
+			byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET);
 
-			byte[] targetBytes = new byte[8 + pathBytes.length];
+			byte[] targetBytes = new byte[12 + pathBytes.length + inProgressBytes.length];
 			ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
 			bb.putInt(MAGIC_NUMBER);
 			bb.putInt(pathBytes.length);
 			bb.put(pathBytes);
+			bb.putInt(inProgressBytes.length);
+			bb.put(inProgressBytes);
 
 			return targetBytes;
 		}
@@ -171,11 +186,17 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
 				throw new IOException("Corrupt data: Unexpected magic number.");
 			}
 
-			byte[] pathBytes = new byte[bb.getInt()];
-			bb.get(pathBytes);
-			String targetPath = new String(pathBytes, CHARSET);
+			byte[] targetFilePathBytes = new byte[bb.getInt()];
+			bb.get(targetFilePathBytes);
+			String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+			byte[] tempFilePathBytes = new byte[bb.getInt()];
+			bb.get(tempFilePathBytes);
+			String tempFilePath = new String(tempFilePathBytes, CHARSET);
 
-			return new HadoopPathBasedPendingFileRecoverable(new Path(targetPath));
+			return new HadoopPathBasedPendingFileRecoverable(
+				new Path(targetFilePath),
+				new Path(tempFilePath));
 		}
 	}
 
@@ -202,7 +223,9 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
 	}
 
 	/**
-	 * Factory to create {@link HadoopPathBasedPartFileWriter}.
+	 * Factory to create {@link HadoopPathBasedPartFileWriter}. This writer does not support snapshotting
+	 * in-progress files. For pending files, it stores the target path and the staging file path in
+	 * the state.
 	 */
 	public static class HadoopPathBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
 		private final Configuration configuration;
@@ -230,7 +253,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
 			Path path = new Path(flinkPath.toUri());
 			HadoopFileCommitter fileCommitter = fileCommitterFactory.create(configuration, path);
 
-			Path inProgressFilePath = fileCommitter.getInProgressFilePath();
+			Path inProgressFilePath = fileCommitter.getTempFilePath();
 			HadoopPathBasedBulkWriter<IN> writer = bulkWriterFactory.create(path, inProgressFilePath);
 			return new HadoopPathBasedPartFileWriter<>(bucketID, writer, fileCommitter, creationTime);
 		}
@@ -241,8 +264,12 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
 				throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
 			}
 
-			Path path = ((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath();
-			return new HadoopPathBasedPendingFile(fileCommitterFactory.create(configuration, path));
+			HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+				(HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+			return new HadoopPathBasedPendingFile(fileCommitterFactory.recoverForCommit(
+				configuration,
+				hadoopRecoverable.getTargetFilePath(),
+				hadoopRecoverable.getTempFilePath()));
 		}
 
 		@Override
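
With both paths kept in state, the serialized form of a pending file becomes the
following layout (little-endian, per the serializer above; both URIs are encoded
with the serializer's CHARSET, and the 12-byte header is the three int32 fields):

    [ int32 MAGIC_NUMBER ]
    [ int32 targetLen ][ targetLen bytes: target file URI ]
    [ int32 tempLen   ][ tempLen   bytes: temp file URI   ]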
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java
index df4266b..2c690cd 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -39,12 +40,22 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
 
 	private final Path targetFilePath;
 
-	private final Path inProgressFilePath;
+	private final Path tempFilePath;
 
-	public HadoopRenameFileCommitter(Configuration configuration, Path targetFilePath) {
+	public HadoopRenameFileCommitter(Configuration configuration, Path targetFilePath) throws IOException {
 		this.configuration = configuration;
 		this.targetFilePath = targetFilePath;
-		this.inProgressFilePath = generateInProgressFilePath();
+		this.tempFilePath = generateTempFilePath();
+	}
+
+	public HadoopRenameFileCommitter(
+		Configuration configuration,
+		Path targetFilePath,
+		Path inProgressPath) throws IOException {
+
+		this.configuration = configuration;
+		this.targetFilePath = targetFilePath;
+		this.tempFilePath = inProgressPath;
 	}
 
 	@Override
@@ -53,8 +64,8 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
 	}
 
 	@Override
-	public Path getInProgressFilePath() {
-		return inProgressFilePath;
+	public Path getTempFilePath() {
+		return tempFilePath;
 	}
 
 	@Override
@@ -75,11 +86,10 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
 	private void rename(boolean assertFileExists) throws IOException {
 		FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
 
-		if (!fileSystem.exists(inProgressFilePath)) {
+		if (!fileSystem.exists(tempFilePath)) {
 			if (assertFileExists) {
-				throw new IOException(String.format("In progress file(%s) not exists.", inProgressFilePath));
+				throw new IOException(String.format("In-progress file (%s) does not exist.", tempFilePath));
 			} else {
-
 				// Bypass the re-commit if the source file does not exist.
 				// TODO: in the future we may also need to check if the target file exists.
 				return;
@@ -88,20 +98,27 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
 
 		try {
 			// If file exists, it will be overwritten.
-			fileSystem.rename(inProgressFilePath, targetFilePath);
+			fileSystem.rename(tempFilePath, targetFilePath);
 		} catch (IOException e) {
 			throw new IOException(
-				String.format("Could not commit file from %s to %s", inProgressFilePath, targetFilePath),
+				String.format("Could not commit file from %s to %s", tempFilePath, targetFilePath),
 				e);
 		}
 	}
 
-	private Path generateInProgressFilePath() {
+	private Path generateTempFilePath() throws IOException {
 		checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute");
 
+		FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
+
 		Path parent = targetFilePath.getParent();
 		String name = targetFilePath.getName();
 
-		return new Path(parent, "." + name + ".inprogress");
+		while (true) {
+			Path candidate = new Path(parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
+			if (!fileSystem.exists(candidate)) {
+				return candidate;
+			}
+		}
 	}
 }
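
Taken together, the committer's lifecycle after this change looks roughly as
follows (a sketch only; conf, targetPath and factory are assumed to be in scope):

    // Normal path: write to a UUID-suffixed temp file, then rename on commit.
    HadoopFileCommitter committer = new HadoopRenameFileCommitter(conf, targetPath);
    Path tempPath = committer.getTempFilePath(); // e.g. .part-0-0.txt.inprogress.<uuid>
    // ... write the part file contents to tempPath ...
    committer.preCommit();
    committer.commit();                          // renames tempPath -> targetPath

    // Recovery path: the temp path restored from state goes back to the factory.
    HadoopFileCommitter recovered = factory.recoverForCommit(conf, targetPath, tempPath);
    recovered.commitAfterRecovery();             // bypassed if tempPath no longer exists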
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitterTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/AbstractFileCommitterTest.java
similarity index 66%
rename from flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitterTest.java
rename to flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/AbstractFileCommitterTest.java
index 95c4af3..b591acec 100644
--- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitterTest.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/AbstractFileCommitterTest.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.hadoop.bulk.committer;
+package org.apache.flink.formats.hadoop.bulk;
 
-import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
 import org.apache.flink.test.util.AbstractTestBase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,7 +26,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -35,6 +37,7 @@ import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -43,24 +46,59 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests the behaviors of {@link HadoopRenameFileCommitter}.
+ * Tests the behaviors of {@link HadoopFileCommitter}.
  */
-public class HadoopRenameFileCommitterTest extends AbstractTestBase {
+@RunWith(Parameterized.class)
+public abstract class AbstractFileCommitterTest extends AbstractTestBase {
 
 	private static final List<String> CONTENTS = new ArrayList<>(Arrays.asList(
 		"first line",
 		"second line",
 		"third line"));
 
+	private boolean override;
+
+	private Configuration configuration;
+
+	private Path basePath;
+
+	@Parameterized.Parameters(name = "Override: {0}")
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(true, false);
+	}
+
+	public AbstractFileCommitterTest(boolean override) throws IOException {
+		this.override = override;
+		this.configuration = getConfiguration();
+		this.basePath = getBasePath();
+	}
+
+	@After
+	public void cleanup() throws IOException {
+		cleanup(configuration, basePath);
+	}
+
+	protected abstract Configuration getConfiguration();
+
+	protected abstract HadoopFileCommitter createNewCommitter(
+		Configuration configuration,
+		Path targetFilePath) throws IOException;
+
+	protected abstract HadoopFileCommitter createPendingCommitter(
+		Configuration configuration,
+		Path targetFilePath,
+		Path tempFilePath) throws IOException;
+
+	protected abstract Path getBasePath() throws IOException;
+
+	protected abstract void cleanup(Configuration configuration, Path basePath) throws IOException;
+
 	@Test
 	public void testCommitOneFile() throws IOException {
-		Configuration configuration = new Configuration();
-
-		Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
 		Path targetFilePath = new Path(basePath, "part-0-0.txt");
 
-		HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
-		writeFile(committer.getInProgressFilePath(), configuration);
+		HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+		writeFile(committer.getTempFilePath(), configuration);
 
 		committer.preCommit();
 		verifyFileNotExists(configuration, basePath, "part-0-0.txt");
@@ -71,53 +109,49 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
 
 	@Test
 	public void testCommitReWrittenFileAfterFailOver() throws IOException {
-		Configuration configuration = new Configuration();
-
-		Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
 		Path targetFilePath = new Path(basePath, "part-0-0.txt");
 
-		HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
-		writeFile(committer.getInProgressFilePath(), configuration);
+		HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+		writeFile(committer.getTempFilePath(), configuration);
+		Path firstTempFilePath = committer.getTempFilePath();
 
 		// Simulates restarting the process and re-writing the file.
-		committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
-		writeFile(committer.getInProgressFilePath(), configuration);
+		committer = createNewCommitter(configuration, targetFilePath);
+		writeFile(committer.getTempFilePath(), configuration);
 
 		committer.preCommit();
 		verifyFileNotExists(configuration, basePath, "part-0-0.txt");
 
 		committer.commit();
-		verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
+		verifyFolderAfterAllCommitted(
+			configuration,
+			basePath,
+			"part-0-0.txt",
+			firstTempFilePath.getName());
 	}
 
 	@Test
 	public void testCommitPreCommittedFileAfterFailOver() throws IOException {
-		Configuration configuration = new Configuration();
-
-		Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
 		Path targetFilePath = new Path(basePath, "part-0-0.txt");
 
-		HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
-		writeFile(committer.getInProgressFilePath(), configuration);
+		HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+		writeFile(committer.getTempFilePath(), configuration);
 
 		committer.preCommit();
 		verifyFileNotExists(configuration, basePath, "part-0-0.txt");
 
 		// Simulates restarting the process and continuing to commit the file.
-		committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
+		committer = createPendingCommitter(configuration, targetFilePath, committer.getTempFilePath());
 		committer.commit();
 		verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
 	}
 
 	@Test
 	public void testRepeatCommitAfterFailOver() throws IOException {
-		Configuration configuration = new Configuration();
-
-		Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
 		Path targetFilePath = new Path(basePath, "part-0-0.txt");
 
-		HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
-		writeFile(committer.getInProgressFilePath(), configuration);
+		HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+		writeFile(committer.getTempFilePath(), configuration);
 
 		committer.preCommit();
 		verifyFileNotExists(configuration, basePath, "part-0-0.txt");
@@ -126,7 +160,7 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
 		verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
 
 		// Simulates restarting the process and continuing to commit the file.
-		committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
+		committer = createPendingCommitter(configuration, targetFilePath, committer.getTempFilePath());
 		committer.commitAfterRecovery();
 
 		verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
@@ -134,17 +168,14 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
 
 	@Test
 	public void testCommitMultipleFilesOneByOne() throws IOException {
-		Configuration configuration = new Configuration();
-
-		Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
 		Path targetFilePath1 = new Path(basePath, "part-0-0.txt");
 		Path targetFilePath2 = new Path(basePath, "part-1-1.txt");
 
-		HadoopFileCommitter committer1 = new HadoopRenameFileCommitter(configuration, targetFilePath1);
-		HadoopFileCommitter committer2 = new HadoopRenameFileCommitter(configuration, targetFilePath2);
+		HadoopFileCommitter committer1 = createNewCommitter(configuration, targetFilePath1);
+		HadoopFileCommitter committer2 = createNewCommitter(configuration, targetFilePath2);
 
-		writeFile(committer1.getInProgressFilePath(), configuration);
-		writeFile(committer2.getInProgressFilePath(), configuration);
+		writeFile(committer1.getTempFilePath(), configuration);
+		writeFile(committer2.getTempFilePath(), configuration);
 
 		committer1.preCommit();
 		committer1.commit();
@@ -160,17 +191,14 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
 
 	@Test
 	public void testCommitMultipleFilesMixed() throws IOException {
-		Configuration configuration = new Configuration();
-
-		Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
 		Path targetFilePath1 = new Path(basePath, "part-0-0.txt");
 		Path targetFilePath2 = new Path(basePath, "part-1-1.txt");
 
-		HadoopFileCommitter committer1 = new HadoopRenameFileCommitter(configuration, targetFilePath1);
-		HadoopFileCommitter committer2 = new HadoopRenameFileCommitter(configuration, targetFilePath2);
+		HadoopFileCommitter committer1 = createNewCommitter(configuration, targetFilePath1);
+		HadoopFileCommitter committer2 = createNewCommitter(configuration, targetFilePath2);
 
-		writeFile(committer1.getInProgressFilePath(), configuration);
-		writeFile(committer2.getInProgressFilePath(), configuration);
+		writeFile(committer1.getTempFilePath(), configuration);
+		writeFile(committer2.getTempFilePath(), configuration);
 
 		committer1.preCommit();
 		committer2.preCommit();
@@ -190,7 +218,7 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
 
 	private void writeFile(Path path, Configuration configuration) throws IOException {
 		FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
-		try (FSDataOutputStream fsDataOutputStream = fileSystem.create(path, true);
+		try (FSDataOutputStream fsDataOutputStream = fileSystem.create(path, override);
 			PrintWriter printWriter = new PrintWriter(fsDataOutputStream)) {
 
 			for (String line : CONTENTS) {
@@ -234,9 +262,9 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
 	private void verifyFolderAfterAllCommitted(
 		Configuration configuration,
 		Path basePath,
-		String... targetFileNames) throws IOException {
+		String... expectedFileNames) throws IOException {
 
-		List<String> expectedNames = Arrays.asList(targetFileNames);
+		List<String> expectedNames = Arrays.asList(expectedFileNames);
 		Collections.sort(expectedNames);
 
 		FileSystem fileSystem = FileSystem.get(basePath.toUri(), configuration);
@@ -252,11 +280,13 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
 			fileNames);
 
 		for (FileStatus file : files) {
-			List<String> written = readFile(fileSystem, files[0].getPath());
-			assertEquals(
-				"Unexpected file content for file " + file.getPath(),
-				CONTENTS,
-				written);
+			if (!file.getPath().getName().startsWith(".")) {
+				List<String> written = readFile(fileSystem, file.getPath());
+				assertEquals(
+					"Unexpected file content for file " + file.getPath(),
+					CONTENTS,
+					written);
+			}
 		}
 	}
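
Note that the parameterized override flag feeds straight into Hadoop's
FileSystem.create(Path, boolean overwrite), so every test case above runs both
with and without overwriting an existing file at the temp path:

    // override == true  : an existing file at `path` is truncated and rewritten.
    // override == false : create(...) fails if `path` already exists, which is
    //                     exactly the collision the UUID-suffixed names avoid.
    FSDataOutputStream fsDataOutputStream = fileSystem.create(path, override);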
 
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
index 8a4a2a1..3ee14ee 100644
--- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
@@ -63,7 +63,8 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
 	@Test
 	public void testPendingFileRecoverableSerializer() throws IOException {
 		HadoopPathBasedPendingFileRecoverable recoverable = new HadoopPathBasedPendingFileRecoverable(
-			new Path("hdfs://fake/path"));
+			new Path("hdfs://fake/path"),
+			new Path("hdfs://fake/path.inprogress.uuid"));
 		HadoopPathBasedPendingFileRecoverableSerializer serializer =
 			new HadoopPathBasedPendingFileRecoverableSerializer();
 
@@ -72,7 +73,8 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
 			serializer.getVersion(),
 			serializedBytes);
 
-		assertEquals(recoverable.getPath(), deSerialized.getPath());
+		assertEquals(recoverable.getTargetFilePath(), deSerialized.getTargetFilePath());
+		assertEquals(recoverable.getTempFilePath(), deSerialized.getTempFilePath());
 	}
 
 	@Test
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
new file mode 100644
index 0000000..2661297
--- /dev/null
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.hadoop.bulk.committer;
+
+import org.apache.flink.formats.hadoop.bulk.AbstractFileCommitterTest;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.flink.formats.hadoop.bulk.committer.cluster.HDFSCluster;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/**
+ * Tests the behaviors of {@link HadoopRenameFileCommitter} with HDFS file system.
+ */
+public class HadoopRenameCommitterHDFSTest extends AbstractFileCommitterTest {
+
+	@ClassRule
+	public static final TemporaryFolder CLASS_TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private static HDFSCluster hdfsCluster;
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		Assume.assumeTrue(!OperatingSystem.isWindows());
+
+		hdfsCluster = new HDFSCluster(CLASS_TEMPORARY_FOLDER.newFolder());
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+
+		hdfsCluster = null;
+	}
+
+	public HadoopRenameCommitterHDFSTest(boolean override) throws IOException {
+		super(override);
+	}
+
+	@Override
+	protected Path getBasePath() throws IOException {
+		return hdfsCluster.newFolder();
+	}
+
+	@Override
+	protected Configuration getConfiguration() {
+		return new Configuration();
+	}
+
+	@Override
+	protected HadoopFileCommitter createNewCommitter(
+		Configuration configuration,
+		Path targetFilePath) throws IOException {
+
+		return new HadoopRenameFileCommitter(configuration, targetFilePath);
+	}
+
+	@Override
+	protected HadoopFileCommitter createPendingCommitter(
+		Configuration configuration,
+		Path targetFilePath,
+		Path tempFilePath) throws IOException {
+
+		return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
+	}
+
+	@Override
+	protected void cleanup(Configuration configuration, Path basePath) throws IOException {
+		basePath.getFileSystem(configuration).delete(basePath, true);
+	}
+}
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
new file mode 100644
index 0000000..93e3afe
--- /dev/null
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.hadoop.bulk.committer;
+
+import org.apache.flink.formats.hadoop.bulk.AbstractFileCommitterTest;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Tests the behaviors of {@link HadoopRenameFileCommitter} with local file system.
+ */
+public class HadoopRenameCommitterLocalFSTest extends AbstractFileCommitterTest {
+
+	public HadoopRenameCommitterLocalFSTest(boolean override) throws IOException {
+		super(override);
+	}
+
+	@Override
+	protected Path getBasePath() throws IOException {
+		return new Path(TEMPORARY_FOLDER.newFolder().toURI());
+	}
+
+	@Override
+	protected Configuration getConfiguration() {
+		return new Configuration();
+	}
+
+	@Override
+	protected HadoopFileCommitter createNewCommitter(
+		Configuration configuration,
+		Path targetFilePath) throws IOException {
+
+		return new HadoopRenameFileCommitter(configuration, targetFilePath);
+	}
+
+	@Override
+	protected HadoopFileCommitter createPendingCommitter(
+		Configuration configuration,
+		Path targetFilePath,
+		Path tempFilePath) throws IOException {
+
+		return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
+	}
+
+	@Override
+	public void cleanup(Configuration configuration, Path basePath) {
+		// Empty method.
+	}
+}
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java
similarity index 56%
copy from flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
copy to flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java
index ae0495a..3ce5e33 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java
@@ -16,30 +16,36 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.hadoop.bulk;
-
-import org.apache.flink.annotation.Internal;
+package org.apache.flink.formats.hadoop.bulk.committer.cluster;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
+import java.util.UUID;
 
 /**
- * The factory to create the {@link HadoopFileCommitter}.
+ * Utility class for testing with HDFS FileSystem.
  */
-@Internal
-public interface HadoopFileCommitterFactory extends Serializable {
-
-	/**
-	 * Creates the corresponding Hadoop file committer according to the Hadoop
-	 * configuration and the target path.
-	 *
-	 * @param configuration The hadoop configuration.
-	 * @param targetFilePath The target path to commit.
-	 * @return The corresponding Hadoop file committer.
-	 */
-	HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException;
+public class HDFSCluster {
+
+	public final MiniDFSCluster miniCluster;
+
+	public HDFSCluster(File tmpDir) throws IOException {
+		Configuration hdfsConfig = new Configuration();
+		hdfsConfig.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.getAbsolutePath());
+		miniCluster = new MiniDFSCluster.Builder(hdfsConfig).build();
+	}
+
+	public void shutdown() {
+		miniCluster.shutdown();
+	}
 
+	public Path newFolder() {
+		return new Path(
+			new Path(miniCluster.getURI() + "/"),
+			UUID.randomUUID().toString());
+	}
 }
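
A rough usage sketch of this helper, mirroring HadoopRenameCommitterHDFSTest
above (temporaryFolder stands in for a JUnit TemporaryFolder rule):

    HDFSCluster hdfsCluster = new HDFSCluster(temporaryFolder.newFolder());
    Path basePath = hdfsCluster.newFolder(); // hdfs://<namenode>/<random-uuid>
    // ... run the committer tests against basePath ...
    hdfsCluster.shutdown();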