You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/09 05:50:52 UTC
[flink] 01/02: [FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0d9ed9012286ef2495b1b75277601743e8c9ff6f
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Tue Jun 9 13:47:53 2020 +0800
[FLINK-18056][fs-connector] Hadoop path-based file writer adds UUID to in-progress file to avoid conflicts
This closes #12452
---
flink-formats/flink-hadoop-bulk/pom.xml | 37 ++++++
.../bulk/DefaultHadoopFileCommitterFactory.java | 16 ++-
.../formats/hadoop/bulk/HadoopFileCommitter.java | 2 +-
.../hadoop/bulk/HadoopFileCommitterFactory.java | 18 ++-
.../hadoop/bulk/HadoopPathBasedPartFileWriter.java | 59 ++++++---
.../bulk/committer/HadoopRenameFileCommitter.java | 41 +++++--
...terTest.java => AbstractFileCommitterTest.java} | 132 +++++++++++++--------
.../bulk/HadoopPathBasedPartFileWriterTest.java | 6 +-
.../committer/HadoopRenameCommitterHDFSTest.java | 97 +++++++++++++++
.../HadoopRenameCommitterLocalFSTest.java | 69 +++++++++++
.../bulk/committer/cluster/HDFSCluster.java} | 40 ++++---
11 files changed, 414 insertions(+), 103 deletions(-)
diff --git a/flink-formats/flink-hadoop-bulk/pom.xml b/flink-formats/flink-hadoop-bulk/pom.xml
index 371c15a..051cc53 100644
--- a/flink-formats/flink-hadoop-bulk/pom.xml
+++ b/flink-formats/flink-hadoop-bulk/pom.xml
@@ -80,6 +80,43 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <!-- This dependency is no longer shipped with the JDK since Java 9.-->
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
index 01ac88c..00f8f58 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/DefaultHadoopFileCommitterFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import java.io.IOException;
+
/**
* The default hadoop file committer factory which always use {@link HadoopRenameFileCommitter}.
*/
@@ -31,7 +33,19 @@ public class DefaultHadoopFileCommitterFactory implements HadoopFileCommitterFac
private static final long serialVersionUID = 1L;
@Override
- public HadoopFileCommitter create(Configuration configuration, Path targetFilePath) {
+ public HadoopFileCommitter create(
+ Configuration configuration,
+ Path targetFilePath) throws IOException {
+
return new HadoopRenameFileCommitter(configuration, targetFilePath);
}
+
+ @Override
+ public HadoopFileCommitter recoverForCommit(
+ Configuration configuration,
+ Path targetFilePath,
+ Path tempFilePath) throws IOException {
+
+ return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
+ }
}
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
index 7ae0d56..0fdb4f8 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitter.java
@@ -43,7 +43,7 @@ public interface HadoopFileCommitter {
*
* @return The path of the intermediate file to commit.
*/
- Path getInProgressFilePath();
+ Path getTempFilePath();
/**
* Prepares the intermediates file for committing.
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
index ae0495a..8f30c7a 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
@@ -33,13 +33,25 @@ import java.io.Serializable;
public interface HadoopFileCommitterFactory extends Serializable {
/**
- * Creates the corresponding Hadoop file committer according to the Hadoop
- * configuration and the target path.
+ * Creates a new Hadoop file committer for writing.
*
* @param configuration The hadoop configuration.
- * @param targetFilePath The target path to commit.
+ * @param targetFilePath The target path to commit to.
* @return The corresponding Hadoop file committer.
*/
HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException;
+ /**
+ * Creates a Hadoop file committer for commit the pending file.
+ *
+ * @param configuration The hadoop configuration.
+ * @param targetFilePath The target path to commit to.
+ * @param inProgressPath The path of the remaining pending file.
+ * @return The corresponding Hadoop file committer.
+ */
+ HadoopFileCommitter recoverForCommit(
+ Configuration configuration,
+ Path targetFilePath,
+ Path inProgressPath) throws IOException;
+
}
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
index 2703cfe..088e879 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriter.java
@@ -103,20 +103,28 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
public PendingFileRecoverable getRecoverable() {
return new HadoopPathBasedPendingFileRecoverable(
- fileCommitter.getTargetFilePath());
+ fileCommitter.getTargetFilePath(),
+ fileCommitter.getTempFilePath());
}
}
@VisibleForTesting
static class HadoopPathBasedPendingFileRecoverable implements PendingFileRecoverable {
- private final Path path;
+ private final Path targetFilePath;
- public HadoopPathBasedPendingFileRecoverable(Path path) {
- this.path = path;
+ private final Path tempFilePath;
+
+ public HadoopPathBasedPendingFileRecoverable(Path targetFilePath, Path tempFilePath) {
+ this.targetFilePath = targetFilePath;
+ this.tempFilePath = tempFilePath;
+ }
+
+ public Path getTargetFilePath() {
+ return targetFilePath;
}
- public Path getPath() {
- return path;
+ public Path getTempFilePath() {
+ return tempFilePath;
}
}
@@ -142,14 +150,21 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
}
- Path path = ((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath();
+ HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+ (HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+ Path path = hadoopRecoverable.getTargetFilePath();
+ Path inProgressPath = hadoopRecoverable.getTempFilePath();
+
byte[] pathBytes = path.toUri().toString().getBytes(CHARSET);
+ byte[] inProgressBytes = inProgressPath.toUri().toString().getBytes(CHARSET);
- byte[] targetBytes = new byte[8 + pathBytes.length];
+ byte[] targetBytes = new byte[12 + pathBytes.length + inProgressBytes.length];
ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
bb.putInt(MAGIC_NUMBER);
bb.putInt(pathBytes.length);
bb.put(pathBytes);
+ bb.putInt(inProgressBytes.length);
+ bb.put(inProgressBytes);
return targetBytes;
}
@@ -171,11 +186,17 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
throw new IOException("Corrupt data: Unexpected magic number.");
}
- byte[] pathBytes = new byte[bb.getInt()];
- bb.get(pathBytes);
- String targetPath = new String(pathBytes, CHARSET);
+ byte[] targetFilePathBytes = new byte[bb.getInt()];
+ bb.get(targetFilePathBytes);
+ String targetFilePath = new String(targetFilePathBytes, CHARSET);
+
+ byte[] tempFilePathBytes = new byte[bb.getInt()];
+ bb.get(tempFilePathBytes);
+ String tempFilePath = new String(tempFilePathBytes, CHARSET);
- return new HadoopPathBasedPendingFileRecoverable(new Path(targetPath));
+ return new HadoopPathBasedPendingFileRecoverable(
+ new Path(targetFilePath),
+ new Path(tempFilePath));
}
}
@@ -202,7 +223,9 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
}
/**
- * Factory to create {@link HadoopPathBasedPartFileWriter}.
+ * Factory to create {@link HadoopPathBasedPartFileWriter}. This writer does not support snapshotting
+ * the in-progress files. For pending files, it stores the target path and the staging file path into
+ * the state.
*/
public static class HadoopPathBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
private final Configuration configuration;
@@ -230,7 +253,7 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
Path path = new Path(flinkPath.toUri());
HadoopFileCommitter fileCommitter = fileCommitterFactory.create(configuration, path);
- Path inProgressFilePath = fileCommitter.getInProgressFilePath();
+ Path inProgressFilePath = fileCommitter.getTempFilePath();
HadoopPathBasedBulkWriter<IN> writer = bulkWriterFactory.create(path, inProgressFilePath);
return new HadoopPathBasedPartFileWriter<>(bucketID, writer, fileCommitter, creationTime);
}
@@ -241,8 +264,12 @@ public class HadoopPathBasedPartFileWriter<IN, BucketID> extends AbstractPartFil
throw new UnsupportedOperationException("Only HadoopPathBasedPendingFileRecoverable is supported.");
}
- Path path = ((HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable).getPath();
- return new HadoopPathBasedPendingFile(fileCommitterFactory.create(configuration, path));
+ HadoopPathBasedPendingFileRecoverable hadoopRecoverable =
+ (HadoopPathBasedPendingFileRecoverable) pendingFileRecoverable;
+ return new HadoopPathBasedPendingFile(fileCommitterFactory.recoverForCommit(
+ configuration,
+ hadoopRecoverable.getTargetFilePath(),
+ hadoopRecoverable.getTempFilePath()));
}
@Override
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java
index df4266b..2c690cd 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
+import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -39,12 +40,22 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
private final Path targetFilePath;
- private final Path inProgressFilePath;
+ private final Path tempFilePath;
- public HadoopRenameFileCommitter(Configuration configuration, Path targetFilePath) {
+ public HadoopRenameFileCommitter(Configuration configuration, Path targetFilePath) throws IOException {
this.configuration = configuration;
this.targetFilePath = targetFilePath;
- this.inProgressFilePath = generateInProgressFilePath();
+ this.tempFilePath = generateTempFilePath();
+ }
+
+ public HadoopRenameFileCommitter(
+ Configuration configuration,
+ Path targetFilePath,
+ Path inProgressPath) throws IOException {
+
+ this.configuration = configuration;
+ this.targetFilePath = targetFilePath;
+ this.tempFilePath = inProgressPath;
}
@Override
@@ -53,8 +64,8 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
}
@Override
- public Path getInProgressFilePath() {
- return inProgressFilePath;
+ public Path getTempFilePath() {
+ return tempFilePath;
}
@Override
@@ -75,11 +86,10 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
private void rename(boolean assertFileExists) throws IOException {
FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
- if (!fileSystem.exists(inProgressFilePath)) {
+ if (!fileSystem.exists(tempFilePath)) {
if (assertFileExists) {
- throw new IOException(String.format("In progress file(%s) not exists.", inProgressFilePath));
+ throw new IOException(String.format("In progress file(%s) not exists.", tempFilePath));
} else {
-
// By pass the re-commit if source file not exists.
// TODO: in the future we may also need to check if the target file exists.
return;
@@ -88,20 +98,27 @@ public class HadoopRenameFileCommitter implements HadoopFileCommitter {
try {
// If file exists, it will be overwritten.
- fileSystem.rename(inProgressFilePath, targetFilePath);
+ fileSystem.rename(tempFilePath, targetFilePath);
} catch (IOException e) {
throw new IOException(
- String.format("Could not commit file from %s to %s", inProgressFilePath, targetFilePath),
+ String.format("Could not commit file from %s to %s", tempFilePath, targetFilePath),
e);
}
}
- private Path generateInProgressFilePath() {
+ private Path generateTempFilePath() throws IOException {
checkArgument(targetFilePath.isAbsolute(), "Target file must be absolute");
+ FileSystem fileSystem = FileSystem.get(targetFilePath.toUri(), configuration);
+
Path parent = targetFilePath.getParent();
String name = targetFilePath.getName();
- return new Path(parent, "." + name + ".inprogress");
+ while (true) {
+ Path candidate = new Path(parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
+ if (!fileSystem.exists(candidate)) {
+ return candidate;
+ }
+ }
}
}
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitterTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/AbstractFileCommitterTest.java
similarity index 66%
rename from flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitterTest.java
rename to flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/AbstractFileCommitterTest.java
index 95c4af3..b591acec 100644
--- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameFileCommitterTest.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/AbstractFileCommitterTest.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.flink.formats.hadoop.bulk.committer;
+package org.apache.flink.formats.hadoop.bulk;
-import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.hadoop.conf.Configuration;
@@ -27,7 +26,10 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.junit.After;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.BufferedReader;
import java.io.IOException;
@@ -35,6 +37,7 @@ import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -43,24 +46,59 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
- * Tests the behaviors of {@link HadoopRenameFileCommitter}.
+ * Tests the behaviors of {@link HadoopFileCommitter}.
*/
-public class HadoopRenameFileCommitterTest extends AbstractTestBase {
+@RunWith(Parameterized.class)
+public abstract class AbstractFileCommitterTest extends AbstractTestBase {
private static final List<String> CONTENTS = new ArrayList<>(Arrays.asList(
"first line",
"second line",
"third line"));
+ private boolean override;
+
+ private Configuration configuration;
+
+ private Path basePath;
+
+ @Parameterized.Parameters(name = "Override: {0}")
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(true, false);
+ }
+
+ public AbstractFileCommitterTest(boolean override) throws IOException {
+ this.override = override;
+ this.configuration = getConfiguration();
+ this.basePath = getBasePath();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ cleanup(configuration, basePath);
+ }
+
+ protected abstract Configuration getConfiguration();
+
+ protected abstract HadoopFileCommitter createNewCommitter(
+ Configuration configuration,
+ Path targetFilePath) throws IOException;
+
+ protected abstract HadoopFileCommitter createPendingCommitter(
+ Configuration configuration,
+ Path targetFilePath,
+ Path tempFilePath) throws IOException;
+
+ protected abstract Path getBasePath() throws IOException;
+
+ protected abstract void cleanup(Configuration configuration, Path basePath) throws IOException;
+
@Test
public void testCommitOneFile() throws IOException {
- Configuration configuration = new Configuration();
-
- Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
Path targetFilePath = new Path(basePath, "part-0-0.txt");
- HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
- writeFile(committer.getInProgressFilePath(), configuration);
+ HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+ writeFile(committer.getTempFilePath(), configuration);
committer.preCommit();
verifyFileNotExists(configuration, basePath, "part-0-0.txt");
@@ -71,53 +109,49 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
@Test
public void testCommitReWrittenFileAfterFailOver() throws IOException {
- Configuration configuration = new Configuration();
-
- Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
Path targetFilePath = new Path(basePath, "part-0-0.txt");
- HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
- writeFile(committer.getInProgressFilePath(), configuration);
+ HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+ writeFile(committer.getTempFilePath(), configuration);
+ Path firstTempFilePath = committer.getTempFilePath();
// Simulates restart the process and re-write the file.
- committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
- writeFile(committer.getInProgressFilePath(), configuration);
+ committer = createNewCommitter(configuration, targetFilePath);
+ writeFile(committer.getTempFilePath(), configuration);
committer.preCommit();
verifyFileNotExists(configuration, basePath, "part-0-0.txt");
committer.commit();
- verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
+ verifyFolderAfterAllCommitted(
+ configuration,
+ basePath,
+ "part-0-0.txt",
+ firstTempFilePath.getName());
}
@Test
public void testCommitPreCommittedFileAfterFailOver() throws IOException {
- Configuration configuration = new Configuration();
-
- Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
Path targetFilePath = new Path(basePath, "part-0-0.txt");
- HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
- writeFile(committer.getInProgressFilePath(), configuration);
+ HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+ writeFile(committer.getTempFilePath(), configuration);
committer.preCommit();
verifyFileNotExists(configuration, basePath, "part-0-0.txt");
// Simulates restart the process and continue committing the file.
- committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
+ committer = createPendingCommitter(configuration, targetFilePath, committer.getTempFilePath());
committer.commit();
verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
}
@Test
public void testRepeatCommitAfterFailOver() throws IOException {
- Configuration configuration = new Configuration();
-
- Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
Path targetFilePath = new Path(basePath, "part-0-0.txt");
- HadoopFileCommitter committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
- writeFile(committer.getInProgressFilePath(), configuration);
+ HadoopFileCommitter committer = createNewCommitter(configuration, targetFilePath);
+ writeFile(committer.getTempFilePath(), configuration);
committer.preCommit();
verifyFileNotExists(configuration, basePath, "part-0-0.txt");
@@ -126,7 +160,7 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
// Simulates restart the process and continue committing the file.
- committer = new HadoopRenameFileCommitter(configuration, targetFilePath);
+ committer = createPendingCommitter(configuration, targetFilePath, committer.getTempFilePath());
committer.commitAfterRecovery();
verifyFolderAfterAllCommitted(configuration, basePath, "part-0-0.txt");
@@ -134,17 +168,14 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
@Test
public void testCommitMultipleFilesOneByOne() throws IOException {
- Configuration configuration = new Configuration();
-
- Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
Path targetFilePath1 = new Path(basePath, "part-0-0.txt");
Path targetFilePath2 = new Path(basePath, "part-1-1.txt");
- HadoopFileCommitter committer1 = new HadoopRenameFileCommitter(configuration, targetFilePath1);
- HadoopFileCommitter committer2 = new HadoopRenameFileCommitter(configuration, targetFilePath2);
+ HadoopFileCommitter committer1 = createNewCommitter(configuration, targetFilePath1);
+ HadoopFileCommitter committer2 = createNewCommitter(configuration, targetFilePath2);
- writeFile(committer1.getInProgressFilePath(), configuration);
- writeFile(committer2.getInProgressFilePath(), configuration);
+ writeFile(committer1.getTempFilePath(), configuration);
+ writeFile(committer2.getTempFilePath(), configuration);
committer1.preCommit();
committer1.commit();
@@ -160,17 +191,14 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
@Test
public void testCommitMultipleFilesMixed() throws IOException {
- Configuration configuration = new Configuration();
-
- Path basePath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
Path targetFilePath1 = new Path(basePath, "part-0-0.txt");
Path targetFilePath2 = new Path(basePath, "part-1-1.txt");
- HadoopFileCommitter committer1 = new HadoopRenameFileCommitter(configuration, targetFilePath1);
- HadoopFileCommitter committer2 = new HadoopRenameFileCommitter(configuration, targetFilePath2);
+ HadoopFileCommitter committer1 = createNewCommitter(configuration, targetFilePath1);
+ HadoopFileCommitter committer2 = createNewCommitter(configuration, targetFilePath2);
- writeFile(committer1.getInProgressFilePath(), configuration);
- writeFile(committer2.getInProgressFilePath(), configuration);
+ writeFile(committer1.getTempFilePath(), configuration);
+ writeFile(committer2.getTempFilePath(), configuration);
committer1.preCommit();
committer2.preCommit();
@@ -190,7 +218,7 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
private void writeFile(Path path, Configuration configuration) throws IOException {
FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
- try (FSDataOutputStream fsDataOutputStream = fileSystem.create(path, true);
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(path, override);
PrintWriter printWriter = new PrintWriter(fsDataOutputStream)) {
for (String line : CONTENTS) {
@@ -234,9 +262,9 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
private void verifyFolderAfterAllCommitted(
Configuration configuration,
Path basePath,
- String... targetFileNames) throws IOException {
+ String... expectedFileNames) throws IOException {
- List<String> expectedNames = Arrays.asList(targetFileNames);
+ List<String> expectedNames = Arrays.asList(expectedFileNames);
Collections.sort(expectedNames);
FileSystem fileSystem = FileSystem.get(basePath.toUri(), configuration);
@@ -252,11 +280,13 @@ public class HadoopRenameFileCommitterTest extends AbstractTestBase {
fileNames);
for (FileStatus file : files) {
- List<String> written = readFile(fileSystem, files[0].getPath());
- assertEquals(
- "Unexpected file content for file " + file.getPath(),
- CONTENTS,
- written);
+ if (!file.getPath().getName().startsWith(".")) {
+ List<String> written = readFile(fileSystem, files[0].getPath());
+ assertEquals(
+ "Unexpected file content for file " + file.getPath(),
+ CONTENTS,
+ written);
+ }
}
}
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
index 8a4a2a1..3ee14ee 100644
--- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
@@ -63,7 +63,8 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
@Test
public void testPendingFileRecoverableSerializer() throws IOException {
HadoopPathBasedPendingFileRecoverable recoverable = new HadoopPathBasedPendingFileRecoverable(
- new Path("hdfs://fake/path"));
+ new Path("hdfs://fake/path"),
+ new Path("hdfs://fake/path.inprogress.uuid"));
HadoopPathBasedPendingFileRecoverableSerializer serializer =
new HadoopPathBasedPendingFileRecoverableSerializer();
@@ -72,7 +73,8 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
serializer.getVersion(),
serializedBytes);
- assertEquals(recoverable.getPath(), deSerialized.getPath());
+ assertEquals(recoverable.getTargetFilePath(), deSerialized.getTargetFilePath());
+ assertEquals(recoverable.getTempFilePath(), deSerialized.getTempFilePath());
}
@Test
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
new file mode 100644
index 0000000..2661297
--- /dev/null
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterHDFSTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.hadoop.bulk.committer;
+
+import org.apache.flink.formats.hadoop.bulk.AbstractFileCommitterTest;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+import org.apache.flink.formats.hadoop.bulk.committer.cluster.HDFSCluster;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/**
+ * Tests the behaviors of {@link HadoopRenameFileCommitter} with HDFS file system.
+ */
+public class HadoopRenameCommitterHDFSTest extends AbstractFileCommitterTest {
+
+ @ClassRule
+ public static final TemporaryFolder CLASS_TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private static HDFSCluster hdfsCluster;
+
+ @BeforeClass
+ public static void createHDFS() throws Exception {
+ Assume.assumeTrue(!OperatingSystem.isWindows());
+
+ hdfsCluster = new HDFSCluster(CLASS_TEMPORARY_FOLDER.newFolder());
+ }
+
+ @AfterClass
+ public static void destroyHDFS() {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ }
+
+ hdfsCluster = null;
+ }
+
+ public HadoopRenameCommitterHDFSTest(boolean override) throws IOException {
+ super(override);
+ }
+
+ @Override
+ protected Path getBasePath() throws IOException {
+ return hdfsCluster.newFolder();
+ }
+
+ @Override
+ protected Configuration getConfiguration() {
+ return new Configuration();
+ }
+
+ @Override
+ protected HadoopFileCommitter createNewCommitter(
+ Configuration configuration,
+ Path targetFilePath) throws IOException {
+
+ return new HadoopRenameFileCommitter(configuration, targetFilePath);
+ }
+
+ @Override
+ protected HadoopFileCommitter createPendingCommitter(
+ Configuration configuration,
+ Path targetFilePath,
+ Path tempFilePath) throws IOException {
+
+ return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
+ }
+
+ @Override
+ protected void cleanup(Configuration configuration, Path basePath) throws IOException {
+ basePath.getFileSystem(configuration).delete(basePath, true);
+ }
+}
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
new file mode 100644
index 0000000..93e3afe
--- /dev/null
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/HadoopRenameCommitterLocalFSTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.hadoop.bulk.committer;
+
+import org.apache.flink.formats.hadoop.bulk.AbstractFileCommitterTest;
+import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Tests the behaviors of {@link HadoopRenameFileCommitter} with the local file system.
+ */
+public class HadoopRenameCommitterLocalFSTest extends AbstractFileCommitterTest {
+
+	/**
+	 * Creates the test instance.
+	 *
+	 * @param override the parameterized flag forwarded to {@link AbstractFileCommitterTest}.
+	 */
+	public HadoopRenameCommitterLocalFSTest(boolean override) throws IOException {
+		super(override);
+	}
+
+	/** Returns a new local directory from the inherited {@code TEMPORARY_FOLDER}, as a {@code file:} URI path. */
+	@Override
+	protected Path getBasePath() throws IOException {
+		return new Path(TEMPORARY_FOLDER.newFolder().toURI());
+	}
+
+	/** Returns a default Hadoop configuration; the local file system is resolved from the path's scheme. */
+	@Override
+	protected Configuration getConfiguration() {
+		return new Configuration();
+	}
+
+	/** Creates a rename-based committer that will write to a fresh temporary file for {@code targetFilePath}. */
+	@Override
+	protected HadoopFileCommitter createNewCommitter(
+		Configuration configuration,
+		Path targetFilePath) throws IOException {
+
+		return new HadoopRenameFileCommitter(configuration, targetFilePath);
+	}
+
+	/** Re-creates a rename-based committer for an already written temporary file. */
+	@Override
+	protected HadoopFileCommitter createPendingCommitter(
+		Configuration configuration,
+		Path targetFilePath,
+		Path tempFilePath) throws IOException {
+
+		return new HadoopRenameFileCommitter(configuration, targetFilePath, tempFilePath);
+	}
+
+	/**
+	 * Performs no explicit cleanup: the base path lives inside the inherited {@code TEMPORARY_FOLDER}
+	 * (presumably a JUnit rule that deletes it after the tests — confirm in AbstractFileCommitterTest).
+	 * Declared {@code protected} to match the hook's visibility in the HDFS variant of this test.
+	 */
+	@Override
+	protected void cleanup(Configuration configuration, Path basePath) {
+		// Intentionally empty; see the Javadoc of this method.
+	}
+}
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java
similarity index 56%
copy from flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
copy to flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java
index ae0495a..3ce5e33 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/formats/hadoop/bulk/HadoopFileCommitterFactory.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/committer/cluster/HDFSCluster.java
@@ -16,30 +16,36 @@
* limitations under the License.
*/
-package org.apache.flink.formats.hadoop.bulk;
-
-import org.apache.flink.annotation.Internal;
+package org.apache.flink.formats.hadoop.bulk.committer.cluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import java.io.File;
import java.io.IOException;
-import java.io.Serializable;
+import java.util.UUID;
/**
- * The factory to create the {@link HadoopFileCommitter}.
+ * Utility class for testing with HDFS FileSystem.
*/
-@Internal
-public interface HadoopFileCommitterFactory extends Serializable {
-
- /**
- * Creates the corresponding Hadoop file committer according to the Hadoop
- * configuration and the target path.
- *
- * @param configuration The hadoop configuration.
- * @param targetFilePath The target path to commit.
- * @return The corresponding Hadoop file committer.
- */
- HadoopFileCommitter create(Configuration configuration, Path targetFilePath) throws IOException;
+public class HDFSCluster {
+
+	/** The underlying Hadoop mini cluster, exposed for tests that need direct access. */
+	public final MiniDFSCluster miniCluster;
+
+	/**
+	 * Builds a MiniDFSCluster whose storage lives below the given directory.
+	 *
+	 * @param tmpDir base directory for the mini cluster's data.
+	 * @throws IOException if building the mini cluster fails.
+	 */
+	public HDFSCluster(File tmpDir) throws IOException {
+		Configuration conf = new Configuration();
+		String baseDir = tmpDir.getAbsolutePath();
+		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir);
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+		miniCluster = builder.build();
+	}
+
+	/** Shuts down the underlying mini cluster. */
+	public void shutdown() {
+		miniCluster.shutdown();
+	}
+
+	/**
+	 * Returns a new, randomly named path directly below the root of the mini cluster.
+	 * The directory itself is not created here.
+	 *
+	 * @return a unique path on the mini cluster's file system.
+	 */
+	public Path newFolder() {
+		// Appending "/" makes the cluster URI point at its root directory, so the random
+		// name is resolved as a child of that root.
+		Path clusterRoot = new Path(miniCluster.getURI() + "/");
+		String folderName = UUID.randomUUID().toString();
+		return new Path(clusterRoot, folderName);
+	}
}