You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/15 07:47:06 UTC

[3/3] flink git commit: [FLINK-4771] [avro] Add support for compression to AvroOutputFormat.

[FLINK-4771] [avro] Add support for compression to AvroOutputFormat.

This closes #2612


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dffde7ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dffde7ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dffde7ef

Branch: refs/heads/master
Commit: dffde7efb3c60da83e54f4202004d5d70e174e8f
Parents: dc99deb
Author: larsbachmann <la...@posteo.de>
Authored: Sat Oct 8 13:30:14 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Oct 15 07:59:51 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroOutputFormat.java     |  66 ++++++++
 .../flink/api/avro/AvroOutputFormatITCase.java  |   1 +
 .../flink/api/java/io/AvroOutputFormatTest.java | 154 +++++++++++++++++++
 3 files changed, 221 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
index 4d8313c..600d1e5 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
@@ -26,16 +27,58 @@ import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.core.fs.Path;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import java.io.IOException;
 import java.io.Serializable;
 
 public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
 
+	/**
+	 * Wrapper which encapsulates the supported codec and a related serialization byte.
+	 *
+	 * <p>The explicit byte (rather than {@code ordinal()}) is what gets written during
+	 * Java serialization of the enclosing format, so reordering or inserting enum
+	 * constants cannot silently change the wire format. A byte of -1 on the wire means
+	 * "no codec configured" (see writeObject/readObject of AvroOutputFormat).
+	 */
+	public enum Codec {
+
+		NULL((byte)0, CodecFactory.nullCodec()),
+		SNAPPY((byte)1, CodecFactory.snappyCodec()),
+		BZIP2((byte)2, CodecFactory.bzip2Codec()),
+		DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+		XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+		// Stable identifier persisted by AvroOutputFormat#writeObject.
+		// NOTE(review): both fields are only assigned in the constructor and could be final.
+		private byte codecByte;
+
+		// Factory handed to DataFileWriter#setCodec when the format is opened.
+		private CodecFactory codecFactory;
+
+		Codec(final byte codecByte, final CodecFactory codecFactory) {
+			this.codecByte = codecByte;
+			this.codecFactory = codecFactory;
+		}
+
+		private byte getCodecByte() {
+			return codecByte;
+		}
+
+		private CodecFactory getCodecFactory() {
+			return codecFactory;
+		}
+
+		/**
+		 * Resolves the codec whose serialization byte equals {@code codecByte}.
+		 *
+		 * <p>Linear scan over the constants — fine for five values; used only during
+		 * deserialization in readObject.
+		 *
+		 * @throws IllegalArgumentException if the byte maps to no known codec
+		 */
+		private static Codec forCodecByte(byte codecByte) {
+			for (final Codec codec : Codec.values()) {
+				if (codec.getCodecByte() == codecByte) {
+					return codec;
+				}
+			}
+			throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+		}
+	}
+
 	private static final long serialVersionUID = 1L;
 
 	private final Class<E> avroValueType;
 
 	private transient Schema userDefinedSchema = null;
+
+	private transient Codec codec = null;
 	
 	private transient DataFileWriter<E> dataFileWriter;
 
@@ -57,6 +100,15 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 		this.userDefinedSchema = schema;
 	}
 
+	/**
+	 * Set avro codec for compression.
+	 *
+	 * <p>Must be called before the format is opened: the codec is applied to the
+	 * DataFileWriter in open(). If never called, the writer is created without an
+	 * explicit codec (no compression is configured).
+	 *
+	 * @param codec avro codec.
+	 * @throws NullPointerException if {@code codec} is null
+	 */
+	public void setCodec(final Codec codec) {
+		this.codec = checkNotNull(codec, "codec can not be null");
+	}
+
 	@Override
 	public void writeRecord(E record) throws IOException {
 		dataFileWriter.append(record);
@@ -82,6 +134,9 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 			schema = ReflectData.get().getSchema(avroValueType);
 		}
 		dataFileWriter = new DataFileWriter<E>(datumWriter);
+		if (codec != null) {
+			dataFileWriter.setCodec(codec.getCodecFactory());
+		}
 		if (userDefinedSchema == null) {
 			dataFileWriter.create(schema, stream);
 		} else {
@@ -92,6 +147,12 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 	private void writeObject(java.io.ObjectOutputStream out) throws IOException {
 		out.defaultWriteObject();
 
+		if (codec != null) {
+			out.writeByte(codec.getCodecByte());
+		} else {
+			out.writeByte(-1);
+		}
+
 		if(userDefinedSchema != null) {
 			byte[] json = userDefinedSchema.toString().getBytes();
 			out.writeInt(json.length);
@@ -104,6 +165,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializ
 	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
 
+		byte codecByte = in.readByte();
+		if (codecByte >= 0) {
+			setCodec(Codec.forCodecByte(codecByte));
+		}
+
 		int length = in.readInt();
 		if(length != 0) {
 			byte[] json = new byte[length];

http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
index adbe5dd..3b01ccb 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -69,6 +69,7 @@ public class AvroOutputFormatITCase extends JavaProgramTestBase {
 		//output the data with AvroOutputFormat for specific user type
 		DataSet<User> specificUser = input.map(new ConvertToUser());
 		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+		avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
 		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
 		specificUser.write(avroOutputFormat, outputPath1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dffde7ef/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
new file mode 100644
index 0000000..4d6c6b7
--- /dev/null
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.avro.Schema;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+/**
+ * Tests for {@link AvroOutputFormat}: codec configuration, Java serialization of the
+ * format (the transient codec and user-defined schema are written by hand in
+ * writeObject/readObject), and the effect of a compression codec on output file size.
+ */
+public class AvroOutputFormatTest {
+
+	// A valid codec must be accepted without throwing.
+	// NOTE(review): the try/fail wrapper is redundant — simply letting an unexpected
+	// exception propagate would fail the test with a more informative stack trace.
+	@Test
+	public void testSetCodec() throws Exception {
+		// given
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+		// when
+		try {
+			outputFormat.setCodec(Codec.SNAPPY);
+		} catch (Exception ex) {
+			// then
+			fail("unexpected exception");
+		}
+	}
+
+	// setCodec(null) must be rejected (checkNotNull inside AvroOutputFormat#setCodec).
+	// NOTE(review): @Test(expected = NullPointerException.class) would express this
+	// intent more directly than the manual boolean flag.
+	@Test
+	public void testSetCodecError() throws Exception {
+		// given
+		boolean error = false;
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+
+		// when
+		try {
+			outputFormat.setCodec(null);
+		} catch (Exception ex) {
+			error = true;
+		}
+
+		// then
+		assertTrue(error);
+	}
+
+	// Round-trips the format through Java serialization for every combination of
+	// (codec present / absent) x (user-defined schema present / absent), covering the
+	// custom writeObject/readObject logic for the transient fields.
+	@Test
+	public void testSerialization() throws Exception {
+
+		serializeAndDeserialize(null, null);
+		serializeAndDeserialize(null, User.SCHEMA$);
+		for (final Codec codec : Codec.values()) {
+			serializeAndDeserialize(codec, null);
+			serializeAndDeserialize(codec, User.SCHEMA$);
+		}
+	}
+
+	// Serializes a configured format, deserializes it, and checks that codec and schema
+	// survive the round trip (or stay null when not configured).
+	// NOTE(review): Whitebox is a Mockito *internal* class used here to read private
+	// fields; it may break on a Mockito upgrade — a package-private accessor or
+	// reflection via java.lang.reflect would be more stable.
+	private void serializeAndDeserialize(final Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
+		// given
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
+		if (codec != null) {
+			outputFormat.setCodec(codec);
+		}
+		if (schema != null) {
+			outputFormat.setSchema(schema);
+		}
+
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+		// when
+		try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+			oos.writeObject(outputFormat);
+		}
+		try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+			// then
+			Object o = ois.readObject();
+			assertTrue(o instanceof AvroOutputFormat);
+			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
+			final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec");
+			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
+
+			assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null);
+			assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null);
+		}
+	}
+
+	// Writes the same 100 records with and without the SNAPPY codec and asserts the
+	// compressed file is strictly smaller.
+	// NOTE(review): the temp files are deleted only on the success path — an assertion
+	// failure leaves them behind. An @After cleanup (or TemporaryFolder rule) would be
+	// safer. Also, File.createTempFile appends the suffix verbatim, so "avro" yields
+	// names like "...avro" without a dot — presumably intentional, but worth confirming.
+	@Test
+	public void testCompression() throws Exception {
+		// given
+		final Path outputPath = new Path(File.createTempFile("avro-output-file","avro").getAbsolutePath());
+		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath,User.class);
+		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+
+		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file","compressed.avro").getAbsolutePath());
+		final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath,User.class);
+		compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
+		compressedOutputFormat.setCodec(Codec.SNAPPY);
+
+		// when
+		output(outputFormat);
+		output(compressedOutputFormat);
+
+		// then
+		assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
+
+		// cleanup
+		Files.delete(Paths.get(outputPath.getPath()));
+		Files.delete(Paths.get(compressedOutputPath.getPath()));
+	}
+
+	// Size in bytes of the file behind a Flink Path, via java.nio.
+	private long fileSize(Path path) throws IOException {
+		return Files.size(Paths.get(path.getPath()));
+	}
+
+	// Configures, opens (as task 1 — presumably (taskNumber, numTasks); confirm against
+	// FileOutputFormat#open), writes 100 identical User records, and closes the format.
+	private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
+		outputFormat.configure(new Configuration());
+		outputFormat.open(1,1);
+		for (int i = 0; i < 100; i++) {
+			outputFormat.writeRecord(new User("testUser",1,"blue"));
+		}
+		outputFormat.close();
+	}
+}