Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/18 08:47:16 UTC

[flink] 02/02: [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf88590e992b974284e01a3a35e8ad01fd47174e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Dec 11 10:50:33 2018 +0100

    [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
    
    This closes #6774.
---
 .../avro/ParquetStreamingFileSinkITCase.java       | 14 ++--
 .../pom.xml                                        | 11 +--
 .../formats/sequencefile/SequenceFileWriter.java   |  4 +-
 .../sequencefile/SequenceFileWriterFactory.java    | 68 ++++-----------
 .../SerializableHadoopConfiguration.java           | 58 +++++++++++++
 .../SequenceStreamingFileSinkITCase.java}          | 46 +++++-----
 .../SerializableHadoopConfigurationTest.java       | 98 ++++++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |  0
 flink-formats/pom.xml                              |  2 +-
 .../{test => streaming}/util/FiniteTestSource.java | 25 ++++--
 10 files changed, 230 insertions(+), 96 deletions(-)
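
For context, a minimal usage sketch of the new format, pieced together from the
constructors and the ITCase in this commit (the stream and the output location
are placeholders, not part of the commit):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.sequencefile.SequenceFileWriterFactory;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    // "stream" is assumed to be an existing DataStream<Tuple2<LongWritable, Text>>.
    // Keys and values must be Hadoop Writables, as required by SequenceFileWriter.
    stream.addSink(
        StreamingFileSink.forBulkFormat(
                new Path("file:///tmp/sequence-file-out"),  // hypothetical output location
                new SequenceFileWriterFactory<>(
                        new Configuration(), LongWritable.class, Text.class))
            .build());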

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
index 540f762..484ef86 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
@@ -19,14 +19,15 @@
 package org.apache.flink.formats.parquet.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.formats.parquet.generated.Address;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.FiniteTestSource;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -156,11 +157,14 @@ public class ParquetStreamingFileSinkITCase extends AbstractTestBase {
 
 		File[] partFiles = buckets[0].listFiles();
 		assertNotNull(partFiles);
-		assertEquals(1, partFiles.length);
-		assertTrue(partFiles[0].length() > 0);
+		assertEquals(2, partFiles.length);
 
-		List<Address> results = readParquetFile(partFiles[0], dataModel);
-		assertEquals(expected, results);
+		for (File partFile : partFiles) {
+			assertTrue(partFile.length() > 0);
+
+			final List<Tuple2<Long, String>> fileContent = readParquetFile(partFile, dataModel);
+			assertEquals(expected, fileContent);
+		}
 	}
 
 	private static <T> List<T> readParquetFile(File file, GenericData dataModel) throws IOException {
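
(Note: the expectation above moves from one part file to two because
FiniteTestSource, reworked at the bottom of this commit, now emits its
elements twice with two completed checkpoints after each round; a bulk-format
sink rolls on checkpoint completion, so each round is closed into its own
part file with identical contents.)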
diff --git a/flink-formats/flink-sequencefile/pom.xml b/flink-formats/flink-sequence-file/pom.xml
similarity index 91%
rename from flink-formats/flink-sequencefile/pom.xml
rename to flink-formats/flink-sequence-file/pom.xml
index 632cf2f..1a2f84e 100644
--- a/flink-formats/flink-sequencefile/pom.xml
+++ b/flink-formats/flink-sequence-file/pom.xml
@@ -29,8 +29,8 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-sequencefile</artifactId>
-	<name>flink-sequencefile</name>
+	<artifactId>flink-sequence-file</artifactId>
+	<name>flink-sequence-file</name>
 
 	<packaging>jar</packaging>
 
@@ -50,13 +50,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-fs</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
 
 		<!-- test dependencies -->
 
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
similarity index 97%
rename from flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
rename to flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
index fa8fab2..169aa4b 100644
--- a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
@@ -37,9 +37,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @PublicEvolving
 public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+
 	private final SequenceFile.Writer writer;
 
-	public SequenceFileWriter(SequenceFile.Writer writer) {
+	SequenceFileWriter(SequenceFile.Writer writer) {
 		this.writer = checkNotNull(writer);
 	}
 
@@ -58,4 +59,3 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen
 		writer.close();
 	}
 }
-
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
similarity index 71%
rename from flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
rename to flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
index 90a87e7..d7b96f6 100644
--- a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
@@ -22,9 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile;
@@ -33,9 +30,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -47,9 +41,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @PublicEvolving
 public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+
 	private static final long serialVersionUID = 1L;
 
-	private final SerializableHadoopConfiguration serdeHadoopConfig;
+	/** A constant specifying that no compression is requested. */
+	public static final String NO_COMPRESSION = "NO_COMPRESSION";
+
+	private final SerializableHadoopConfiguration serializableHadoopConfig;
 	private final Class<K> keyClass;
 	private final Class<V> valueClass;
 	private final String compressionCodecName;
@@ -64,7 +62,7 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	 * @param valueClass The class of value to write.
 	 */
 	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+		this(hadoopConf, keyClass, valueClass, NO_COMPRESSION, SequenceFile.CompressionType.BLOCK);
 	}
 
 	/**
@@ -91,7 +89,7 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	 * @param compressionType      The type of compression level.
 	 */
 	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
-		this.serdeHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
+		this.serializableHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
 		this.keyClass = checkNotNull(keyClass);
 		this.valueClass = checkNotNull(valueClass);
 		this.compressionCodecName = checkNotNull(compressionCodecName);
@@ -101,9 +99,9 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	@Override
 	public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
 		org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
-		CompressionCodec compressionCodec = getCompressionCodec(serdeHadoopConfig.get(), compressionCodecName);
+		CompressionCodec compressionCodec = getCompressionCodec(serializableHadoopConfig.get(), compressionCodecName);
 		SequenceFile.Writer writer = SequenceFile.createWriter(
-			serdeHadoopConfig.get(),
+			serializableHadoopConfig.get(),
 			SequenceFile.Writer.stream(stream),
 			SequenceFile.Writer.keyClass(keyClass),
 			SequenceFile.Writer.valueClass(valueClass),
@@ -112,57 +110,19 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	}
 
 	private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
-		if (compressionCodecName.equals("None")) {
+		checkNotNull(conf);
+		checkNotNull(compressionCodecName);
+
+		if (compressionCodecName.equals(NO_COMPRESSION)) {
 			return null;
 		}
 
-		CompressionCodecFactory codecFactory = new CompressionCodecFactory(checkNotNull(conf));
+		CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
 		CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
 		if (codec == null) {
 			throw new RuntimeException("Codec " + compressionCodecName + " not found.");
 		}
 		return codec;
 	}
-
-	/**
-	 * Get Hadoop configuration based by the path.
-	 * If the path is not Hadoop URI, it will be return default configuration.
-	 *
-	 * @param path The path to get configuration.
-	 * @return Hadoop configuration.
-	 * @throws IOException
-	 */
-	public static Configuration getHadoopConfigFromPath(Path path) throws IOException {
-		FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
-		if (fs != null && fs instanceof HadoopFileSystem) {
-			return ((HadoopFileSystem) fs).getHadoopFileSystem().getConf();
-		} else {
-			return new Configuration();
-		}
-	}
-
-	/**
-	 * The wrapper class for serialization of {@link Configuration}.
-	 */
-	private class SerializableHadoopConfiguration implements Serializable {
-		private transient Configuration hadoopConfig;
-
-		private SerializableHadoopConfiguration(Configuration hadoopConfig) {
-			this.hadoopConfig = hadoopConfig;
-		}
-
-		private Configuration get() {
-			return this.hadoopConfig;
-		}
-
-		private void writeObject(ObjectOutputStream out) throws IOException {
-			this.hadoopConfig.write(out);
-		}
-
-		private void readObject(ObjectInputStream in) throws IOException {
-			this.hadoopConfig = new Configuration();
-			this.hadoopConfig.readFields(in);
-		}
-	}
 }
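
The codec is now looked up by name via Hadoop's CompressionCodecFactory, with
NO_COMPRESSION as the explicit opt-out instead of the old magic string "None".
A hedged sketch of the compressed variant ("BZip2" is the codec name exercised
by the ITCase below; hadoopConf stands in for a real Configuration):

    // Block-compressed output; any name that CompressionCodecFactory.getCodecByName(...)
    // can resolve is accepted, otherwise create(...) fails with a RuntimeException.
    new SequenceFileWriterFactory<>(
            hadoopConf, LongWritable.class, Text.class,
            "BZip2", SequenceFile.CompressionType.BLOCK);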
 
diff --git a/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
new file mode 100644
index 0000000..8e00e07
--- /dev/null
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Wrapper class for serialization of {@link Configuration}.
+ */
+class SerializableHadoopConfiguration implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Configuration hadoopConfig;
+
+	SerializableHadoopConfiguration(Configuration hadoopConfig) {
+		this.hadoopConfig = hadoopConfig;
+	}
+
+	Configuration get() {
+		return this.hadoopConfig;
+	}
+
+	// --------------------
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		this.hadoopConfig.write(out);
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException {
+		final Configuration config = new Configuration();
+		config.readFields(in);
+
+		if (this.hadoopConfig == null) {
+			this.hadoopConfig = config;
+		}
+	}
+}
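
Hadoop's Configuration is not java.io.Serializable, which is why the factory
cannot hold it directly; this wrapper routes Java serialization through
Configuration.write()/readFields() instead. A round-trip sketch, assuming an
existing Configuration named hadoopConf and same-package access (the test
added below asserts the same behavior):

    byte[] bytes;
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bos)) {
        out.writeObject(new SerializableHadoopConfiguration(hadoopConf));
        out.flush();
        bytes = bos.toByteArray();
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        // get() unwraps the Configuration rebuilt by readObject()
        Configuration roundTripped = ((SerializableHadoopConfiguration) in.readObject()).get();
    }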
diff --git a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
similarity index 78%
rename from flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
rename to flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
index b3d4b22..2b3f325 100644
--- a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
+++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.FiniteTestSource;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
@@ -49,26 +49,31 @@ import static org.junit.Assert.assertTrue;
  * Integration test case for writing bulk encoded files with the
  * {@link StreamingFileSink} with SequenceFile.
  */
-public class SequenceFileSinkITCase extends AbstractTestBase {
-	@Test
-	public void testWriteSequenceFile() throws Exception {
-		final File folder = TEMPORARY_FOLDER.newFolder();
+public class SequenceStreamingFileSinkITCase extends AbstractTestBase {
+
+	private final Configuration configuration = new Configuration();
 
-		final List<Tuple2<Long, String>> data = Arrays.asList(
+	private final List<Tuple2<Long, String>> testData = Arrays.asList(
 			new Tuple2<>(1L, "a"),
 			new Tuple2<>(2L, "b"),
 			new Tuple2<>(3L, "c")
-		);
+	);
+
+	@Test
+	public void testWriteSequenceFile() throws Exception {
+		final File folder = TEMPORARY_FOLDER.newFolder();
+		final Path testPath = Path.fromLocalFile(folder);
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.enableCheckpointing(100);
 
 		DataStream<Tuple2<Long, String>> stream = env.addSource(
-			new FiniteTestSource<>(data), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
-			}));
+				new FiniteTestSource<>(testData),
+				TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
 
-		Path testPath = Path.fromLocalFile(folder);
+				})
+		);
 
 		stream.map(new MapFunction<Tuple2<Long, String>, Tuple2<LongWritable, Text>>() {
 			@Override
@@ -78,17 +83,17 @@ public class SequenceFileSinkITCase extends AbstractTestBase {
 		}).addSink(
 			StreamingFileSink.forBulkFormat(
 				testPath,
-				new SequenceFileWriterFactory<>(SequenceFileWriterFactory.getHadoopConfigFromPath(testPath), LongWritable.class, Text.class, "BZip2")
+				new SequenceFileWriterFactory<>(configuration, LongWritable.class, Text.class, "BZip2")
 			).build());
 
 		env.execute();
 
-		validateResults(folder, data);
+		validateResults(folder, testData);
 	}
 
 	private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
 		SequenceFile.Reader reader = new SequenceFile.Reader(
-			new Configuration(), SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
+			configuration, SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
 		LongWritable key = new LongWritable();
 		Text val = new Text();
 		ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
@@ -104,14 +109,15 @@ public class SequenceFileSinkITCase extends AbstractTestBase {
 		assertNotNull(buckets);
 		assertEquals(1, buckets.length);
 
-		File[] partFiles = buckets[0].listFiles();
+		final File[] partFiles = buckets[0].listFiles();
 		assertNotNull(partFiles);
-		assertEquals(1, partFiles.length);
-		assertTrue(partFiles[0].length() > 0);
+		assertEquals(2, partFiles.length);
+
+		for (File partFile : partFiles) {
+			assertTrue(partFile.length() > 0);
 
-		List<Tuple2<Long, String>> results = readSequenceFile(partFiles[0]);
-		assertEquals(expected, results);
+			final List<Tuple2<Long, String>> fileContent = readSequenceFile(partFile);
+			assertEquals(expected, fileContent);
+		}
 	}
 }
-
-
diff --git a/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java
new file mode 100644
index 0000000..ea0fb95
--- /dev/null
+++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Tests for the {@link SerializableHadoopConfiguration}.
+ */
+public class SerializableHadoopConfigurationTest {
+
+	private static final String TEST_KEY = "test-key";
+
+	private static final String TEST_VALUE = "test-value";
+
+	private Configuration configuration;
+
+	@Before
+	public void createConfigWithCustomProperty() {
+		this.configuration = new Configuration();
+		configuration.set(TEST_KEY, TEST_VALUE);
+	}
+
+	@Test
+	public void customPropertiesSurviveSerializationDeserialization() throws IOException, ClassNotFoundException {
+		final SerializableHadoopConfiguration serializableConfigUnderTest = new SerializableHadoopConfiguration(configuration);
+		final byte[] serializedConfigUnderTest = serializeAndGetBytes(serializableConfigUnderTest);
+		final SerializableHadoopConfiguration deserializableConfigUnderTest = deserializeAndGetConfiguration(serializedConfigUnderTest);
+
+		Assert.assertThat(deserializableConfigUnderTest.get(), hasTheSamePropertiesAs(configuration));
+	}
+
+	// ----------------------------------------	Matchers ---------------------------------------- //
+
+	private static TypeSafeMatcher<Configuration> hasTheSamePropertiesAs(final Configuration expectedConfig) {
+		return new TypeSafeMatcher<Configuration>() {
+			@Override
+			protected boolean matchesSafely(Configuration actualConfig) {
+				final String value = actualConfig.get(TEST_KEY);
+				return actualConfig != expectedConfig && value != null && expectedConfig.get(TEST_KEY).equals(value);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Hadoop Configuration with property: key=")
+						.appendValue(TEST_KEY)
+						.appendText(" and value=")
+						.appendValue(TEST_VALUE);
+			}
+		};
+	}
+
+	// ----------------------------------------	Helper Methods ---------------------------------------- //
+
+	private byte[] serializeAndGetBytes(SerializableHadoopConfiguration serializableConfigUnderTest) throws IOException {
+		try (
+				ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+				ObjectOutputStream out = new ObjectOutputStream(byteStream)
+		) {
+			out.writeObject(serializableConfigUnderTest);
+			out.flush();
+			return byteStream.toByteArray();
+		}
+	}
+
+	private SerializableHadoopConfiguration deserializeAndGetConfiguration(byte[] serializedConfig) throws IOException, ClassNotFoundException {
+		try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedConfig))) {
+			return (SerializableHadoopConfiguration) in.readObject();
+		}
+	}
+}
diff --git a/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties b/flink-formats/flink-sequence-file/src/test/resources/log4j-test.properties
similarity index 100%
rename from flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
rename to flink-formats/flink-sequence-file/src/test/resources/log4j-test.properties
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 380eadc..51c3466 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -40,7 +40,7 @@ under the License.
 		<module>flink-json</module>
 		<module>flink-avro-confluent-registry</module>
 		<module>flink-parquet</module>
-		<module>flink-sequencefile</module>
+		<module>flink-sequence-file</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
similarity index 72%
rename from flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
index 1d75118..b3a9546 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.util;
+package org.apache.flink.streaming.util;
 
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -24,8 +24,15 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import java.util.Arrays;
 
 /**
- * A stream source that emits elements without allowing checkpoints and waits
- * for two more checkpoints to complete before exiting.
+ * A stream source that:
+ * 1) emits a list of elements without allowing checkpoints,
+ * 2) then waits for two more checkpoints to complete,
+ * 3) then re-emits the same elements before
+ * 4) waiting for another two checkpoints and
+ * 5) exiting.
+ *
+ * <p>This class was written to test the Bulk Writers used by
+ * the StreamingFileSink.
  */
 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
 public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
@@ -50,11 +57,19 @@ public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListene
 
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
+		// first round of sending the elements and waiting for the checkpoints
+		emitElementsAndWaitForCheckpoints(ctx, 2);
+
+		// second round of the same
+		emitElementsAndWaitForCheckpoints(ctx, 2);
+	}
+
+	private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx, int checkpointsToWaitFor) throws InterruptedException {
 		final Object lock = ctx.getCheckpointLock();
-		final int checkpointToAwait;
 
+		final int checkpointToAwait;
 		synchronized (lock) {
-			checkpointToAwait = numCheckpointsComplete + 2;
+			checkpointToAwait = numCheckpointsComplete + checkpointsToWaitFor;
 			for (T t : elements) {
 				ctx.collect(t);
 			}
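
The two emission rounds are what the part-file assertions in both ITCases rely
on: with checkpointing enabled, a bulk-format sink rolls on every completed
checkpoint, so each round ends up as its own part file with identical contents.
A wiring sketch, mirroring the ITCases (testData, typeInfo and sink are
placeholders):

    env.setParallelism(1);
    env.enableCheckpointing(100);  // FiniteTestSource only terminates once checkpoints complete
    env.addSource(new FiniteTestSource<>(testData), typeInfo)
       .addSink(sink);
    env.execute();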