You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/15 07:47:06 UTC
[3/3] flink git commit: [FLINK-4771] [avro] Add support for
compression to AvroOutputFormat.
[FLINK-4771] [avro] Add support for compression to AvroOutputFormat.
This closes #2612
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dffde7ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dffde7ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dffde7ef
Branch: refs/heads/master
Commit: dffde7efb3c60da83e54f4202004d5d70e174e8f
Parents: dc99deb
Author: larsbachmann <la...@posteo.de>
Authored: Sat Oct 8 13:30:14 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Oct 15 07:59:51 2016 +0200
----------------------------------------------------------------------
.../flink/api/java/io/AvroOutputFormat.java | 66 ++++++++
.../flink/api/avro/AvroOutputFormatITCase.java | 1 +
.../flink/api/java/io/AvroOutputFormatTest.java | 154 +++++++++++++++++++
3 files changed, 221 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 4d8313c..600d1e5 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.io;
import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
@@ -26,16 +27,58 @@ import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.core.fs.Path;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
import java.io.IOException;
import java.io.Serializable;
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
+	/**
+	 * Compression codecs that can be selected for the written Avro data file.
+	 *
+	 * <p>Each constant pairs an Avro {@link CodecFactory} with a byte identifier. The enclosing
+	 * format writes that byte (not the factory) during Java serialization and uses
+	 * {@link #forCodecByte(byte)} to map it back to the constant when it is read again.
+	 */
+	public enum Codec {
+
+		NULL((byte)0, CodecFactory.nullCodec()),
+		SNAPPY((byte)1, CodecFactory.snappyCodec()),
+		BZIP2((byte)2, CodecFactory.bzip2Codec()),
+		DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+		XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+		/**
+		 * Identifier written to the serialization stream. Must be unique per constant and
+		 * non-negative, because the enclosing format writes -1 to mean "no codec configured".
+		 */
+		private final byte codecByte;
+
+		/** Avro factory handed to the data file writer when this codec is selected. */
+		private final CodecFactory codecFactory;
+
+		/**
+		 * @param codecByte identifier used to represent this constant in serialized form
+		 * @param codecFactory Avro factory that provides the actual codec
+		 */
+		Codec(final byte codecByte, final CodecFactory codecFactory) {
+			this.codecByte = codecByte;
+			this.codecFactory = codecFactory;
+		}
+
+		/**
+		 * @return the byte identifier of this codec
+		 */
+		private byte getCodecByte() {
+			return codecByte;
+		}
+
+		/**
+		 * @return the Avro codec factory backing this codec
+		 */
+		private CodecFactory getCodecFactory() {
+			return codecFactory;
+		}
+
+		/**
+		 * Looks up the codec constant registered for a byte identifier.
+		 *
+		 * @param codecByte identifier previously obtained from {@link #getCodecByte()}
+		 * @return the matching codec constant
+		 * @throws IllegalArgumentException if no constant uses the given identifier
+		 */
+		private static Codec forCodecByte(byte codecByte) {
+			for (final Codec codec : Codec.values()) {
+				if (codec.getCodecByte() == codecByte) {
+					return codec;
+				}
+			}
+			throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+		}
+	}
+
private static final long serialVersionUID = 1L;
private final Class<E> avroValueType;
private transient Schema userDefinedSchema = null;
+
+ private transient Codec codec = null;
private transient DataFileWriter<E> dataFileWriter;
@@ -57,6 +100,15 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
this.userDefinedSchema = schema;
}
+	/**
+	 * Selects the Avro codec that is applied to the data file writer when the format is opened.
+	 *
+	 * @param codec the compression codec to use; must not be {@code null}
+	 */
+	public void setCodec(final Codec codec) {
+		// Reject null first, then store the validated value.
+		checkNotNull(codec, "codec can not be null");
+		this.codec = codec;
+	}
+
@Override
public void writeRecord(E record) throws IOException {
dataFileWriter.append(record);
@@ -82,6 +134,9 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
schema = ReflectData.get().getSchema(avroValueType);
}
dataFileWriter = new DataFileWriter<E>(datumWriter);
+ if (codec != null) {
+ dataFileWriter.setCodec(codec.getCodecFactory());
+ }
if (userDefinedSchema == null) {
dataFileWriter.create(schema, stream);
} else {
@@ -92,6 +147,12 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
+ if (codec != null) {
+ out.writeByte(codec.getCodecByte());
+ } else {
+ out.writeByte(-1);
+ }
+
if(userDefinedSchema != null) {
byte[] json = userDefinedSchema.toString().getBytes();
out.writeInt(json.length);
@@ -104,6 +165,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
+ byte codecByte = in.readByte();
+ if (codecByte >= 0) {
+ setCodec(Codec.forCodecByte(codecByte));
+ }
+
int length = in.readInt();
if(length != 0) {
byte[] json = new byte[length];
http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
index adbe5dd..3b01ccb 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -69,6 +69,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
//output the data with AvroOutputFormat for specific user type
DataSet<User> specificUser = input.map(new ConvertToUser());
AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+ avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
specificUser.write(avroOutputFormat, outputPath1);
http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
new file mode 100644
index 0000000..4d6c6b7
--- /dev/null
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+/**
+ * Tests for {@link AvroOutputFormat}: codec configuration, Java serialization of the codec and
+ * the user-defined schema, and the effect of compression on the size of the written file.
+ */
+public class AvroOutputFormatTest {
+
+	/**
+	 * A supported codec must be accepted without throwing.
+	 */
+	@Test
+	public void testSetCodec() throws Exception {
+		// given
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+		// when / then: an unexpected exception propagates and fails the test with its stack trace
+		outputFormat.setCodec(Codec.SNAPPY);
+	}
+
+	/**
+	 * A {@code null} codec must be rejected by the null check in {@code setCodec}.
+	 */
+	@Test
+	public void testSetCodecError() throws Exception {
+		// given
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+		// when
+		try {
+			outputFormat.setCodec(null);
+			fail("setCodec(null) was expected to throw a NullPointerException");
+		} catch (NullPointerException expected) {
+			// then: the null argument was rejected as intended
+		}
+	}
+
+	/**
+	 * Round-trips the format through Java serialization for every combination of
+	 * "codec set / not set" and "schema set / not set".
+	 */
+	@Test
+	public void testSerialization() throws Exception {
+
+		serializeAndDeserialize(null, null);
+		serializeAndDeserialize(null, User.SCHEMA$);
+		for (final Codec codec : Codec.values()) {
+			serializeAndDeserialize(codec, null);
+			serializeAndDeserialize(codec, User.SCHEMA$);
+		}
+	}
+
+	/**
+	 * Serializes an output format configured with the given codec and schema, reads it back and
+	 * verifies that both settings survived the round trip.
+	 *
+	 * @param codec codec to configure, or {@code null} to leave the codec unset
+	 * @param schema schema to configure, or {@code null} to leave the schema unset
+	 */
+	private void serializeAndDeserialize(final Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
+		// given
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+		if (codec != null) {
+			outputFormat.setCodec(codec);
+		}
+		if (schema != null) {
+			outputFormat.setSchema(schema);
+		}
+
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+		// when
+		try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+			oos.writeObject(outputFormat);
+		}
+		try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+			// then
+			Object o = ois.readObject();
+			assertTrue(o instanceof AvroOutputFormat);
+			// safe: the instanceof check above passed and the type argument is erased at runtime
+			@SuppressWarnings("unchecked")
+			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
+			// both fields are private and transient, so they are read reflectively
+			final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec");
+			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
+
+			assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null);
+			assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null);
+		}
+	}
+
+	/**
+	 * Writes the same records once without a codec and once with Snappy, and expects the
+	 * compressed file to be smaller.
+	 */
+	@Test
+	public void testCompression() throws Exception {
+		// given
+		final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
+		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
+
+		try {
+			final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class);
+			outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+			final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class);
+			compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+			compressedOutputFormat.setCodec(Codec.SNAPPY);
+
+			// when
+			output(outputFormat);
+			output(compressedOutputFormat);
+
+			// then
+			assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
+		} finally {
+			// cleanup: runs even when writing or the assertion fails, and tolerates missing
+			// files so that it never hides the original failure
+			Files.deleteIfExists(Paths.get(outputPath.getPath()));
+			Files.deleteIfExists(Paths.get(compressedOutputPath.getPath()));
+		}
+	}
+
+	/**
+	 * @return the size in bytes of the local file behind the given path
+	 */
+	private long fileSize(Path path) throws IOException {
+		return Files.size(Paths.get(path.getPath()));
+	}
+
+	/**
+	 * Opens the given format, writes 100 identical {@link User} records and closes it again.
+	 * The format is closed even if writing fails, so the file handle is always released.
+	 */
+	private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
+		outputFormat.configure(new Configuration());
+		outputFormat.open(1,1);
+		try {
+			for (int i = 0; i < 100; i++) {
+				outputFormat.writeRecord(new User("testUser",1,"blue"));
+			}
+		} finally {
+			outputFormat.close();
+		}
+	}
+}