Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/12 16:35:58 UTC

[1/2] incubator-beam git commit: Add compression codec for AvroIO.Write

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4f991fd82 -> 142229e37


Add compression codec for AvroIO.Write

BEHAVIOUR CHANGE: prior to this change, Avro output was not
compressed. Starting from this commit, Avro output is compressed by
default using the deflate codec (level 6).
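
A minimal usage sketch (not part of this commit) showing the new option against the API
exercised by the tests in the diff below; the pipeline p, the output prefix, and the
record class are placeholders:

    // Write Avro files compressed with Snappy instead of the new default
    // (deflate, level 6). Omitting withCodec(...) keeps the default.
    p.apply(Create.of(records))
     .apply(AvroIO.Write.to("/path/to/output/prefix")    // hypothetical prefix
         .withSchema(MyRecord.class)                      // hypothetical record class
         .withCodec(CodecFactory.snappyCodec()));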


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b22d003
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b22d003
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b22d003

Branch: refs/heads/master
Commit: 2b22d003dabb7fddabdc8aaea872478fe13d407a
Parents: 4f991fd
Author: Rafal Wojdyla <ra...@spotify.com>
Authored: Mon Oct 3 14:02:59 2016 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Oct 12 09:35:23 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 158 ++++++++++++++++---
 .../sdk/io/SerializableAvroCodecFactory.java    | 112 +++++++++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 107 ++++++++++++-
 .../io/SerializableAvroCodecFactoryTest.java    | 100 ++++++++++++
 4 files changed, 458 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 267265d..eeb4bb7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -27,6 +27,7 @@ import java.nio.channels.WritableByteChannel;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
@@ -443,6 +444,13 @@ public class AvroIO {
     }
 
     /**
+     * Returns a {@link PTransform} that writes Avro file(s) using the specified codec.
+     */
+    public static Bound<GenericRecord> withCodec(CodecFactory codec) {
+      return new Bound<>(GenericRecord.class).withCodec(codec);
+    }
+
+    /**
      * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
      * multiple Avro files matching a sharding pattern).
      *
@@ -450,6 +458,8 @@ public class AvroIO {
      */
     public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
       private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+      private static final SerializableAvroCodecFactory DEFAULT_CODEC =
+          new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
 
       /** The filename to write to. */
       @Nullable
@@ -467,9 +477,23 @@ public class AvroIO {
       final Schema schema;
       /** An option to indicate if output validation is desired. Default is true. */
       final boolean validate;
+      /**
+       * The codec used to encode the blocks in the Avro file. Supported codecs are those listed in
+       * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
+       */
+      final SerializableAvroCodecFactory codec;
 
       Bound(Class<T> type) {
-        this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, type, null, true);
+        this(
+            null,
+            null,
+            "",
+            0,
+            DEFAULT_SHARD_TEMPLATE,
+            type,
+            null,
+            true,
+            DEFAULT_CODEC);
       }
 
       Bound(
@@ -480,7 +504,8 @@ public class AvroIO {
           String shardTemplate,
           Class<T> type,
           Schema schema,
-          boolean validate) {
+          boolean validate,
+          SerializableAvroCodecFactory codec) {
         super(name);
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
@@ -489,6 +514,7 @@ public class AvroIO {
         this.type = type;
         this.schema = schema;
         this.validate = validate;
+        this.codec = codec;
       }
 
       /**
@@ -503,7 +529,15 @@ public class AvroIO {
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec);
       }
 
       /**
@@ -517,7 +551,15 @@ public class AvroIO {
       public Bound<T> withSuffix(String filenameSuffix) {
         validateOutputComponent(filenameSuffix);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec);
       }
 
       /**
@@ -537,7 +579,15 @@ public class AvroIO {
       public Bound<T> withNumShards(int numShards) {
         checkArgument(numShards >= 0);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec);
       }
 
       /**
@@ -550,7 +600,15 @@ public class AvroIO {
        */
       public Bound<T> withShardNameTemplate(String shardTemplate) {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec);
       }
 
       /**
@@ -563,7 +621,16 @@ public class AvroIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutSharding() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", type, schema, validate);
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            1,
+            "",
+            type,
+            schema,
+            validate,
+            codec);
       }
 
       /**
@@ -584,7 +651,8 @@ public class AvroIO {
             shardTemplate,
             type,
             ReflectData.get().getSchema(type),
-            validate);
+            validate,
+            codec);
       }
 
       /**
@@ -603,7 +671,8 @@ public class AvroIO {
             shardTemplate,
             GenericRecord.class,
             schema,
-            validate);
+            validate,
+            codec);
       }
 
       /**
@@ -629,7 +698,34 @@ public class AvroIO {
        */
       public Bound<T> withoutValidation() {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, false);
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            false,
+            codec);
+      }
+
+      /**
+       * Returns a new {@link PTransform} that's like this one but
+       * that writes to Avro file(s) compressed using the specified codec.
+       *
+       * <p>Does not modify this object.
+       */
+      public Bound<T> withCodec(CodecFactory codec) {
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            new SerializableAvroCodecFactory(codec));
       }
 
       @Override
@@ -645,7 +741,11 @@ public class AvroIO {
         org.apache.beam.sdk.io.Write.Bound<T> write =
             org.apache.beam.sdk.io.Write.to(
                 new AvroSink<>(
-                    filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)));
+                    filenamePrefix,
+                    filenameSuffix,
+                    shardTemplate,
+                    AvroCoder.of(type, schema),
+                    codec));
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
@@ -671,7 +771,10 @@ public class AvroIO {
                 0)
             .addIfNotDefault(DisplayData.item("validation", validate)
                 .withLabel("Validation Enabled"),
-                true);
+                true)
+            .addIfNotDefault(DisplayData.item("codec", codec.toString())
+                .withLabel("Avro Compression Codec"),
+                DEFAULT_CODEC.toString());
       }
 
       /**
@@ -713,6 +816,10 @@ public class AvroIO {
       public boolean needsValidation() {
         return validate;
       }
+
+      public CodecFactory getCodec() {
+        return codec.getCodec();
+      }
     }
 
     /** Disallow construction of utility class. */
@@ -741,17 +848,24 @@ public class AvroIO {
   @VisibleForTesting
   static class AvroSink<T> extends FileBasedSink<T> {
     private final AvroCoder<T> coder;
+    private final SerializableAvroCodecFactory codec;
 
     @VisibleForTesting
     AvroSink(
-        String baseOutputFilename, String extension, String fileNameTemplate, AvroCoder<T> coder) {
+        String baseOutputFilename,
+        String extension,
+        String fileNameTemplate,
+        AvroCoder<T> coder,
+        SerializableAvroCodecFactory codec) {
       super(baseOutputFilename, extension, fileNameTemplate);
       this.coder = coder;
+      this.codec = codec;
+
     }
 
     @Override
     public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new AvroWriteOperation<>(this, coder);
+      return new AvroWriteOperation<>(this, coder, codec);
     }
 
     /**
@@ -760,15 +874,19 @@ public class AvroIO {
      */
     private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
       private final AvroCoder<T> coder;
+      private final SerializableAvroCodecFactory codec;
 
-      private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder) {
+      private AvroWriteOperation(AvroSink<T> sink,
+                                 AvroCoder<T> coder,
+                                 SerializableAvroCodecFactory codec) {
         super(sink);
         this.coder = coder;
+        this.codec = codec;
       }
 
       @Override
       public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
-        return new AvroWriter<>(this, coder);
+        return new AvroWriter<>(this, coder, codec);
       }
     }
 
@@ -779,17 +897,21 @@ public class AvroIO {
     private static class AvroWriter<T> extends FileBasedWriter<T> {
       private final AvroCoder<T> coder;
       private DataFileWriter<T> dataFileWriter;
+      private SerializableAvroCodecFactory codec;
 
-      public AvroWriter(FileBasedWriteOperation<T> writeOperation, AvroCoder<T> coder) {
+      public AvroWriter(FileBasedWriteOperation<T> writeOperation,
+                        AvroCoder<T> coder,
+                        SerializableAvroCodecFactory codec) {
         super(writeOperation);
         this.mimeType = MimeTypes.BINARY;
         this.coder = coder;
+        this.codec = codec;
       }
 
       @SuppressWarnings("deprecation") // uses internal test functionality.
       @Override
       protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        dataFileWriter = new DataFileWriter<>(coder.createDatumWriter());
+        dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec());
         dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
       }
 

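For reference, the net effect of the plumbing above is the plain Avro call below: the
codec is attached to the DataFileWriter before the file is created. A standalone sketch,
not Beam code; it assumes the usual Avro imports and an existing Schema schema and
GenericRecord record:

    // Plain Avro: setCodec(...) must be called before create().
    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
    DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)
        .setCodec(CodecFactory.deflateCodec(6));          // same default as AvroIO.Write
    writer.create(schema, new File("/tmp/records.avro")); // hypothetical path
    writer.append(record);
    writer.close();
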
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
new file mode 100644
index 0000000..ce52e99
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/SerializableAvroCodecFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.avro.file.DataFileConstants.BZIP2_CODEC;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.file.DataFileConstants.NULL_CODEC;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.apache.avro.file.DataFileConstants.XZ_CODEC;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.avro.file.CodecFactory;
+
+/**
+ * A wrapper that allows {@link org.apache.avro.file.CodecFactory}s to be serialized using Java's
+ * standard serialization mechanisms.
+ */
+class SerializableAvroCodecFactory implements Externalizable {
+  private static final long serialVersionUID = 7445324844109564303L;
+  private static final List<String> noOptAvroCodecs = Arrays.asList(NULL_CODEC,
+                                                                    SNAPPY_CODEC,
+                                                                    BZIP2_CODEC);
+  private static final Pattern deflatePattern = Pattern.compile(DEFLATE_CODEC + "-(?<level>-?\\d)");
+  private static final Pattern xzPattern = Pattern.compile(XZ_CODEC + "-(?<level>\\d)");
+
+  private CodecFactory codecFactory;
+
+  // No-arg constructor required by Externalizable; not for general use
+  public SerializableAvroCodecFactory() {}
+
+  public SerializableAvroCodecFactory(CodecFactory codecFactory) {
+    checkNotNull(codecFactory, "Codec can't be null");
+    checkState(checkIsSupportedCodec(codecFactory), "%s is not supported", codecFactory);
+    this.codecFactory = codecFactory;
+  }
+
+  private boolean checkIsSupportedCodec(CodecFactory codecFactory) {
+    final String codecStr = codecFactory.toString();
+    return noOptAvroCodecs.contains(codecStr)
+        || deflatePattern.matcher(codecStr).matches()
+        || xzPattern.matcher(codecStr).matches();
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    out.writeUTF(codecFactory.toString());
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    final String codecStr = in.readUTF();
+
+    switch (codecStr) {
+      case NULL_CODEC:
+      case SNAPPY_CODEC:
+      case BZIP2_CODEC:
+        codecFactory = CodecFactory.fromString(codecStr);
+        return;
+    }
+
+    Matcher deflateMatcher = deflatePattern.matcher(codecStr);
+    if (deflateMatcher.find()) {
+      codecFactory = CodecFactory.deflateCodec(
+          Integer.parseInt(deflateMatcher.group("level")));
+      return;
+    }
+
+    Matcher xzMatcher = xzPattern.matcher(codecStr);
+    if (xzMatcher.find()) {
+      codecFactory = CodecFactory.xzCodec(
+          Integer.parseInt(xzMatcher.group("level")));
+      return;
+    }
+
+    throw new IllegalStateException(codecStr + " is not supported");
+  }
+
+  public CodecFactory getCodec() {
+    return codecFactory;
+  }
+
+  @Override
+  public String toString() {
+    checkNotNull(codecFactory, "Inner CodecFactory is null, please use non default constructor");
+    return codecFactory.toString();
+  }
+}

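The wrapper above persists only the codec's string form (e.g. "deflate-9") and rebuilds
the CodecFactory in readExternal. A minimal round-trip sketch through standard Java
serialization; the class is package-private, so this would have to live in
org.apache.beam.sdk.io (the tests in this commit do the equivalent via
SerializableUtils.clone):

    // Serialize, then deserialize; readExternal reconstructs the CodecFactory
    // from the string written by writeExternal.
    SerializableAvroCodecFactory original =
        new SerializableAvroCodecFactory(CodecFactory.deflateCodec(9));
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    new ObjectOutputStream(bos).writeObject(original);
    ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
    SerializableAvroCodecFactory copy = (SerializableAvroCodecFactory) in.readObject();
    assert copy.getCodec().toString().equals(CodecFactory.deflateCodec(9).toString());
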
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 81f05d7..1b1b1fa 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
@@ -29,13 +30,17 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -51,6 +56,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -146,6 +152,64 @@ public class AvroIOTest {
     p.run();
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  @Category(NeedsRunner.class)
+  public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable {
+    TestPipeline p = TestPipeline.create();
+    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
+        new GenericClass(5, "bar"));
+    File outputFile = tmpFolder.newFile("output.avro");
+
+    p.apply(Create.of(values))
+        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+            .withoutSharding()
+            .withCodec(CodecFactory.deflateCodec(9))
+            .withSchema(GenericClass.class));
+    p.run();
+
+    p = TestPipeline.create();
+    PCollection<GenericClass> input = p
+        .apply(AvroIO.Read
+            .from(outputFile.getAbsolutePath())
+            .withSchema(GenericClass.class));
+
+    PAssert.that(input).containsInAnyOrder(values);
+    p.run();
+    DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
+        new GenericDatumReader());
+    assertEquals(dataFileStream.getMetaString("avro.codec"), "deflate");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  @Category(NeedsRunner.class)
+  public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable {
+    TestPipeline p = TestPipeline.create();
+    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
+        new GenericClass(5, "bar"));
+    File outputFile = tmpFolder.newFile("output.avro");
+
+    p.apply(Create.of(values))
+        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+            .withoutSharding()
+            .withSchema(GenericClass.class)
+            .withCodec(CodecFactory.nullCodec()));
+    p.run();
+
+    p = TestPipeline.create();
+    PCollection<GenericClass> input = p
+        .apply(AvroIO.Read
+            .from(outputFile.getAbsolutePath())
+            .withSchema(GenericClass.class));
+
+    PAssert.that(input).containsInAnyOrder(values);
+    p.run();
+    DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
+        new GenericDatumReader());
+    assertEquals(dataFileStream.getMetaString("avro.codec"), "null");
+  }
+
   @DefaultCoder(AvroCoder.class)
   static class GenericClassV2 {
     int intField;
@@ -212,6 +276,45 @@ public class AvroIOTest {
     p.run();
   }
 
+  @Test
+  public void testWriteWithDefaultCodec() throws Exception {
+    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+        .to("gs://bucket/foo/baz");
+    assertEquals(write.getCodec().toString(), CodecFactory.deflateCodec(6).toString());
+  }
+
+  @Test
+  public void testWriteWithCustomCodec() throws Exception {
+    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+        .to("gs://bucket/foo/baz")
+        .withCodec(CodecFactory.snappyCodec());
+    assertEquals(write.getCodec().toString(), SNAPPY_CODEC);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
+    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+        .to("gs://bucket/foo/baz")
+        .withCodec(CodecFactory.deflateCodec(9));
+
+    AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+
+    assertEquals(serdeWrite.getCodec().toString(), CodecFactory.deflateCodec(9).toString());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testWriteWithSerDeCustomXZCodec() throws Exception {
+    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+        .to("gs://bucket/foo/baz")
+        .withCodec(CodecFactory.xzCodec(9));
+
+    AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+
+    assertEquals(serdeWrite.getCodec().toString(), CodecFactory.xzCodec(9).toString());
+  }
+
   @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
   private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
@@ -304,7 +407,8 @@ public class AvroIOTest {
         .withSuffix("bar")
         .withSchema(GenericClass.class)
         .withNumShards(100)
-        .withoutValidation();
+        .withoutValidation()
+        .withCodec(CodecFactory.snappyCodec());
 
     DisplayData displayData = DisplayData.from(write);
 
@@ -314,6 +418,7 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("validation", false));
+    assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b22d003/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java
new file mode 100644
index 0000000..3fe8740
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SerializableAvroCodecFactoryTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static org.apache.avro.file.DataFileConstants.BZIP2_CODEC;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.file.DataFileConstants.NULL_CODEC;
+import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.apache.avro.file.DataFileConstants.XZ_CODEC;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests of SerializableAvroCodecFactory.
+ */
+@RunWith(JUnit4.class)
+public class SerializableAvroCodecFactoryTest {
+  private final List<String> avroCodecs = Arrays.asList(NULL_CODEC,
+      SNAPPY_CODEC,
+      DEFLATE_CODEC,
+      XZ_CODEC,
+      BZIP2_CODEC);
+
+  @Test
+  public void testDefaultCodecsIn() throws Exception {
+    for (String codec : avroCodecs) {
+      SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+          CodecFactory.fromString(codec));
+
+      assertEquals(codecFactory.getCodec().toString(), (CodecFactory.fromString(codec).toString()));
+    }
+  }
+
+  @Test
+  public void testDefaultCodecsSerDe() throws Exception {
+    for (String codec : avroCodecs) {
+      SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+          CodecFactory.fromString(codec));
+
+      SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory);
+
+      assertEquals(serdeC.getCodec().toString(), CodecFactory.fromString(codec).toString());
+    }
+  }
+
+  @Test
+  public void testDeflateCodecSerDeWithLevels() throws Exception {
+    for (int i = 0; i < 10; ++i) {
+      SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+          CodecFactory.deflateCodec(i));
+
+      SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory);
+
+      assertEquals(serdeC.getCodec().toString(), CodecFactory.deflateCodec(i).toString());
+    }
+  }
+
+  @Test
+  public void testXZCodecSerDeWithLevels() throws Exception {
+    for (int i = 0; i < 10; ++i) {
+      SerializableAvroCodecFactory codecFactory = new SerializableAvroCodecFactory(
+          CodecFactory.xzCodec(i));
+
+      SerializableAvroCodecFactory serdeC = SerializableUtils.clone(codecFactory);
+
+      assertEquals(serdeC.getCodec().toString(), CodecFactory.xzCodec(i).toString());
+    }
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullCodecToString() throws Exception {
+    // use the default constructor (required because the class implements Externalizable)
+    SerializableAvroCodecFactory codec = new SerializableAvroCodecFactory();
+    codec.toString();
+  }
+}
+


[2/2] incubator-beam git commit: Closes #1038

Posted by dh...@apache.org.
Closes #1038


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/142229e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/142229e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/142229e3

Branch: refs/heads/master
Commit: 142229e3719952451ef9a65c4cfc77c2d27520fd
Parents: 4f991fd 2b22d00
Author: Dan Halperin <dh...@google.com>
Authored: Wed Oct 12 09:35:49 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Oct 12 09:35:49 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 158 ++++++++++++++++---
 .../sdk/io/SerializableAvroCodecFactory.java    | 112 +++++++++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 107 ++++++++++++-
 .../io/SerializableAvroCodecFactoryTest.java    | 100 ++++++++++++
 4 files changed, 458 insertions(+), 19 deletions(-)
----------------------------------------------------------------------