You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 11:48:29 UTC

[flink] branch release-1.10 updated (66ea749 -> 5e16cdf)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 66ea749  [FLINK-16025][k8s] Parse BlobServer port from Configuration instead of using constant
     new ea5197e  [FLINK-16371][fs-connector] Make CompressWriterFactory serializable
     new 5e16cdf  [FLINK-16371][fs-connector] Add ITCase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../formats/compress/CompressWriterFactory.java    | 98 +++++++++++++++++-----
 .../writers/HadoopCompressionBulkWriter.java       | 30 +++----
 .../compress/writers/NoCompressionBulkWriter.java  | 10 ++-
 .../compress/CompressWriterFactoryTest.java        | 80 +++++++++++-------
 .../compress/CompressionFactoryITCase.java}        | 78 ++++++++---------
 .../formats/compress/CustomCompressionCodec.java}  | 16 ++--
 6 files changed, 188 insertions(+), 124 deletions(-)
 copy flink-formats/{flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java => flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java} (55%)
 copy flink-formats/flink-compress/src/{main/java/org/apache/flink/formats/compress/CompressWriters.java => test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java} (68%)


[flink] 01/02: [FLINK-16371][fs-connector] Make CompressWriterFactory serializable

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea5197eeefcf72eb9aa33ad83e591faf856bbda9
Author: Sivaprasanna S <si...@et-c02y86hzjg5h.fkipl.flipkart.com>
AuthorDate: Wed Mar 4 17:04:21 2020 +0530

    [FLINK-16371][fs-connector] Make CompressWriterFactory serializable
---
 .../formats/compress/CompressWriterFactory.java    | 98 +++++++++++++++++-----
 .../writers/HadoopCompressionBulkWriter.java       | 30 +++----
 .../compress/writers/NoCompressionBulkWriter.java  | 10 ++-
 .../compress/CompressWriterFactoryTest.java        | 80 +++++++++++-------
 .../formats/compress/CustomCompressionCodec.java   | 32 +++++++
 5 files changed, 177 insertions(+), 73 deletions(-)

diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
index 8d66524..76dd4a9 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
@@ -24,57 +24,111 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.compress.extractor.Extractor;
 import org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter;
 import org.apache.flink.formats.compress.writers.NoCompressionBulkWriter;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A factory that creates a {@link BulkWriter} implementation that compresses the written data.
+ * A factory that creates {@link BulkWriter bulk writers} that, when provided
+ * with a {@link CompressionCodec}, compress the data they write. If no codec is
+ * provided, the data is written in bulk but uncompressed.
  *
  * @param <IN> The type of element to write.
  */
 @PublicEvolving
 public class CompressWriterFactory<IN> implements BulkWriter.Factory<IN> {
 
-	private Extractor<IN> extractor;
-	private CompressionCodec hadoopCodec;
+	private final Extractor<IN> extractor;
+	private final Map<String, String> hadoopConfigMap;
+
+	private transient CompressionCodec hadoopCodec;
+
+	private String hadoopCodecName;
+	private String codecExtension;
 
+	/**
+	 * Creates a new CompressWriterFactory using the given {@link Extractor} to assemble
+	 * either {@link HadoopCompressionBulkWriter} or {@link NoCompressionBulkWriter}
+	 * based on whether a Hadoop CompressionCodec name is specified.
+	 *
+	 * @param extractor Extractor to extract the element
+	 */
 	public CompressWriterFactory(Extractor<IN> extractor) {
-		this.extractor = Preconditions.checkNotNull(extractor, "extractor cannot be null");
+		this.extractor = checkNotNull(extractor, "Extractor cannot be null");
+		this.hadoopConfigMap = new HashMap<>();
 	}
 
-	public CompressWriterFactory<IN> withHadoopCompression(String hadoopCodecName) {
-		return withHadoopCompression(hadoopCodecName, new Configuration());
+	/**
+	 * Compresses the data using the provided Hadoop {@link CompressionCodec}.
+	 *
+	 * @param codecName Simple/complete name or alias of the CompressionCodec
+	 * @return the instance of CompressWriterFactory
+	 * @throws IOException if the given codec cannot be loaded
+	 */
+	public CompressWriterFactory<IN> withHadoopCompression(String codecName) throws IOException {
+		return withHadoopCompression(codecName, new Configuration());
 	}
 
-	public CompressWriterFactory<IN> withHadoopCompression(String hadoopCodecName, Configuration hadoopConfiguration) {
-		return withHadoopCompression(new CompressionCodecFactory(hadoopConfiguration).getCodecByName(hadoopCodecName));
-	}
+	/**
+	 * Compresses the data using the provided Hadoop {@link CompressionCodec} and {@link Configuration}.
+	 *
+	 * @param codecName Simple/complete name or alias of the CompressionCodec
+	 * @param hadoopConfig Hadoop Configuration
+	 * @return the instance of CompressWriterFactory
+	 * @throws IOException if the given codec cannot be loaded
+	 */
+	public CompressWriterFactory<IN> withHadoopCompression(String codecName, Configuration hadoopConfig) throws IOException {
+		this.codecExtension = getHadoopCodecExtension(codecName, hadoopConfig);
+		this.hadoopCodecName = codecName;
+
+		for (Map.Entry<String, String> entry : hadoopConfig) {
+			hadoopConfigMap.put(entry.getKey(), entry.getValue());
+		}
 
-	public CompressWriterFactory<IN> withHadoopCompression(CompressionCodec hadoopCodec) {
-		this.hadoopCodec = Preconditions.checkNotNull(hadoopCodec, "hadoopCodec cannot be null");
 		return this;
 	}
 
 	@Override
 	public BulkWriter<IN> create(FSDataOutputStream out) throws IOException {
-		try {
-			return (hadoopCodec != null)
-				? new HadoopCompressionBulkWriter<>(out, extractor, hadoopCodec)
-				: new NoCompressionBulkWriter<>(out, extractor);
-		} catch (Exception e) {
-			throw new IOException(e.getLocalizedMessage(), e);
+		if (hadoopCodecName == null || hadoopCodecName.trim().isEmpty()) {
+			return new NoCompressionBulkWriter<>(out, extractor);
 		}
+
+		initializeCompressionCodec();
+
+		return new HadoopCompressionBulkWriter<>(hadoopCodec.createOutputStream(out), extractor);
 	}
 
-	public String codecExtension() {
-		return (hadoopCodec != null)
-			? hadoopCodec.getDefaultExtension()
-			: "";
+	public String getExtension() {
+		return (hadoopCodecName != null) ? this.codecExtension : "";
 	}
 
+	private void initializeCompressionCodec() {
+		if (hadoopCodec == null) {
+			Configuration conf = new Configuration();
+
+			for (Map.Entry<String, String> entry : hadoopConfigMap.entrySet()) {
+				conf.set(entry.getKey(), entry.getValue());
+			}
+
+			hadoopCodec = new CompressionCodecFactory(conf).getCodecByName(this.hadoopCodecName);
+		}
+	}
+
+	private String getHadoopCodecExtension(String hadoopCodecName, Configuration conf) throws IOException {
+		CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName(hadoopCodecName);
+
+		if (codec == null) {
+			throw new IOException("Unable to load the provided Hadoop codec [" + hadoopCodecName + "]");
+		}
+
+		return codec.getDefaultExtension();
+	}
 }
diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
index 798c291..e62a0f0 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
@@ -19,48 +19,42 @@
 package org.apache.flink.formats.compress.writers;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.compress.extractor.Extractor;
 
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A {@link BulkWriter} implementation that compresses data using Hadoop codecs.
+ * A {@link BulkWriter} implementation that writes data that have been
+ * compressed using Hadoop {@link org.apache.hadoop.io.compress.CompressionCodec}.
  *
  * @param <T> The type of element to write.
  */
 public class HadoopCompressionBulkWriter<T> implements BulkWriter<T> {
 
-	private Extractor<T> extractor;
-	private FSDataOutputStream outputStream;
-	private CompressionOutputStream compressor;
+	private final Extractor<T> extractor;
+	private final CompressionOutputStream out;
 
-	public HadoopCompressionBulkWriter(
-			FSDataOutputStream outputStream,
-			Extractor<T> extractor,
-			CompressionCodec compressionCodec) throws Exception {
-		this.outputStream = outputStream;
-		this.extractor = extractor;
-		this.compressor = compressionCodec.createOutputStream(outputStream);
+	public HadoopCompressionBulkWriter(CompressionOutputStream out, Extractor<T> extractor) {
+		this.out = checkNotNull(out);
+		this.extractor = checkNotNull(extractor);
 	}
 
 	@Override
 	public void addElement(T element) throws IOException {
-		compressor.write(extractor.extract(element));
+		out.write(extractor.extract(element));
 	}
 
 	@Override
 	public void flush() throws IOException {
-		compressor.flush();
-		outputStream.flush();
+		out.flush();
 	}
 
 	@Override
 	public void finish() throws IOException {
-		compressor.finish();
-		outputStream.sync();
+		out.finish();
 	}
 }
diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
index 11cbbbf..0935f86 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
@@ -24,6 +24,8 @@ import org.apache.flink.formats.compress.extractor.Extractor;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link BulkWriter} implementation that does not compress data. This is essentially a no-op
  * writer for use with {@link org.apache.flink.formats.compress.CompressWriterFactory} for the case
@@ -33,12 +35,12 @@ import java.io.IOException;
  */
 public class NoCompressionBulkWriter<T> implements BulkWriter<T> {
 
-	private Extractor<T> extractor;
-	private FSDataOutputStream outputStream;
+	private final Extractor<T> extractor;
+	private final FSDataOutputStream outputStream;
 
 	public NoCompressionBulkWriter(FSDataOutputStream outputStream, Extractor<T> extractor) {
-		this.outputStream = outputStream;
-		this.extractor = extractor;
+		this.outputStream = checkNotNull(outputStream);
+		this.extractor = checkNotNull(extractor);
 	}
 
 	@Override
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
index 2feab41..dc46e86 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
@@ -30,12 +30,9 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -43,6 +40,7 @@ import org.junit.rules.TemporaryFolder;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.List;
@@ -59,62 +57,86 @@ public class CompressWriterFactoryTest extends TestLogger {
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+	private static Configuration confWithCustomCodec;
+
+	@BeforeClass
+	public static void before() {
+		confWithCustomCodec = new Configuration();
+		confWithCustomCodec.set("io.compression.codecs", "org.apache.flink.formats.compress.CustomCompressionCodec");
+	}
 
 	@Test
-	public void testBzip2CompressByName() throws Exception {
+	public void testBzip2CompressByAlias() throws Exception {
 		testCompressByName("Bzip2");
 	}
 
 	@Test
-	public void testBzip2CompressCodec() throws Exception {
-		BZip2Codec codec = new BZip2Codec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+	public void testBzip2CompressByName() throws Exception {
+		testCompressByName("Bzip2Codec");
 	}
 
 	@Test
-	public void testGzipCompressByName() throws Exception {
+	public void testGzipCompressByAlias() throws Exception {
 		testCompressByName("Gzip");
 	}
 
 	@Test
-	public void testGzipCompressCodec() throws Exception {
-		GzipCodec codec = new GzipCodec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+	public void testGzipCompressByName() throws Exception {
+		testCompressByName("GzipCodec");
+	}
+
+	@Test
+	public void testDeflateCompressByAlias() throws Exception {
+		testCompressByName("deflate");
 	}
 
 	@Test
-	public void testDeflateCompressByName() throws Exception {
-		DeflateCodec codec = new DeflateCodec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+	public void testDeflateCompressByClassName() throws Exception {
+		testCompressByName("org.apache.hadoop.io.compress.DeflateCodec");
 	}
 
 	@Test
 	public void testDefaultCompressByName() throws Exception {
-		DefaultCodec codec = new DefaultCodec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+		testCompressByName("DefaultCodec");
 	}
 
-	private void testCompressByName(String codec) throws Exception {
-		CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec);
-		List<String> lines = Arrays.asList("line1", "line2", "line3");
+	@Test
+	public void testDefaultCompressByClassName() throws Exception {
+		testCompressByName("org.apache.hadoop.io.compress.DefaultCodec");
+	}
 
-		File directory = prepareCompressedFile(writer, lines);
+	@Test(expected = IOException.class)
+	public void testCompressFailureWithUnknownCodec() throws Exception {
+		testCompressByName("com.bla.bla.UnknownCodec");
+	}
+
+	@Test
+	public void testCustomCompressionCodecByClassName() throws Exception {
+		testCompressByName("org.apache.flink.formats.compress.CustomCompressionCodec", confWithCustomCodec);
+	}
 
-		validateResults(directory, lines, new CompressionCodecFactory(new Configuration()).getCodecByName(codec));
+	@Test
+	public void testCustomCompressionCodecByAlias() throws Exception {
+		testCompressByName("CustomCompressionCodec", confWithCustomCodec);
+	}
+
+	@Test
+	public void testCustomCompressionCodecByName() throws Exception {
+		testCompressByName("CustomCompression", confWithCustomCodec);
 	}
 
-	private void testCompressCodec(CompressionCodec codec) throws Exception {
+	private void testCompressByName(String codec) throws Exception {
+		testCompressByName(codec, new Configuration());
+	}
 
-		CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec);
+	private void testCompressByName(String codec, Configuration conf) throws Exception {
+		CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>())
+			.withHadoopCompression(codec, conf);
 		List<String> lines = Arrays.asList("line1", "line2", "line3");
 
 		File directory = prepareCompressedFile(writer, lines);
 
-		validateResults(directory, lines, codec);
+		validateResults(directory, lines, new CompressionCodecFactory(conf).getCodecByName(codec));
 	}
 
 	private File prepareCompressedFile(CompressWriterFactory<String> writer, List<String> lines) throws Exception {
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java
new file mode 100644
index 0000000..e82a018
--- /dev/null
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.compress;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/**
+ * Just a dummy class which extends BZip2Codec to verify
+ * that a custom Hadoop {@link CompressionCodec} can also
+ * be successfully loaded using the class name or alias,
+ * just like the default ones.
+ */
+public class CustomCompressionCodec extends BZip2Codec {
+	// Seriously nothing happens here
+}


[flink] 02/02: [FLINK-16371][fs-connector] Add ITCase

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e16cdf16725dceb07a30c5529d878fca3568291
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Mar 10 12:02:54 2020 +0100

    [FLINK-16371][fs-connector] Add ITCase
    
    This closes #11307.
---
 .../formats/compress/CompressionFactoryITCase.java | 120 +++++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
new file mode 100644
index 0000000..b9ce9f6
--- /dev/null
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.compress;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.compress.extractor.DefaultExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test case for writing bulk encoded files with the
+ * {@link StreamingFileSink} and Hadoop Compression Codecs.
+ */
+public class CompressionFactoryITCase extends AbstractTestBase {
+
+	private final Configuration configuration = new Configuration();
+
+	private static final String TEST_CODEC_NAME = "Bzip2";
+
+	private final List<String> testData = Arrays.asList(
+			"line1",
+			"line2",
+			"line3"
+	);
+
+	@Rule
+	public final Timeout timeoutPerTest = Timeout.seconds(20);
+
+	@Test
+	public void testWriteCompressedFile() throws Exception {
+		final File folder = TEMPORARY_FOLDER.newFolder();
+		final Path testPath = Path.fromLocalFile(folder);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(100);
+
+		DataStream<String> stream = env.addSource(
+				new FiniteTestSource<>(testData),
+				TypeInformation.of(String.class)
+		);
+
+		stream.map(str -> str).addSink(
+				StreamingFileSink.forBulkFormat(
+						testPath,
+						CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(TEST_CODEC_NAME)
+				).build());
+
+		env.execute();
+
+		validateResults(folder, testData, new CompressionCodecFactory(configuration).getCodecByName(TEST_CODEC_NAME));
+	}
+
+	private List<String> readFile(File file, CompressionCodec codec) throws Exception {
+		try (
+				FileInputStream inputStream = new FileInputStream(file);
+				InputStreamReader readerStream = new InputStreamReader(codec.createInputStream(inputStream));
+				BufferedReader reader = new BufferedReader(readerStream)
+		) {
+			return reader.lines().collect(Collectors.toList());
+		}
+	}
+
+	private void validateResults(File folder, List<String> expected, CompressionCodec codec) throws Exception {
+		File[] buckets = folder.listFiles();
+		assertNotNull(buckets);
+		assertEquals(1, buckets.length);
+
+		final File[] partFiles = buckets[0].listFiles();
+		assertNotNull(partFiles);
+		assertEquals(2, partFiles.length);
+
+		for (File partFile : partFiles) {
+			assertTrue(partFile.length() > 0);
+
+			final List<String> fileContent = readFile(partFile, codec);
+			assertEquals(expected, fileContent);
+		}
+	}
+}
+