Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/18 08:47:14 UTC

[flink] branch master updated (45f9ccc -> cf88590)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 45f9ccc  [FLINK-11083] [Table&SQL] CRowSerializerConfigSnapshot is not instantiable
     new b078c80  [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
     new cf88590  [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../avro/ParquetStreamingFileSinkITCase.java       |  14 ++-
 .../flink-sequence-file}/pom.xml                   |  49 +++++---
 .../formats/sequencefile/SequenceFileWriter.java}  |  35 +++---
 .../sequencefile/SequenceFileWriterFactory.java    | 128 +++++++++++++++++++++
 .../SerializableHadoopConfiguration.java           |  43 ++++---
 .../SequenceStreamingFileSinkITCase.java           | 123 ++++++++++++++++++++
 .../SerializableHadoopConfigurationTest.java       |  98 ++++++++++++++++
 .../src/test/resources/log4j-test.properties       |   3 +-
 flink-formats/pom.xml                              |   1 +
 .../flink/streaming/util}/FiniteTestSource.java    |  25 +++-
 10 files changed, 452 insertions(+), 67 deletions(-)
 copy {flink-libraries/flink-gelly => flink-formats/flink-sequence-file}/pom.xml (67%)
 copy flink-formats/{flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java => flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java} (58%)
 create mode 100644 flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
 copy flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java => flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java (52%)
 create mode 100644 flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
 create mode 100644 flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java
 copy {flink-connectors/flink-connector-elasticsearch6 => flink-formats/flink-sequence-file}/src/test/resources/log4j-test.properties (97%)
 rename {flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils => flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util}/FiniteTestSource.java (72%)
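
Taken together, the two commits let the StreamingFileSink write Hadoop SequenceFiles as a bulk-encoded format. A minimal usage sketch, adapted from the SequenceStreamingFileSinkITCase added below (imports elided; the output path is illustrative):

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(100);   // bulk-encoded part files roll on every checkpoint

    env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c"))
            .map(t -> new Tuple2<>(new LongWritable(t.f0), new Text(t.f1)))
            .returns(TypeInformation.of(new TypeHint<Tuple2<LongWritable, Text>>() {}))
            .addSink(StreamingFileSink.forBulkFormat(
                    new Path("file:///tmp/sequence-file-out"),
                    new SequenceFileWriterFactory<>(new Configuration(), LongWritable.class, Text.class))
                .build());

    env.execute();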


[flink] 02/02: [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf88590e992b974284e01a3a35e8ad01fd47174e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Dec 11 10:50:33 2018 +0100

    [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
    
    This closes #6774.
---
 .../avro/ParquetStreamingFileSinkITCase.java       | 14 ++--
 .../pom.xml                                        | 11 +--
 .../formats/sequencefile/SequenceFileWriter.java   |  4 +-
 .../sequencefile/SequenceFileWriterFactory.java    | 68 ++++-----------
 .../SerializableHadoopConfiguration.java           | 58 +++++++++++++
 .../SequenceStreamingFileSinkITCase.java}          | 46 +++++-----
 .../SerializableHadoopConfigurationTest.java       | 98 ++++++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |  0
 flink-formats/pom.xml                              |  2 +-
 .../{test => streaming}/util/FiniteTestSource.java | 25 ++++--
 10 files changed, 230 insertions(+), 96 deletions(-)

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
index 540f762..484ef86 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
@@ -19,14 +19,15 @@
 package org.apache.flink.formats.parquet.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.formats.parquet.generated.Address;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.FiniteTestSource;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -156,11 +157,14 @@ public class ParquetStreamingFileSinkITCase extends AbstractTestBase {
 
 		File[] partFiles = buckets[0].listFiles();
 		assertNotNull(partFiles);
-		assertEquals(1, partFiles.length);
-		assertTrue(partFiles[0].length() > 0);
+		assertEquals(2, partFiles.length);
 
-		List<Address> results = readParquetFile(partFiles[0], dataModel);
-		assertEquals(expected, results);
+		for (File partFile : partFiles) {
+			assertTrue(partFile.length() > 0);
+
+			final List<Tuple2<Long, String>> fileContent = readParquetFile(partFile, dataModel);
+			assertEquals(expected, fileContent);
+		}
 	}
 
 	private static <T> List<T> readParquetFile(File file, GenericData dataModel) throws IOException {
diff --git a/flink-formats/flink-sequencefile/pom.xml b/flink-formats/flink-sequence-file/pom.xml
similarity index 91%
rename from flink-formats/flink-sequencefile/pom.xml
rename to flink-formats/flink-sequence-file/pom.xml
index 632cf2f..1a2f84e 100644
--- a/flink-formats/flink-sequencefile/pom.xml
+++ b/flink-formats/flink-sequence-file/pom.xml
@@ -29,8 +29,8 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-sequencefile</artifactId>
-	<name>flink-sequencefile</name>
+	<artifactId>flink-sequence-file</artifactId>
+	<name>flink-sequence-file</name>
 
 	<packaging>jar</packaging>
 
@@ -50,13 +50,6 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-fs</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
 
 		<!-- test dependencies -->
 
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
similarity index 97%
rename from flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
rename to flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
index fa8fab2..169aa4b 100644
--- a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
@@ -37,9 +37,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @PublicEvolving
 public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+
 	private final SequenceFile.Writer writer;
 
-	public SequenceFileWriter(SequenceFile.Writer writer) {
+	SequenceFileWriter(SequenceFile.Writer writer) {
 		this.writer = checkNotNull(writer);
 	}
 
@@ -58,4 +59,3 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen
 		writer.close();
 	}
 }
-
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
similarity index 71%
rename from flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
rename to flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
index 90a87e7..d7b96f6 100644
--- a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
@@ -22,9 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile;
@@ -33,9 +30,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -47,9 +41,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 @PublicEvolving
 public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+
 	private static final long serialVersionUID = 1L;
 
-	private final SerializableHadoopConfiguration serdeHadoopConfig;
+	/** A constant specifying that no compression is requested. */
+	public static final String NO_COMPRESSION = "NO_COMPRESSION";
+
+	private final SerializableHadoopConfiguration serializableHadoopConfig;
 	private final Class<K> keyClass;
 	private final Class<V> valueClass;
 	private final String compressionCodecName;
@@ -64,7 +62,7 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	 * @param valueClass The class of value to write.
 	 */
 	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+		this(hadoopConf, keyClass, valueClass, NO_COMPRESSION, SequenceFile.CompressionType.BLOCK);
 	}
 
 	/**
@@ -91,7 +89,7 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	 * @param compressionType      The type of compression level.
 	 */
 	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
-		this.serdeHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
+		this.serializableHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
 		this.keyClass = checkNotNull(keyClass);
 		this.valueClass = checkNotNull(valueClass);
 		this.compressionCodecName = checkNotNull(compressionCodecName);
@@ -101,9 +99,9 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	@Override
 	public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
 		org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
-		CompressionCodec compressionCodec = getCompressionCodec(serdeHadoopConfig.get(), compressionCodecName);
+		CompressionCodec compressionCodec = getCompressionCodec(serializableHadoopConfig.get(), compressionCodecName);
 		SequenceFile.Writer writer = SequenceFile.createWriter(
-			serdeHadoopConfig.get(),
+			serializableHadoopConfig.get(),
 			SequenceFile.Writer.stream(stream),
 			SequenceFile.Writer.keyClass(keyClass),
 			SequenceFile.Writer.valueClass(valueClass),
@@ -112,57 +110,19 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
 	}
 
 	private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
-		if (compressionCodecName.equals("None")) {
+		checkNotNull(conf);
+		checkNotNull(compressionCodecName);
+
+		if (compressionCodecName.equals(NO_COMPRESSION)) {
 			return null;
 		}
 
-		CompressionCodecFactory codecFactory = new CompressionCodecFactory(checkNotNull(conf));
+		CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
 		CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
 		if (codec == null) {
 			throw new RuntimeException("Codec " + compressionCodecName + " not found.");
 		}
 		return codec;
 	}
-
-	/**
-	 * Get Hadoop configuration based by the path.
-	 * If the path is not Hadoop URI, it will be return default configuration.
-	 *
-	 * @param path The path to get configuration.
-	 * @return Hadoop configuration.
-	 * @throws IOException
-	 */
-	public static Configuration getHadoopConfigFromPath(Path path) throws IOException {
-		FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
-		if (fs != null && fs instanceof HadoopFileSystem) {
-			return ((HadoopFileSystem) fs).getHadoopFileSystem().getConf();
-		} else {
-			return new Configuration();
-		}
-	}
-
-	/**
-	 * The wrapper class for serialization of {@link Configuration}.
-	 */
-	private class SerializableHadoopConfiguration implements Serializable {
-		private transient Configuration hadoopConfig;
-
-		private SerializableHadoopConfiguration(Configuration hadoopConfig) {
-			this.hadoopConfig = hadoopConfig;
-		}
-
-		private Configuration get() {
-			return this.hadoopConfig;
-		}
-
-		private void writeObject(ObjectOutputStream out) throws IOException {
-			this.hadoopConfig.write(out);
-		}
-
-		private void readObject(ObjectInputStream in) throws IOException {
-			this.hadoopConfig = new Configuration();
-			this.hadoopConfig.readFields(in);
-		}
-	}
 }
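
With this cleanup, compression is selected through the public NO_COMPRESSION constant or a Hadoop codec name rather than the old magic string "None". For example, with hadoopConf being any org.apache.hadoop.conf.Configuration:

    // uncompressed output (also the default of the shorter constructors):
    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class,
            SequenceFileWriterFactory.NO_COMPRESSION);

    // BLOCK-compressed output with a named codec, as in the ITCase below:
    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class,
            "BZip2", SequenceFile.CompressionType.BLOCK);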
 
diff --git a/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
new file mode 100644
index 0000000..8e00e07
--- /dev/null
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Wrapper class for serialization of {@link Configuration}.
+ */
+class SerializableHadoopConfiguration implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Configuration hadoopConfig;
+
+	SerializableHadoopConfiguration(Configuration hadoopConfig) {
+		this.hadoopConfig = hadoopConfig;
+	}
+
+	Configuration get() {
+		return this.hadoopConfig;
+	}
+
+	// --------------------
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		this.hadoopConfig.write(out);
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException {
+		final Configuration config = new Configuration();
+		config.readFields(in);
+
+		if (this.hadoopConfig == null) {
+			this.hadoopConfig = config;
+		}
+	}
+}
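
For context on why this wrapper exists: org.apache.hadoop.conf.Configuration implements Hadoop's Writable but not java.io.Serializable, so a BulkWriter.Factory (which is serialized into the job graph) cannot hold one in a plain field. The wrapper routes Java serialization through Configuration's own write()/readFields(). A round-trip sketch of the same idea as the test added below; note the class is package-private, so this only compiles inside org.apache.flink.formats.sequencefile, and the property used is just an example:

    Configuration conf = new Configuration();
    conf.set("io.file.buffer.size", "8192");   // any property should survive the round trip

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
        out.writeObject(new SerializableHadoopConfiguration(conf));
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        SerializableHadoopConfiguration copy = (SerializableHadoopConfiguration) in.readObject();
        // copy.get().get("io.file.buffer.size") is again "8192"
    }
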
diff --git a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
similarity index 78%
rename from flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
rename to flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
index b3d4b22..2b3f325 100644
--- a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
+++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.FiniteTestSource;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
@@ -49,26 +49,31 @@ import static org.junit.Assert.assertTrue;
  * Integration test case for writing bulk encoded files with the
  * {@link StreamingFileSink} with SequenceFile.
  */
-public class SequenceFileSinkITCase extends AbstractTestBase {
-	@Test
-	public void testWriteSequenceFile() throws Exception {
-		final File folder = TEMPORARY_FOLDER.newFolder();
+public class SequenceStreamingFileSinkITCase extends AbstractTestBase {
+
+	private final Configuration configuration = new Configuration();
 
-		final List<Tuple2<Long, String>> data = Arrays.asList(
+	private final List<Tuple2<Long, String>> testData = Arrays.asList(
 			new Tuple2<>(1L, "a"),
 			new Tuple2<>(2L, "b"),
 			new Tuple2<>(3L, "c")
-		);
+	);
+
+	@Test
+	public void testWriteSequenceFile() throws Exception {
+		final File folder = TEMPORARY_FOLDER.newFolder();
+		final Path testPath = Path.fromLocalFile(folder);
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 		env.enableCheckpointing(100);
 
 		DataStream<Tuple2<Long, String>> stream = env.addSource(
-			new FiniteTestSource<>(data), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
-			}));
+				new FiniteTestSource<>(testData),
+				TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
 
-		Path testPath = Path.fromLocalFile(folder);
+				})
+		);
 
 		stream.map(new MapFunction<Tuple2<Long, String>, Tuple2<LongWritable, Text>>() {
 			@Override
@@ -78,17 +83,17 @@ public class SequenceFileSinkITCase extends AbstractTestBase {
 		}).addSink(
 			StreamingFileSink.forBulkFormat(
 				testPath,
-				new SequenceFileWriterFactory<>(SequenceFileWriterFactory.getHadoopConfigFromPath(testPath), LongWritable.class, Text.class, "BZip2")
+				new SequenceFileWriterFactory<>(configuration, LongWritable.class, Text.class, "BZip2")
 			).build());
 
 		env.execute();
 
-		validateResults(folder, data);
+		validateResults(folder, testData);
 	}
 
 	private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
 		SequenceFile.Reader reader = new SequenceFile.Reader(
-			new Configuration(), SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
+			configuration, SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
 		LongWritable key = new LongWritable();
 		Text val = new Text();
 		ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
@@ -104,14 +109,15 @@ public class SequenceFileSinkITCase extends AbstractTestBase {
 		assertNotNull(buckets);
 		assertEquals(1, buckets.length);
 
-		File[] partFiles = buckets[0].listFiles();
+		final File[] partFiles = buckets[0].listFiles();
 		assertNotNull(partFiles);
-		assertEquals(1, partFiles.length);
-		assertTrue(partFiles[0].length() > 0);
+		assertEquals(2, partFiles.length);
+
+		for (File partFile : partFiles) {
+			assertTrue(partFile.length() > 0);
 
-		List<Tuple2<Long, String>> results = readSequenceFile(partFiles[0]);
-		assertEquals(expected, results);
+			final List<Tuple2<Long, String>> fileContent = readSequenceFile(partFile);
+			assertEquals(expected, fileContent);
+		}
 	}
 }
-
-
diff --git a/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java
new file mode 100644
index 0000000..ea0fb95
--- /dev/null
+++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Tests for the {@link SerializableHadoopConfiguration}.
+ */
+public class SerializableHadoopConfigurationTest {
+
+	private static final String TEST_KEY = "test-key";
+
+	private static final String TEST_VALUE = "test-value";
+
+	private Configuration configuration;
+
+	@Before
+	public void createConfigWithCustomProperty() {
+		this.configuration = new Configuration();
+		configuration.set(TEST_KEY, TEST_VALUE);
+	}
+
+	@Test
+	public void customPropertiesSurviveSerializationDeserialization() throws IOException, ClassNotFoundException {
+		final SerializableHadoopConfiguration serializableConfigUnderTest = new SerializableHadoopConfiguration(configuration);
+		final byte[] serializedConfigUnderTest = serializeAndGetBytes(serializableConfigUnderTest);
+		final SerializableHadoopConfiguration deserializableConfigUnderTest = deserializeAndGetConfiguration(serializedConfigUnderTest);
+
+		Assert.assertThat(deserializableConfigUnderTest.get(), hasTheSamePropertiesAs(configuration));
+	}
+
+	// ----------------------------------------	Matchers ---------------------------------------- //
+
+	private static TypeSafeMatcher<Configuration> hasTheSamePropertiesAs(final Configuration expectedConfig) {
+		return new TypeSafeMatcher<Configuration>() {
+			@Override
+			protected boolean matchesSafely(Configuration actualConfig) {
+				final String value = actualConfig.get(TEST_KEY);
+				return actualConfig != expectedConfig && value != null && expectedConfig.get(TEST_KEY).equals(value);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Hadoop Configuration with property: key=")
+						.appendValue(TEST_KEY)
+						.appendText(" and value=")
+						.appendValue(TEST_VALUE);
+			}
+		};
+	}
+
+	// ----------------------------------------	Helper Methods ---------------------------------------- //
+
+	private byte[] serializeAndGetBytes(SerializableHadoopConfiguration serializableConfigUnderTest) throws IOException {
+		try (
+				ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+				ObjectOutputStream out = new ObjectOutputStream(byteStream)
+		) {
+			out.writeObject(serializableConfigUnderTest);
+			out.flush();
+			return byteStream.toByteArray();
+		}
+	}
+
+	private SerializableHadoopConfiguration deserializeAndGetConfiguration(byte[] serializedConfig) throws IOException, ClassNotFoundException {
+		try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedConfig))) {
+			return (SerializableHadoopConfiguration) in.readObject();
+		}
+	}
+}
diff --git a/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties b/flink-formats/flink-sequence-file/src/test/resources/log4j-test.properties
similarity index 100%
rename from flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
rename to flink-formats/flink-sequence-file/src/test/resources/log4j-test.properties
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 380eadc..51c3466 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -40,7 +40,7 @@ under the License.
 		<module>flink-json</module>
 		<module>flink-avro-confluent-registry</module>
 		<module>flink-parquet</module>
-		<module>flink-sequencefile</module>
+		<module>flink-sequence-file</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
similarity index 72%
rename from flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
index 1d75118..b3a9546 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.util;
+package org.apache.flink.streaming.util;
 
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -24,8 +24,15 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import java.util.Arrays;
 
 /**
- * A stream source that emits elements without allowing checkpoints and waits
- * for two more checkpoints to complete before exiting.
+ * A stream source that:
+ * 1) emits a list of elements without allowing checkpoints,
+ * 2) then waits for two more checkpoints to complete,
+ * 3) then re-emits the same elements before
+ * 4) waiting for another two checkpoints and
+ * 5) exiting.
+ *
+ * <p>This class was written to test the Bulk Writers used by
+ * the StreamingFileSink.
  */
 @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
 public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
@@ -50,11 +57,19 @@ public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListene
 
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
+		// first round of sending the elements and waiting for the checkpoints
+		emitElementsAndWaitForCheckpoints(ctx, 2);
+
+		// second round of the same
+		emitElementsAndWaitForCheckpoints(ctx, 2);
+	}
+
+	private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx, int checkpointsToWaitFor) throws InterruptedException {
 		final Object lock = ctx.getCheckpointLock();
-		final int checkpointToAwait;
 
+		final int checkpointToAwait;
 		synchronized (lock) {
-			checkpointToAwait = numCheckpointsComplete + 2;
+			checkpointToAwait = numCheckpointsComplete + checkpointsToWaitFor;
 			for (T t : elements) {
 				ctx.collect(t);
 			}
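
The hunk above ends inside emitElementsAndWaitForCheckpoints(); the remainder of the method, which this commit does not change, blocks on the checkpoint lock until notifyCheckpointComplete() has advanced numCheckpointsComplete to checkpointToAwait. A sketch of that waiting pattern, stated as an assumption since the rest of the class is not part of this diff:

    // after emitting under the lock, block until enough checkpoints have completed
    synchronized (lock) {
        while (running && numCheckpointsComplete < checkpointToAwait) {
            lock.wait(1);
        }
    }

    // the CheckpointListener callback that makes progress observable:
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        numCheckpointsComplete++;
    }

The double emission is also why both ITCases in this commit now assert assertEquals(2, partFiles.length): a bulk format rolls its part file on every checkpoint, so each emission round yields one part file containing the complete element list.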


[flink] 01/02: [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b078c8090203fe55215a7626968a40a985af25ce
Author: Jihyun Cho <ji...@linecorp.com>
AuthorDate: Fri Sep 28 10:00:10 2018 +0200

    [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
---
 .../avro/ParquetStreamingFileSinkITCase.java       |   2 +-
 flink-formats/flink-sequencefile/pom.xml           | 107 +++++++++++++
 .../formats/sequencefile/SequenceFileWriter.java   |  61 ++++++++
 .../sequencefile/SequenceFileWriterFactory.java    | 168 +++++++++++++++++++++
 .../sequencefile/SequenceFileSinkITCase.java       | 117 ++++++++++++++
 .../src/test/resources/log4j-test.properties       |  23 +++
 flink-formats/pom.xml                              |   1 +
 .../apache/flink/test/util}/FiniteTestSource.java  |   2 +-
 8 files changed, 479 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
index d1f0a5f..540f762 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.formats.parquet.generated.Address;
-import org.apache.flink.formats.parquet.testutils.FiniteTestSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.FiniteTestSource;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
diff --git a/flink-formats/flink-sequencefile/pom.xml b/flink-formats/flink-sequencefile/pom.xml
new file mode 100644
index 0000000..632cf2f
--- /dev/null
+++ b/flink-formats/flink-sequencefile/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-sequencefile</artifactId>
+	<name>flink-sequencefile</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Hadoop is needed for SequenceFile -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-hadoop2</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+
+	<build>
+		<plugins>
+			<!-- skip dependency convergence due to Hadoop dependency -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>dependency-convergence</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
new file mode 100644
index 0000000..fa8fab2
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+	private final SequenceFile.Writer writer;
+
+	public SequenceFileWriter(SequenceFile.Writer writer) {
+		this.writer = checkNotNull(writer);
+	}
+
+	@Override
+	public void addElement(Tuple2<K, V> element) throws IOException {
+		writer.append(element.f0, element.f1);
+	}
+
+	@Override
+	public void flush() throws IOException {
+		writer.hsync();
+	}
+
+	@Override
+	public void finish() throws IOException {
+		writer.close();
+	}
+}
+
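The writer's lifecycle (create, addElement, flush, finish) is normally driven by the StreamingFileSink, but the contract is easy to exercise in isolation. A manual sketch against the local file system, with an illustrative path:

    SequenceFileWriterFactory<LongWritable, Text> factory =
            new SequenceFileWriterFactory<>(new Configuration(), LongWritable.class, Text.class);

    try (FSDataOutputStream out = FileSystem.getLocalFileSystem()
            .create(new Path("file:///tmp/demo.seq"), FileSystem.WriteMode.OVERWRITE)) {
        SequenceFileWriter<LongWritable, Text> writer = factory.create(out);
        writer.addElement(new Tuple2<>(new LongWritable(1L), new Text("a")));
        writer.flush();    // hsync()s the underlying SequenceFile.Writer
        writer.finish();   // closes it
    }
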
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
new file mode 100644
index 0000000..90a87e7
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+	private static final long serialVersionUID = 1L;
+
+	private final SerializableHadoopConfiguration serdeHadoopConfig;
+	private final Class<K> keyClass;
+	private final Class<V> valueClass;
+	private final String compressionCodecName;
+	private final SequenceFile.CompressionType compressionType;
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass   The class of key to write.
+	 * @param valueClass The class of value to write.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass             The class of key to write.
+	 * @param valueClass           The class of value to write.
+	 * @param compressionCodecName The name of compression codec.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+		this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+	 * SequenceFileWriter.
+	 *
+	 * @param hadoopConf           The Hadoop configuration for Sequence File Writer.
+	 * @param keyClass             The class of key to write.
+	 * @param valueClass           The class of value to write.
+	 * @param compressionCodecName The name of compression codec.
+	 * @param compressionType      The type of compression level.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+		this.serdeHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
+		this.keyClass = checkNotNull(keyClass);
+		this.valueClass = checkNotNull(valueClass);
+		this.compressionCodecName = checkNotNull(compressionCodecName);
+		this.compressionType = checkNotNull(compressionType);
+	}
+
+	@Override
+	public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
+		org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
+		CompressionCodec compressionCodec = getCompressionCodec(serdeHadoopConfig.get(), compressionCodecName);
+		SequenceFile.Writer writer = SequenceFile.createWriter(
+			serdeHadoopConfig.get(),
+			SequenceFile.Writer.stream(stream),
+			SequenceFile.Writer.keyClass(keyClass),
+			SequenceFile.Writer.valueClass(valueClass),
+			SequenceFile.Writer.compression(compressionType, compressionCodec));
+		return new SequenceFileWriter<>(writer);
+	}
+
+	private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
+		if (compressionCodecName.equals("None")) {
+			return null;
+		}
+
+		CompressionCodecFactory codecFactory = new CompressionCodecFactory(checkNotNull(conf));
+		CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
+		if (codec == null) {
+			throw new RuntimeException("Codec " + compressionCodecName + " not found.");
+		}
+		return codec;
+	}
+
+	/**
+	 * Get Hadoop configuration based by the path.
+	 * If the path is not Hadoop URI, it will be return default configuration.
+	 *
+	 * @param path The path to get configuration.
+	 * @return Hadoop configuration.
+	 * @throws IOException
+	 */
+	public static Configuration getHadoopConfigFromPath(Path path) throws IOException {
+		FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+		if (fs != null && fs instanceof HadoopFileSystem) {
+			return ((HadoopFileSystem) fs).getHadoopFileSystem().getConf();
+		} else {
+			return new Configuration();
+		}
+	}
+
+	/**
+	 * The wrapper class for serialization of {@link Configuration}.
+	 */
+	private class SerializableHadoopConfiguration implements Serializable {
+		private transient Configuration hadoopConfig;
+
+		private SerializableHadoopConfiguration(Configuration hadoopConfig) {
+			this.hadoopConfig = hadoopConfig;
+		}
+
+		private Configuration get() {
+			return this.hadoopConfig;
+		}
+
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			this.hadoopConfig.write(out);
+		}
+
+		private void readObject(ObjectInputStream in) throws IOException {
+			this.hadoopConfig = new Configuration();
+			this.hadoopConfig.readFields(in);
+		}
+	}
+}
+
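Note that getHadoopConfigFromPath() above is how the test in this commit obtains its configuration:

    Path testPath = Path.fromLocalFile(folder);
    Configuration hadoopConf = SequenceFileWriterFactory.getHadoopConfigFromPath(testPath);

The follow-up commit (02/02 above) removes this helper again in favor of passing a Configuration to the factory explicitly.
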
diff --git a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java b/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
new file mode 100644
index 0000000..b3d4b22
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.FiniteTestSource;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test case for writing bulk encoded files with the
+ * {@link StreamingFileSink} with SequenceFile.
+ */
+public class SequenceFileSinkITCase extends AbstractTestBase {
+	@Test
+	public void testWriteSequenceFile() throws Exception {
+		final File folder = TEMPORARY_FOLDER.newFolder();
+
+		final List<Tuple2<Long, String>> data = Arrays.asList(
+			new Tuple2<>(1L, "a"),
+			new Tuple2<>(2L, "b"),
+			new Tuple2<>(3L, "c")
+		);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(100);
+
+		DataStream<Tuple2<Long, String>> stream = env.addSource(
+			new FiniteTestSource<>(data), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
+			}));
+
+		Path testPath = Path.fromLocalFile(folder);
+
+		stream.map(new MapFunction<Tuple2<Long, String>, Tuple2<LongWritable, Text>>() {
+			@Override
+			public Tuple2<LongWritable, Text> map(Tuple2<Long, String> value) throws Exception {
+				return new Tuple2<>(new LongWritable(value.f0), new Text(value.f1));
+			}
+		}).addSink(
+			StreamingFileSink.forBulkFormat(
+				testPath,
+				new SequenceFileWriterFactory<>(SequenceFileWriterFactory.getHadoopConfigFromPath(testPath), LongWritable.class, Text.class, "BZip2")
+			).build());
+
+		env.execute();
+
+		validateResults(folder, data);
+	}
+
+	private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
+		SequenceFile.Reader reader = new SequenceFile.Reader(
+			new Configuration(), SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
+		LongWritable key = new LongWritable();
+		Text val = new Text();
+		ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
+		while (reader.next(key, val)) {
+			results.add(new Tuple2<>(key.get(), val.toString()));
+		}
+		reader.close();
+		return results;
+	}
+
+	private void validateResults(File folder, List<Tuple2<Long, String>> expected) throws Exception {
+		File[] buckets = folder.listFiles();
+		assertNotNull(buckets);
+		assertEquals(1, buckets.length);
+
+		File[] partFiles = buckets[0].listFiles();
+		assertNotNull(partFiles);
+		assertEquals(1, partFiles.length);
+		assertTrue(partFiles[0].length() > 0);
+
+		List<Tuple2<Long, String>> results = readSequenceFile(partFiles[0]);
+		assertEquals(expected, results);
+	}
+}
+
+
diff --git a/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties b/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..644b884
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index c354402..380eadc 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -40,6 +40,7 @@ under the License.
 		<module>flink-json</module>
 		<module>flink-avro-confluent-registry</module>
 		<module>flink-parquet</module>
+		<module>flink-sequencefile</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
similarity index 97%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
index 03db7ff..1d75118 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.parquet.testutils;
+package org.apache.flink.test.util;
 
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;