Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 11:48:30 UTC
[flink] 01/02: [FLINK-16371][fs-connector] Make CompressWriterFactory serializable
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ea5197eeefcf72eb9aa33ad83e591faf856bbda9
Author: Sivaprasanna S <si...@et-c02y86hzjg5h.fkipl.flipkart.com>
AuthorDate: Wed Mar 4 17:04:21 2020 +0530
[FLINK-16371][fs-connector] Make CompressWriterFactory serializable
---
.../formats/compress/CompressWriterFactory.java | 98 +++++++++++++++++-----
.../writers/HadoopCompressionBulkWriter.java | 30 +++----
.../compress/writers/NoCompressionBulkWriter.java | 10 ++-
.../compress/CompressWriterFactoryTest.java | 80 +++++++++++-------
.../formats/compress/CustomCompressionCodec.java | 32 +++++++
5 files changed, 177 insertions(+), 73 deletions(-)
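For context on why serializability matters here: the factory is typically handed to a StreamingFileSink, which serializes it into the job graph and ships it to the task managers. A minimal usage sketch (the class name and output path are illustrative, not part of this commit):

    import java.io.IOException;

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.compress.CompressWriterFactory;
    import org.apache.flink.formats.compress.CompressWriters;
    import org.apache.flink.formats.compress.extractor.DefaultExtractor;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class CompressedSinkExample {

        public static StreamingFileSink<String> buildSink(String outputPath) throws IOException {
            // The factory now stores only the codec *name* and the Hadoop config
            // entries, both serializable; the codec itself is re-created lazily.
            CompressWriterFactory<String> factory =
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                    .withHadoopCompression("Gzip");

            // forBulkFormat serializes the factory into the job graph; with a
            // live CompressionCodec field this step could not serialize.
            return StreamingFileSink
                .forBulkFormat(new Path(outputPath), factory)
                .build();
        }
    }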
diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
index 8d66524..76dd4a9 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/CompressWriterFactory.java
@@ -24,57 +24,111 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.compress.extractor.Extractor;
import org.apache.flink.formats.compress.writers.HadoopCompressionBulkWriter;
import org.apache.flink.formats.compress.writers.NoCompressionBulkWriter;
-import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A factory that creates a {@link BulkWriter} implementation that compresses the written data.
+ * A factory that creates {@link BulkWriter bulk writers} that, when provided
+ * with a {@link CompressionCodec}, compress the data they write. If no codec is
+ * provided, the data is written in bulk but uncompressed.
*
* @param <IN> The type of element to write.
*/
@PublicEvolving
public class CompressWriterFactory<IN> implements BulkWriter.Factory<IN> {
- private Extractor<IN> extractor;
- private CompressionCodec hadoopCodec;
+ private final Extractor<IN> extractor;
+ private final Map<String, String> hadoopConfigMap;
+
+ private transient CompressionCodec hadoopCodec;
+
+ private String hadoopCodecName;
+ private String codecExtension;
+ /**
+ * Creates a new CompressWriterFactory using the given {@link Extractor} to assemble
+ * either {@link HadoopCompressionBulkWriter} or {@link NoCompressionBulkWriter}
+ * based on whether a Hadoop CompressionCodec name is specified.
+ *
+ * @param extractor Extractor that turns each incoming element into the bytes to write
+ */
public CompressWriterFactory(Extractor<IN> extractor) {
- this.extractor = Preconditions.checkNotNull(extractor, "extractor cannot be null");
+ this.extractor = checkNotNull(extractor, "Extractor cannot be null");
+ this.hadoopConfigMap = new HashMap<>();
}
- public CompressWriterFactory<IN> withHadoopCompression(String hadoopCodecName) {
- return withHadoopCompression(hadoopCodecName, new Configuration());
+ /**
+ * Configures the factory to compress the data using the named Hadoop {@link CompressionCodec}.
+ *
+ * @param codecName Simple/complete name or alias of the CompressionCodec
+ * @return this CompressWriterFactory instance
+ * @throws IOException if no codec with the given name can be loaded
+ */
+ public CompressWriterFactory<IN> withHadoopCompression(String codecName) throws IOException {
+ return withHadoopCompression(codecName, new Configuration());
}
- public CompressWriterFactory<IN> withHadoopCompression(String hadoopCodecName, Configuration hadoopConfiguration) {
- return withHadoopCompression(new CompressionCodecFactory(hadoopConfiguration).getCodecByName(hadoopCodecName));
- }
+ /**
+ * Configures the factory to compress the data using the named Hadoop
+ * {@link CompressionCodec}, resolved against the given Hadoop {@link Configuration}.
+ *
+ * @param codecName Simple/complete name or alias of the CompressionCodec
+ * @param hadoopConfig Hadoop Configuration against which the codec is resolved
+ * @return this CompressWriterFactory instance
+ * @throws IOException if no codec with the given name can be loaded
+ */
+ public CompressWriterFactory<IN> withHadoopCompression(String codecName, Configuration hadoopConfig) throws IOException {
+ this.codecExtension = getHadoopCodecExtension(codecName, hadoopConfig);
+ this.hadoopCodecName = codecName;
+
+ for (Map.Entry<String, String> entry : hadoopConfig) {
+ hadoopConfigMap.put(entry.getKey(), entry.getValue());
+ }
- public CompressWriterFactory<IN> withHadoopCompression(CompressionCodec hadoopCodec) {
- this.hadoopCodec = Preconditions.checkNotNull(hadoopCodec, "hadoopCodec cannot be null");
return this;
}
@Override
public BulkWriter<IN> create(FSDataOutputStream out) throws IOException {
- try {
- return (hadoopCodec != null)
- ? new HadoopCompressionBulkWriter<>(out, extractor, hadoopCodec)
- : new NoCompressionBulkWriter<>(out, extractor);
- } catch (Exception e) {
- throw new IOException(e.getLocalizedMessage(), e);
+ if (hadoopCodecName == null || hadoopCodecName.trim().isEmpty()) {
+ return new NoCompressionBulkWriter<>(out, extractor);
}
+
+ initializeCompressionCodec();
+
+ return new HadoopCompressionBulkWriter<>(hadoopCodec.createOutputStream(out), extractor);
}
- public String codecExtension() {
- return (hadoopCodec != null)
- ? hadoopCodec.getDefaultExtension()
- : "";
+ public String getExtension() {
+ return (hadoopCodecName != null) ? this.codecExtension : "";
}
+ private void initializeCompressionCodec() {
+ if (hadoopCodec == null) {
+ Configuration conf = new Configuration();
+
+ for (Map.Entry<String, String> entry : hadoopConfigMap.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+
+ hadoopCodec = new CompressionCodecFactory(conf).getCodecByName(this.hadoopCodecName);
+ }
+ }
+
+ private String getHadoopCodecExtension(String hadoopCodecName, Configuration conf) throws IOException {
+ CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName(hadoopCodecName);
+
+ if (codec == null) {
+ throw new IOException("Unable to load the provided Hadoop codec [" + hadoopCodecName + "]");
+ }
+
+ return codec.getDefaultExtension();
+ }
}
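The essence of the change above: the live CompressionCodec stays transient, only the codec name and the Hadoop configuration entries (copied into a plain HashMap) are serialized, and the codec is rebuilt lazily in create(). A quick way to see the round trip work, sketched with plain JDK serialization (the SerializationRoundTrip class and roundTrip helper are illustrative, not part of the commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    import org.apache.flink.formats.compress.CompressWriterFactory;
    import org.apache.flink.formats.compress.CompressWriters;
    import org.apache.flink.formats.compress.extractor.DefaultExtractor;

    public class SerializationRoundTrip {

        public static void main(String[] args) throws Exception {
            CompressWriterFactory<String> factory =
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                    .withHadoopCompression("Bzip2");

            // Previously this threw NotSerializableException, because the factory
            // held a live (non-serializable) CompressionCodec instance.
            CompressWriterFactory<String> copy = roundTrip(factory);
            System.out.println("extension after round trip: " + copy.getExtension());
        }

        @SuppressWarnings("unchecked")
        private static <T extends Serializable> T roundTrip(T value) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        }
    }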
diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
index 798c291..e62a0f0 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/HadoopCompressionBulkWriter.java
@@ -19,48 +19,42 @@
package org.apache.flink.formats.compress.writers;
import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.formats.compress.extractor.Extractor;
-import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * A {@link BulkWriter} implementation that compresses data using Hadoop codecs.
+ * A {@link BulkWriter} implementation that writes data that has been
+ * compressed using a Hadoop {@link org.apache.hadoop.io.compress.CompressionCodec}.
*
* @param <T> The type of element to write.
*/
public class HadoopCompressionBulkWriter<T> implements BulkWriter<T> {
- private Extractor<T> extractor;
- private FSDataOutputStream outputStream;
- private CompressionOutputStream compressor;
+ private final Extractor<T> extractor;
+ private final CompressionOutputStream out;
- public HadoopCompressionBulkWriter(
- FSDataOutputStream outputStream,
- Extractor<T> extractor,
- CompressionCodec compressionCodec) throws Exception {
- this.outputStream = outputStream;
- this.extractor = extractor;
- this.compressor = compressionCodec.createOutputStream(outputStream);
+ public HadoopCompressionBulkWriter(CompressionOutputStream out, Extractor<T> extractor) {
+ this.out = checkNotNull(out);
+ this.extractor = checkNotNull(extractor);
}
@Override
public void addElement(T element) throws IOException {
- compressor.write(extractor.extract(element));
+ out.write(extractor.extract(element));
}
@Override
public void flush() throws IOException {
- compressor.flush();
- outputStream.flush();
+ out.flush();
}
@Override
public void finish() throws IOException {
- compressor.finish();
- outputStream.sync();
+ out.finish();
}
}
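Note the changed semantics in the writer above: flush() and finish() now delegate to the CompressionOutputStream only. finish() writes the codec's trailer but leaves the underlying stream open, since the sink owns the FSDataOutputStream and closes it itself. A stand-alone sketch of that contract, assuming gzip over an in-memory stream (class name illustrative):

    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class FinishVersusClose {

        public static void main(String[] args) throws Exception {
            GzipCodec codec = new GzipCodec();
            codec.setConf(new Configuration());

            ByteArrayOutputStream underlying = new ByteArrayOutputStream();
            CompressionOutputStream out = codec.createOutputStream(underlying);

            out.write("line1\n".getBytes());
            out.finish(); // writes the gzip trailer, keeps 'underlying' open

            // 'underlying' now holds a complete gzip stream; the caller (in
            // Flink's case, the sink) stays responsible for closing it.
            System.out.println("compressed size: " + underlying.size() + " bytes");
        }
    }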
diff --git a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
index 11cbbbf..0935f86 100644
--- a/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
+++ b/flink-formats/flink-compress/src/main/java/org/apache/flink/formats/compress/writers/NoCompressionBulkWriter.java
@@ -24,6 +24,8 @@ import org.apache.flink.formats.compress.extractor.Extractor;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link BulkWriter} implementation that does not compress data. This is essentially a no-op
* writer for use with {@link org.apache.flink.formats.compress.CompressWriterFactory} for the case
@@ -33,12 +35,12 @@ import java.io.IOException;
*/
public class NoCompressionBulkWriter<T> implements BulkWriter<T> {
- private Extractor<T> extractor;
- private FSDataOutputStream outputStream;
+ private final Extractor<T> extractor;
+ private final FSDataOutputStream outputStream;
public NoCompressionBulkWriter(FSDataOutputStream outputStream, Extractor<T> extractor) {
- this.outputStream = outputStream;
- this.extractor = extractor;
+ this.outputStream = checkNotNull(outputStream);
+ this.extractor = checkNotNull(extractor);
}
@Override
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
index 2feab41..dc46e86 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressWriterFactoryTest.java
@@ -30,12 +30,9 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -43,6 +40,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
@@ -59,62 +57,86 @@ public class CompressWriterFactoryTest extends TestLogger {
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ private static Configuration confWithCustomCodec;
+
+ @BeforeClass
+ public static void before() {
+ confWithCustomCodec = new Configuration();
+ confWithCustomCodec.set("io.compression.codecs", "org.apache.flink.formats.compress.CustomCompressionCodec");
+ }
@Test
- public void testBzip2CompressByName() throws Exception {
+ public void testBzip2CompressByAlias() throws Exception {
testCompressByName("Bzip2");
}
@Test
- public void testBzip2CompressCodec() throws Exception {
- BZip2Codec codec = new BZip2Codec();
- codec.setConf(new Configuration());
- testCompressCodec(codec);
+ public void testBzip2CompressByName() throws Exception {
+ testCompressByName("Bzip2Codec");
}
@Test
- public void testGzipCompressByName() throws Exception {
+ public void testGzipCompressByAlias() throws Exception {
testCompressByName("Gzip");
}
@Test
- public void testGzipCompressCodec() throws Exception {
- GzipCodec codec = new GzipCodec();
- codec.setConf(new Configuration());
- testCompressCodec(codec);
+ public void testGzipCompressByName() throws Exception {
+ testCompressByName("GzipCodec");
+ }
+
+ @Test
+ public void testDeflateCompressByAlias() throws Exception {
+ testCompressByName("deflate");
}
@Test
- public void testDeflateCompressByName() throws Exception {
- DeflateCodec codec = new DeflateCodec();
- codec.setConf(new Configuration());
- testCompressCodec(codec);
+ public void testDeflateCompressByClassName() throws Exception {
+ testCompressByName("org.apache.hadoop.io.compress.DeflateCodec");
}
@Test
public void testDefaultCompressByName() throws Exception {
- DefaultCodec codec = new DefaultCodec();
- codec.setConf(new Configuration());
- testCompressCodec(codec);
+ testCompressByName("DefaultCodec");
}
- private void testCompressByName(String codec) throws Exception {
- CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec);
- List<String> lines = Arrays.asList("line1", "line2", "line3");
+ @Test
+ public void testDefaultCompressByClassName() throws Exception {
+ testCompressByName("org.apache.hadoop.io.compress.DefaultCodec");
+ }
- File directory = prepareCompressedFile(writer, lines);
+ @Test(expected = IOException.class)
+ public void testCompressFailureWithUnknownCodec() throws Exception {
+ testCompressByName("com.bla.bla.UnknownCodec");
+ }
+
+ @Test
+ public void testCustomCompressionCodecByClassName() throws Exception {
+ testCompressByName("org.apache.flink.formats.compress.CustomCompressionCodec", confWithCustomCodec);
+ }
- validateResults(directory, lines, new CompressionCodecFactory(new Configuration()).getCodecByName(codec));
+ @Test
+ public void testCustomCompressionCodecByAlias() throws Exception {
+ testCompressByName("CustomCompressionCodec", confWithCustomCodec);
+ }
+
+ @Test
+ public void testCustomCompressionCodecByName() throws Exception {
+ testCompressByName("CustomCompression", confWithCustomCodec);
}
- private void testCompressCodec(CompressionCodec codec) throws Exception {
+ private void testCompressByName(String codec) throws Exception {
+ testCompressByName(codec, new Configuration());
+ }
- CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(codec);
+ private void testCompressByName(String codec, Configuration conf) throws Exception {
+ CompressWriterFactory<String> writer = CompressWriters.forExtractor(new DefaultExtractor<String>())
+ .withHadoopCompression(codec, conf);
List<String> lines = Arrays.asList("line1", "line2", "line3");
File directory = prepareCompressedFile(writer, lines);
- validateResults(directory, lines, codec);
+ validateResults(directory, lines, new CompressionCodecFactory(conf).getCodecByName(codec));
}
private File prepareCompressedFile(CompressWriterFactory<String> writer, List<String> lines) throws Exception {
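The renamed tests map onto the three ways CompressionCodecFactory.getCodecByName resolves a codec: fully qualified class name, simple class name, or a case-insensitive alias (the simple name with the Codec suffix dropped). A rough sketch (class name illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecNameResolution {

        public static void main(String[] args) {
            CompressionCodecFactory codecs = new CompressionCodecFactory(new Configuration());

            // All three resolve to the same codec:
            System.out.println(codecs.getCodecByName("org.apache.hadoop.io.compress.BZip2Codec"));
            System.out.println(codecs.getCodecByName("BZip2Codec")); // simple class name
            System.out.println(codecs.getCodecByName("bzip2"));      // alias

            // Unknown names resolve to null, which withHadoopCompression now
            // surfaces as an IOException (see testCompressFailureWithUnknownCodec).
            System.out.println(codecs.getCodecByName("com.bla.bla.UnknownCodec"));
        }
    }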
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java
new file mode 100644
index 0000000..e82a018
--- /dev/null
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CustomCompressionCodec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.compress;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/**
+ * A dummy class that extends BZip2Codec to verify
+ * that a custom Hadoop {@link CompressionCodec} can also
+ * be successfully loaded using its class name or an alias,
+ * just like the built-in ones.
+ */
+public class CustomCompressionCodec extends BZip2Codec {
+ // Seriously nothing happens here
+}
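For completeness, a hedged usage sketch mirroring the new tests: the custom codec only needs to be registered under the standard io.compression.codecs property, after which the factory resolves it like a built-in codec (class name illustrative, property value as in the test setup above):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.flink.formats.compress.CompressWriterFactory;
    import org.apache.flink.formats.compress.CompressWriters;
    import org.apache.flink.formats.compress.extractor.DefaultExtractor;

    public class CustomCodecUsage {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("io.compression.codecs",
                "org.apache.flink.formats.compress.CustomCompressionCodec");

            // Resolvable by class name, simple name, or alias, just like the
            // built-in codecs.
            CompressWriterFactory<String> writer =
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                    .withHadoopCompression("CustomCompressionCodec", conf);

            System.out.println("extension: " + writer.getExtension());
        }
    }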