Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 11:48:30 UTC

[flink] 01/02: [FLINK-16371][fs-connector] Make CompressWriterFactory serializable

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea5197eeefcf72eb9aa33ad83e591faf856bbda9
Author: Sivaprasanna S <si...@et-c02y86hzjg5h.fkipl.flipkart.com>
AuthorDate: Wed Mar 4 17:04:21 2020 +0530

    [FLINK-16371][fs-connector] Make CompressWriterFactory serializable
---
 .../formats/compress/CompressWriterFactory.java    | 98 +++++++++++++++++-----
 .../writers/HadoopCompressionBulkWriter.java       | 30 +++----
 .../compress/writers/NoCompressionBulkWriter.java  | 10 ++-
 .../compress/CompressWriterFactoryTest.java        | 80 +++++++++++-------
 .../formats/compress/CustomCompressionCodec.java   | 32 +++++++
 5 files changed, 177 insertions(+), 73 deletions(-)

diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
index 8d66524..76dd4a9 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
@@ -24,57 +24,111 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.compress.extractor.Extractor;
 import org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter;
 import org.apache.flink.formats.compress.writers.NoCompressionBulkWriter;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A factory that creates a {@link BulkWriter} implementation that compresses the written data.
+ * A factory that creates {@link BulkWriter bulk writers} that, when provided
+ * with a {@link CompressionCodec}, compress the data they write. If no codec is
+ * provided, the data is written in bulk but uncompressed.
  *
  * @param <IN> The type of element to write.
  */
 @PublicEvolving
 public class CompressWriterFactory<IN> implements BulkWriter.Factory<IN> {
 
-	private Extractor<IN> extractor;
-	private CompressionCodec hadoopCodec;
+	private final Extractor<IN> extractor;
+	private final Map<String, String> hadoopConfigMap;
+
+	private transient CompressionCodec hadoopCodec;
+
+	private String hadoopCodecName;
+	private String codecExtension;
 
+	/**
+	 * Creates a new CompressWriterFactory using the given {@link Extractor} to assemble
+	 * either {@link HadoopCompressionBulkWriter} or {@link NoCompressionBulkWriter}
+	 * based on whether a Hadoop CompressionCodec name is specified.
+	 *
+	 * @param extractor Extractor used to extract bytes from each element
+	 */
 	public CompressWriterFactory(Extractor<IN> extractor) {
-		this.extractor = Preconditions.checkNotNull(extractor, "extractor cannot be null");
+		this.extractor = checkNotNull(extractor, "Extractor cannot be null");
+		this.hadoopConfigMap = new HashMap<>();
 	}
 
-	public CompressWriterFactory<IN> withHadoopCompression(String hadoopCodecName) {
-		return withHadoopCompression(hadoopCodecName, new Configuration());
+	/**
+	 * Configures the factory to compress the data using the named Hadoop {@link CompressionCodec}.
+	 *
+	 * @param codecName Simple/complete name or alias of the CompressionCodec
+	 * @return this CompressWriterFactory
+	 * @throws IOException Thrown if the codec with the given name cannot be loaded
+	 */
+	public CompressWriterFactory<IN> withHadoopCompression(String codecName) throws IOException {
+		return withHadoopCompression(codecName, new Configuration());
 	}
 
-	public CompressWriterFactory<IN> withHadoopCompression(String hadoopCodecName, Configuration hadoopConfiguration) {
-		return withHadoopCompression(new CompressionCodecFactory(hadoopConfiguration).getCodecByName(hadoopCodecName));
-	}
+	/**
+	 * Configures the factory to compress the data using the named Hadoop {@link CompressionCodec} and {@link Configuration}.
+	 *
+	 * @param codecName Simple/complete name or alias of the CompressionCodec
+	 * @param hadoopConfig Hadoop Configuration
+	 * @return this CompressWriterFactory
+	 * @throws IOException Thrown if the codec with the given name cannot be loaded
+	 */
+	public CompressWriterFactory<IN> withHadoopCompression(String codecName, Configuration hadoopConfig) throws IOException {
+		this.codecExtension = getHadoopCodecExtension(codecName, hadoopConfig);
+		this.hadoopCodecName = codecName;
+
+		for (Map.Entry<String, String> entry : hadoopConfig) {
+			hadoopConfigMap.put(entry.getKey(), entry.getValue());
+		}
 
-	public CompressWriterFactory<IN> withHadoopCompression(CompressionCodec hadoopCodec) {
-		this.hadoopCodec = Preconditions.checkNotNull(hadoopCodec, "hadoopCodec cannot be null");
 		return this;
 	}
 
 	@Override
 	public BulkWriter<IN> create(FSDataOutputStream out) throws IOException {
-		try {
-			return (hadoopCodec != null)
-				? new HadoopCompressionBulkWriter<>(out, extractor, hadoopCodec)
-				: new NoCompressionBulkWriter<>(out, extractor);
-		} catch (Exception e) {
-			throw new IOException(e.getLocalizedMessage(), e);
+		if (hadoopCodecName == null || hadoopCodecName.trim().isEmpty()) {
+			return new NoCompressionBulkWriter<>(out, extractor);
 		}
+
+		initializeCompressionCodec();
+
+		return new HadoopCompressionBulkWriter<>(hadoopCodec.createOutputStream(out), extractor);
 	}
 
-	public String codecExtension() {
-		return (hadoopCodec != null)
-			? hadoopCodec.getDefaultExtension()
-			: "";
+	public String getExtension() {
+		return (hadoopCodecName != null) ? this.codecExtension : "";
 	}
 
+	private void initializeCompressionCodec() {
+		if (hadoopCodec == null) {
+			Configuration conf = new Configuration();
+
+			for (Map.Entry<String, String> entry : hadoopConfigMap.entrySet()) {
+				conf.set(entry.getKey(), entry.getValue());
+			}
+
+			hadoopCodec = new CompressionCodecFactory(conf).getCodecByName(this.hadoopCodecName);
+		}
+	}
+
+	private String getHadoopCodecExtension(String hadoopCodecName, Configuration conf) throws IOException {
+		CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName(hadoopCodecName);
+
+		if (codec == null) {
+			throw new IOException("Unable to load the provided Hadoop codec [" + hadoopCodecName + "]");
+		}
+
+		return codec.getDefaultExtension();
+	}
 }
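
For reference, a minimal usage sketch of the factory after this change, assembled from the builder calls exercised in CompressWriterFactoryTest. Names outside this diff (StreamingFileSink, DefaultExtractor, Path, the output location) are assumed from Flink 1.10 and are placeholders; treat this as a sketch, not a snippet from the repository.

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.compress.CompressWriterFactory;
    import org.apache.flink.formats.compress.CompressWriters;
    import org.apache.flink.formats.compress.extractor.DefaultExtractor;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    import org.apache.hadoop.conf.Configuration;

    import java.io.IOException;

    public class CompressedSinkExample {

        // Builds a bulk-format sink; the writer factory only keeps the codec name and the
        // Hadoop config entries, so it can now be serialized and shipped to the task managers.
        public static StreamingFileSink<String> buildCompressedSink() throws IOException {
            CompressWriterFactory<String> factory = CompressWriters
                .forExtractor(new DefaultExtractor<String>())
                // Throws IOException if the named codec cannot be loaded.
                .withHadoopCompression("Gzip", new Configuration());

            return StreamingFileSink
                .forBulkFormat(new Path("hdfs:///tmp/compressed-output"), factory)
                .build();
        }
    }
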
diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
index 798c291..e62a0f0 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
@@ -19,48 +19,42 @@
 package org.apache.flink.formats.compress.writers;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.formats.compress.extractor.Extractor;
 
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A {@link BulkWriter} implementation that compresses data using Hadoop codecs.
+ * A {@link BulkWriter} implementation that writes data that has been
+ * compressed using a Hadoop {@link org.apache.hadoop.io.compress.CompressionCodec}.
  *
  * @param <T> The type of element to write.
  */
 public class HadoopCompressionBulkWriter<T> implements BulkWriter<T> {
 
-	private Extractor<T> extractor;
-	private FSDataOutputStream outputStream;
-	private CompressionOutputStream compressor;
+	private final Extractor<T> extractor;
+	private final CompressionOutputStream out;
 
-	public HadoopCompressionBulkWriter(
-			FSDataOutputStream outputStream,
-			Extractor<T> extractor,
-			CompressionCodec compressionCodec) throws Exception {
-		this.outputStream = outputStream;
-		this.extractor = extractor;
-		this.compressor = compressionCodec.createOutputStream(outputStream);
+	public HadoopCompressionBulkWriter(CompressionOutputStream out, Extractor<T> extractor) {
+		this.out = checkNotNull(out);
+		this.extractor = checkNotNull(extractor);
 	}
 
 	@Override
 	public void addElement(T element) throws IOException {
-		compressor.write(extractor.extract(element));
+		out.write(extractor.extract(element));
 	}
 
 	@Override
 	public void flush() throws IOException {
-		compressor.flush();
-		outputStream.flush();
+		out.flush();
 	}
 
 	@Override
 	public void finish() throws IOException {
-		compressor.finish();
-		outputStream.sync();
+		out.finish();
 	}
 }
diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
index 11cbbbf..0935f86 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
@@ -24,6 +24,8 @@ import org.apache.flink.formats.compress.extractor.Extractor;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link BulkWriter} implementation that does not compress data. This is essentially a no-op
  * writer for use with {@link org.apache.flink.formats.compress.CompressWriterFactory} for the case
@@ -33,12 +35,12 @@ import java.io.IOException;
  */
 public class NoCompressionBulkWriter<T> implements BulkWriter<T> {
 
-	private Extractor<T> extractor;
-	private FSDataOutputStream outputStream;
+	private final Extractor<T> extractor;
+	private final FSDataOutputStream outputStream;
 
 	public NoCompressionBulkWriter(FSDataOutputStream outputStream, Extractor<T> extractor) {
-		this.outputStream = outputStream;
-		this.extractor = extractor;
+		this.outputStream = checkNotNull(outputStream);
+		this.extractor = checkNotNull(extractor);
 	}
 
 	@Override
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
index 2feab41..dc46e86 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
@@ -30,12 +30,9 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -43,6 +40,7 @@ import org.junit.rules.TemporaryFolder;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.List;
@@ -59,62 +57,86 @@ public class CompressWriterFactoryTest extends TestLogger {
 
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+	private static Configuration confWithCustomCodec;
+
+	@BeforeClass
+	public static void before() {
+		confWithCustomCodec = new Configuration();
+		confWithCustomCodec.set("io.compression.codecs", "org.apache.flink.formats.compress.CustomCompressionCodec");
+	}
 
 	@Test
-	public void testBzip2CompressByName() throws Exception {
+	public void testBzip2CompressByAlias() throws Exception {
 		testCompressByName("Bzip2");
 	}
 
 	@Test
-	public void testBzip2CompressCodec() throws Exception {
-		BZip2Codec codec = new BZip2Codec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+	public void testBzip2CompressByName() throws Exception {
+		testCompressByName("Bzip2Codec");
 	}
 
 	@Test
-	public void testGzipCompressByName() throws Exception {
+	public void testGzipCompressByAlias() throws Exception {
 		testCompressByName("Gzip");
 	}
 
 	@Test
-	public void testGzipCompressCodec() throws Exception {
-		GzipCodec codec = new GzipCodec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+	public void testGzipCompressByName() throws Exception {
+		testCompressByName("GzipCodec");
+	}
+
+	@Test
+	public void testDeflateCompressByAlias() throws Exception {
+		testCompressByName("deflate");
 	}
 
 	@Test
-	public void testDeflateCompressByName() throws Exception {
-		DeflateCodec codec = new DeflateCodec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+	public void testDeflateCompressByClassName() throws Exception {
+		testCompressByName("org.apache.hadoop.io.compress.DeflateCodec");
 	}
 
 	@Test
 	public void testDefaultCompressByName() throws Exception {
-		DefaultCodec codec = new DefaultCodec();
-		codec.setConf(new Configuration());
-		testCompressCodec(codec);
+		testCompressByName("DefaultCodec");
 	}
 
-	private void testCompressByName(String codec) throws Exception {
-		CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec);
-		List<String> lines = Arrays.asList("line1", "line2", "line3");
+	@Test
+	public void testDefaultCompressByClassName() throws Exception {
+		testCompressByName("org.apache.hadoop.io.compress.DefaultCodec");
+	}
 
-		File directory = prepareCompressedFile(writer, lines);
+	@Test(expected = IOException.class)
+	public void testCompressFailureWithUnknownCodec() throws Exception {
+		testCompressByName("com.bla.bla.UnknownCodec");
+	}
+
+	@Test
+	public void testCustomCompressionCodecByClassName() throws Exception {
+		testCompressByName("org.apache.flink.formats.compress.CustomCompressionCodec", confWithCustomCodec);
+	}
 
-		validateResults(directory, lines, new CompressionCodecFactory(new Configuration()).getCodecByName(codec));
+	@Test
+	public void testCustomCompressionCodecByAlias() throws Exception {
+		testCompressByName("CustomCompressionCodec", confWithCustomCodec);
+	}
+
+	@Test
+	public void testCustomCompressionCodecByName() throws Exception {
+		testCompressByName("CustomCompression", confWithCustomCodec);
 	}
 
-	private void testCompressCodec(CompressionCodec codec) throws Exception {
+	private void testCompressByName(String codec) throws Exception {
+		testCompressByName(codec, new Configuration());
+	}
 
-		CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec);
+	private void testCompressByName(String codec, Configuration conf) throws Exception {
+		CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>())
+			.withHadoopCompression(codec, conf);
 		List<String> lines = Arrays.asList("line1", "line2", "line3");
 
 		File directory = prepareCompressedFile(writer, lines);
 
-		validateResults(directory, lines, codec);
+		validateResults(directory, lines, new CompressionCodecFactory(conf).getCodecByName(codec));
 	}
 
 	private File prepareCompressedFile(CompressWriterFactory<String> writer, List<String> lines) throws Exception {
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java
new file mode 100644
index 0000000..e82a018
--- /dev/null
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.compress;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/**
+ * Just a dummy class which extends BZip2Codec to verify
+ * that a custom Hadoop {@link CompressionCodec} can also
+ * be successfully loaded using its class name or alias,
+ * just like the default ones.
+ */
+public class CustomCompressionCodec extends BZip2Codec {
+	// Seriously nothing happens here
+}
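
The test registers this codec through the standard Hadoop "io.compression.codecs" property so that CompressionCodecFactory can resolve it by class name or alias, just like the built-in codecs. A minimal sketch of that wiring, mirroring the @BeforeClass setup in CompressWriterFactoryTest (package names for DefaultExtractor and CompressWriters assumed from flink-compress):

    import org.apache.flink.formats.compress.CompressWriterFactory;
    import org.apache.flink.formats.compress.CompressWriters;
    import org.apache.flink.formats.compress.extractor.DefaultExtractor;

    import org.apache.hadoop.conf.Configuration;

    import java.io.IOException;

    public class CustomCodecWiringExample {

        public static CompressWriterFactory<String> withCustomCodec() throws IOException {
            // Register the custom codec so CompressionCodecFactory can find it by name or alias.
            Configuration conf = new Configuration();
            conf.set("io.compression.codecs", "org.apache.flink.formats.compress.CustomCompressionCodec");

            // Either the alias ("CustomCompressionCodec") or the fully qualified class name works,
            // exactly as exercised by the tests above.
            return CompressWriters
                .forExtractor(new DefaultExtractor<String>())
                .withHadoopCompression("CustomCompressionCodec", conf);
        }
    }
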