You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/12 16:35:58 UTC
[1/2] incubator-beam git commit: Add compression codec for
AvroIO.Write
Repository: incubator-beam
Updated Branches:
refs/heads/master 4f991fd82 -> 142229e37
Add compression codec for AvroIO.Write
BEHAVIOUR CHANGE: prior to this change, Avro output was not
compressed. Starting from this commit, Avro output is compressed by
default using the deflate codec (level 6).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b22d003
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b22d003
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b22d003
Branch: refs/heads/master
Commit: 2b22d003dabb7fddabdc8aaea872478fe13d407a
Parents: 4f991fd
Author: Rafal Wojdyla <ra...@spotify.com>
Authored: Mon Oct 3 14:02:59 2016 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Oct 12 09:35:23 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 158 ++++++++++++++++---
.../sdk/io/SerializableAvroCodecFactory.java | 112 +++++++++++++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 107 ++++++++++++-
.../io/SerializableAvroCodecFactoryTest.java | 100 ++++++++++++
4 files changed, 458 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 267265d..eeb4bb7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -27,6 +27,7 @@ import java.nio.channels.WritableByteChannel;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
@@ -443,6 +444,13 @@ public class AvroIO {
}
/**
+ * Returns a {@link PTransform} that writes Avro file(s) using the specified codec.
+ */
+ public static Bound<GenericRecord> withCodec(CodecFactory codec) {
+ return new Bound<>(GenericRecord.class).withCodec(codec);
+ }
+
+ /**
* A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
* multiple Avro files matching a sharding pattern).
*
@@ -450,6 +458,8 @@ public class AvroIO {
*/
public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+ private static final SerializableAvroCodecFactory DEFAULT_CODEC =
+ new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
/** The filename to write to. */
@Nullable
@@ -467,9 +477,23 @@ public class AvroIO {
final Schema schema;
/** An option to indicate if output validation is desired. Default is true. */
final boolean validate;
+ /**
+ * The codec used to encode the blocks in the Avro file. Wraps a CodecFactory as described in
+ * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
+ */
+ final SerializableAvroCodecFactory codec;
Bound(Class<T> type) {
- this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, type, null, true);
+ this(
+ null,
+ null,
+ "",
+ 0,
+ DEFAULT_SHARD_TEMPLATE,
+ type,
+ null,
+ true,
+ DEFAULT_CODEC);
}
Bound(
@@ -480,7 +504,8 @@ public class AvroIO {
String shardTemplate,
Class<T> type,
Schema schema,
- boolean validate) {
+ boolean validate,
+ SerializableAvroCodecFactory codec) {
super(name);
this.filenamePrefix = filenamePrefix;
this.filenameSuffix = filenameSuffix;
@@ -489,6 +514,7 @@ public class AvroIO {
this.type = type;
this.schema = schema;
this.validate = validate;
+ this.codec = codec;
}
/**
@@ -503,7 +529,15 @@ public class AvroIO {
public Bound<T> to(String filenamePrefix) {
validateOutputComponent(filenamePrefix);
return new Bound<>(
- name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ validate,
+ codec);
}
/**
@@ -517,7 +551,15 @@ public class AvroIO {
public Bound<T> withSuffix(String filenameSuffix) {
validateOutputComponent(filenameSuffix);
return new Bound<>(
- name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ validate,
+ codec);
}
/**
@@ -537,7 +579,15 @@ public class AvroIO {
public Bound<T> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return new Bound<>(
- name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ validate,
+ codec);
}
/**
@@ -550,7 +600,15 @@ public class AvroIO {
*/
public Bound<T> withShardNameTemplate(String shardTemplate) {
return new Bound<>(
- name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ validate,
+ codec);
}
/**
@@ -563,7 +621,16 @@ public class AvroIO {
* <p>Does not modify this object.
*/
public Bound<T> withoutSharding() {
- return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", type, schema, validate);
+ return new Bound<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ 1,
+ "",
+ type,
+ schema,
+ validate,
+ codec);
}
/**
@@ -584,7 +651,8 @@ public class AvroIO {
shardTemplate,
type,
ReflectData.get().getSchema(type),
- validate);
+ validate,
+ codec);
}
/**
@@ -603,7 +671,8 @@ public class AvroIO {
shardTemplate,
GenericRecord.class,
schema,
- validate);
+ validate,
+ codec);
}
/**
@@ -629,7 +698,34 @@ public class AvroIO {
*/
public Bound<T> withoutValidation() {
return new Bound<>(
- name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, false);
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ false,
+ codec);
+ }
+
+ /**
+ * Returns a new {@link PTransform} that's like this one but
+ * that writes to Avro file(s) compressed using the specified codec.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound<T> withCodec(CodecFactory codec) {
+ return new Bound<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ validate,
+ new SerializableAvroCodecFactory(codec));
}
@Override
@@ -645,7 +741,11 @@ public class AvroIO {
org.apache.beam.sdk.io.Write.Bound<T> write =
org.apache.beam.sdk.io.Write.to(
new AvroSink<>(
- filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)));
+ filenamePrefix,
+ filenameSuffix,
+ shardTemplate,
+ AvroCoder.of(type, schema),
+ codec));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -671,7 +771,10 @@ public class AvroIO {
0)
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"),
- true);
+ true)
+ .addIfNotDefault(DisplayData.item("codec", codec.toString())
+ .withLabel("Avro Compression Codec"),
+ DEFAULT_CODEC.toString());
}
/**
@@ -713,6 +816,10 @@ public class AvroIO {
public boolean needsValidation() {
return validate;
}
+
+ public CodecFactory getCodec() {
+ return codec.getCodec();
+ }
}
/** Disallow construction of utility class. */
@@ -741,17 +848,24 @@ public class AvroIO {
@VisibleForTesting
static class AvroSink<T> extends FileBasedSink<T> {
private final AvroCoder<T> coder;
+ private final SerializableAvroCodecFactory codec;
@VisibleForTesting
AvroSink(
- String baseOutputFilename, String extension, String fileNameTemplate, AvroCoder<T> coder) {
+ String baseOutputFilename,
+ String extension,
+ String fileNameTemplate,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec) {
super(baseOutputFilename, extension, fileNameTemplate);
this.coder = coder;
+ this.codec = codec;
+
}
@Override
public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
- return new AvroWriteOperation<>(this, coder);
+ return new AvroWriteOperation<>(this, coder, codec);
}
/**
@@ -760,15 +874,19 @@ public class AvroIO {
*/
private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
private final AvroCoder<T> coder;
+ private final SerializableAvroCodecFactory codec;
- private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder) {
+ private AvroWriteOperation(AvroSink<T> sink,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec) {
super(sink);
this.coder = coder;
+ this.codec = codec;
}
@Override
public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
- return new AvroWriter<>(this, coder);
+ return new AvroWriter<>(this, coder, codec);
}
}
@@ -779,17 +897,21 @@ public class AvroIO {
private static class AvroWriter<T> extends FileBasedWriter<T> {
private final AvroCoder<T> coder;
private DataFileWriter<T> dataFileWriter;
+ private SerializableAvroCodecFactory codec;
- public AvroWriter(FileBasedWriteOperation<T> writeOperation, AvroCoder<T> coder) {
+ public AvroWriter(FileBasedWriteOperation<T> writeOperation,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec) {
super(writeOperation);
this.mimeType = MimeTypes.BINARY;
this.coder = coder;
+ this.codec = codec;
}
@SuppressWarnings("deprecation") // uses internal test functionality.
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
- dataFileWriter = new DataFileWriter<>(coder.createDatumWriter());
+ dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec());
dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
new file mode 100644
index 0000000..ce52e99
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.avro.file.DataFileConstants.BZIP2_CODEC;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.file.DataFileConstants.NULL_CODEC;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.apache.avro.file.DataFileConstants.XZ_CODEC;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.avro.file.CodecFactory;
+
+/**
+ * A wrapper that allows {@link org.apache.avro.file.CodecFactory}s to be serialized using Java's
+ * standard serialization mechanisms.
+ */
+class SerializableAvroCodecFactory implements Externalizable {
+ private static final long serialVersionUID = 7445324844109564303L;
+ private static final List<String> noOptAvroCodecs = Arrays.asList(NULL_CODEC,
+ SNAPPY_CODEC,
+ BZIP2_CODEC);
+ private static final Pattern deflatePattern = Pattern.compile(DEFLATE_CODEC + "-(?<level>-?\\d)");
+ private static final Pattern xzPattern = Pattern.compile(XZ_CODEC + "-(?<level>\\d)");
+
+ private CodecFactory codecFactory;
+
+ // Public no-arg constructor required by java.io.Externalizable; for deserialization only
+ public SerializableAvroCodecFactory() {}
+
+ public SerializableAvroCodecFactory(CodecFactory codecFactory) {
+ checkNotNull(codecFactory, "Codec can't be null");
+ checkState(checkIsSupportedCodec(codecFactory), "%s is not supported", codecFactory);
+ this.codecFactory = codecFactory;
+ }
+
+ private boolean checkIsSupportedCodec(CodecFactory codecFactory) {
+ final String codecStr = codecFactory.toString();
+ return noOptAvroCodecs.contains(codecStr)
+ || deflatePattern.matcher(codecStr).matches()
+ || xzPattern.matcher(codecStr).matches();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(codecFactory.toString());
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ final String codecStr = in.readUTF();
+
+ switch (codecStr) {
+ case NULL_CODEC:
+ case SNAPPY_CODEC:
+ case BZIP2_CODEC:
+ codecFactory = CodecFactory.fromString(codecStr);
+ return;
+ }
+
+ Matcher deflateMatcher = deflatePattern.matcher(codecStr);
+ if (deflateMatcher.find()) {
+ codecFactory = CodecFactory.deflateCodec(
+ Integer.parseInt(deflateMatcher.group("level")));
+ return;
+ }
+
+ Matcher xzMatcher = xzPattern.matcher(codecStr);
+ if (xzMatcher.find()) {
+ codecFactory = CodecFactory.xzCodec(
+ Integer.parseInt(xzMatcher.group("level")));
+ return;
+ }
+
+ throw new IllegalStateException(codecStr + " is not supported");
+ }
+
+ public CodecFactory getCodec() {
+ return codecFactory;
+ }
+
+ @Override
+ public String toString() {
+ checkNotNull(codecFactory, "Inner CodecFactory is null, please use non default constructor");
+ return codecFactory.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 81f05d7..1b1b1fa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
@@ -29,13 +30,17 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
@@ -51,6 +56,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -146,6 +152,64 @@ public class AvroIOTest {
p.run();
}
+ @Test
+ @SuppressWarnings("unchecked")
+ @Category(NeedsRunner.class)
+ public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable {
+ TestPipeline p = TestPipeline.create();
+ List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
+ new GenericClass(5, "bar"));
+ File outputFile = tmpFolder.newFile("output.avro");
+
+ p.apply(Create.of(values))
+ .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withCodec(CodecFactory.deflateCodec(9))
+ .withSchema(GenericClass.class));
+ p.run();
+
+ p = TestPipeline.create();
+ PCollection<GenericClass> input = p
+ .apply(AvroIO.Read
+ .from(outputFile.getAbsolutePath())
+ .withSchema(GenericClass.class));
+
+ PAssert.that(input).containsInAnyOrder(values);
+ p.run();
+ DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
+ new GenericDatumReader());
+ assertEquals(dataFileStream.getMetaString("avro.codec"), "deflate");
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ @Category(NeedsRunner.class)
+ public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable {
+ TestPipeline p = TestPipeline.create();
+ List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
+ new GenericClass(5, "bar"));
+ File outputFile = tmpFolder.newFile("output.avro");
+
+ p.apply(Create.of(values))
+ .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withSchema(GenericClass.class)
+ .withCodec(CodecFactory.nullCodec()));
+ p.run();
+
+ p = TestPipeline.create();
+ PCollection<GenericClass> input = p
+ .apply(AvroIO.Read
+ .from(outputFile.getAbsolutePath())
+ .withSchema(GenericClass.class));
+
+ PAssert.that(input).containsInAnyOrder(values);
+ p.run();
+ DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
+ new GenericDatumReader());
+ assertEquals(dataFileStream.getMetaString("avro.codec"), "null");
+ }
+
@DefaultCoder(AvroCoder.class)
static class GenericClassV2 {
int intField;
@@ -212,6 +276,45 @@ public class AvroIOTest {
p.run();
}
+ @Test
+ public void testWriteWithDefaultCodec() throws Exception {
+ AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ .to("gs://bucket/foo/baz");
+ assertEquals(write.getCodec().toString(), CodecFactory.deflateCodec(6).toString());
+ }
+
+ @Test
+ public void testWriteWithCustomCodec() throws Exception {
+ AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ .to("gs://bucket/foo/baz")
+ .withCodec(CodecFactory.snappyCodec());
+ assertEquals(write.getCodec().toString(), SNAPPY_CODEC);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
+ AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ .to("gs://bucket/foo/baz")
+ .withCodec(CodecFactory.deflateCodec(9));
+
+ AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+
+ assertEquals(serdeWrite.getCodec().toString(), CodecFactory.deflateCodec(9).toString());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWriteWithSerDeCustomXZCodec() throws Exception {
+ AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+ .to("gs://bucket/foo/baz")
+ .withCodec(CodecFactory.xzCodec(9));
+
+ AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+
+ assertEquals(serdeWrite.getCodec().toString(), CodecFactory.xzCodec(9).toString());
+ }
+
@SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
@@ -304,7 +407,8 @@ public class AvroIOTest {
.withSuffix("bar")
.withSchema(GenericClass.class)
.withNumShards(100)
- .withoutValidation();
+ .withoutValidation()
+ .withCodec(CodecFactory.snappyCodec());
DisplayData displayData = DisplayData.from(write);
@@ -314,6 +418,7 @@ public class AvroIOTest {
assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("validation", false));
+ assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java
new file mode 100644
index 0000000..3fe8740
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static org.apache.avro.file.DataFileConstants.BZIP2_CODEC;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.file.DataFileConstants.NULL_CODEC;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.apache.avro.file.DataFileConstants.XZ_CODEC;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests of SerializableAvroCodecFactory.
+ */
+@RunWith(JUnit4.class)
+public class SerializableAvroCodecFactoryTest {
+ private final List<String> avroCodecs = Arrays.asList(NULL_CODEC,
+ SNAPPY_CODEC,
+ DEFLATE_CODEC,
+ XZ_CODEC,
+ BZIP2_CODEC);
+
+ @Test
+ public void testDefaultCodecsIn() throws Exception {
+ for (String codec : avroCodecs) {
+ SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+ CodecFactory.fromString(codec));
+
+ assertEquals(codecFactory.getCodec().toString(), (CodecFactory.fromString(codec).toString()));
+ }
+ }
+
+ @Test
+ public void testDefaultCodecsSerDe() throws Exception {
+ for (String codec : avroCodecs) {
+ SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+ CodecFactory.fromString(codec));
+
+ SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory);
+
+ assertEquals(serdeC.getCodec().toString(), CodecFactory.fromString(codec).toString());
+ }
+ }
+
+ @Test
+ public void testDeflateCodecSerDeWithLevels() throws Exception {
+ for (int i = 0; i < 10; ++i) {
+ SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+ CodecFactory.deflateCodec(i));
+
+ SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory);
+
+ assertEquals(serdeC.getCodec().toString(), CodecFactory.deflateCodec(i).toString());
+ }
+ }
+
+ @Test
+ public void testXZCodecSerDeWithLevels() throws Exception {
+ for (int i = 0; i < 10; ++i) {
+ SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+ CodecFactory.xzCodec(i));
+
+ SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory);
+
+ assertEquals(serdeC.getCodec().toString(), CodecFactory.xzCodec(i).toString());
+ }
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNullCodecToString() throws Exception {
+ // Use the default constructor (public because the class is Externalizable).
+ SerializableAvroCodecFactory codec = new SerializableAvroCodecFactory();
+ codec.toString();
+ }
+}
+
[2/2] incubator-beam git commit: Closes #1038
Posted by dh...@apache.org.
Closes #1038
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/142229e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/142229e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/142229e3
Branch: refs/heads/master
Commit: 142229e3719952451ef9a65c4cfc77c2d27520fd
Parents: 4f991fd 2b22d00
Author: Dan Halperin <dh...@google.com>
Authored: Wed Oct 12 09:35:49 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Oct 12 09:35:49 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 158 ++++++++++++++++---
.../sdk/io/SerializableAvroCodecFactory.java | 112 +++++++++++++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 107 ++++++++++++-
.../io/SerializableAvroCodecFactoryTest.java | 100 ++++++++++++
4 files changed, 458 insertions(+), 19 deletions(-)
----------------------------------------------------------------------