You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/26 17:40:09 UTC

[1/2] beam git commit: [BEAM-1871] Remove deprecated methods from AvroCoder

Repository: beam
Updated Branches:
  refs/heads/master 99cf72e7d -> c5cf90c70


[BEAM-1871] Remove deprecated methods from AvroCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a503d349
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a503d349
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a503d349

Branch: refs/heads/master
Commit: a503d349d93bd3ef9077c0f4fae713dbb452f5d6
Parents: 99cf72e
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 10:07:15 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 10:32:44 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 55 +++++---------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 11 ++--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++---
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  6 ++-
 4 files changed, 33 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 91822bf..d60a2ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.io.IOException;
 import java.io.InputStream;
@@ -224,17 +223,20 @@ public class AvroCoder<T> extends CustomCoder<T> {
       private final AvroCoder<T> myCoder = AvroCoder.this;
       @Override
       public DatumReader<T> initialValue() {
-        return myCoder.createDatumReader();
+        return myCoder.getType().equals(GenericRecord.class)
+            ? new GenericDatumReader<T>(myCoder.getSchema())
+            : new ReflectDatumReader<T>(myCoder.getSchema());
+      }
+    };
+    this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
+      private final AvroCoder<T> myCoder = AvroCoder.this;
+      @Override
+      public DatumWriter<T> initialValue() {
+        return myCoder.getType().equals(GenericRecord.class)
+            ? new GenericDatumWriter<T>(myCoder.getSchema())
+            : new ReflectDatumWriter<T>(myCoder.getSchema());
       }
     };
-    this.writer =
-        new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
-          private final AvroCoder<T> myCoder = AvroCoder.this;
-          @Override
-          public DatumWriter<T> initialValue() {
-            return myCoder.createDatumWriter();
-          }
-        };
   }
 
   /**
@@ -314,39 +316,6 @@ public class AvroCoder<T> extends CustomCoder<T> {
   }
 
   /**
-   * Returns a new {@link DatumReader} that can be used to read from an Avro file directly. Assumes
-   * the schema used to read is the same as the schema that was used when writing.
-   *
-   * @deprecated For {@code AvroCoder} internal use only.
-   */
-  // TODO: once we can remove this deprecated function, inline in constructor.
-  @Deprecated
-  @VisibleForTesting
-  public DatumReader<T> createDatumReader() {
-    if (type.equals(GenericRecord.class)) {
-      return new GenericDatumReader<>(schemaSupplier.get());
-    } else {
-      return new ReflectDatumReader<>(schemaSupplier.get());
-    }
-  }
-
-  /**
-   * Returns a new {@link DatumWriter} that can be used to write to an Avro file directly.
-   *
-   * @deprecated For {@code AvroCoder} internal use only.
-   */
-  @Deprecated
-  @VisibleForTesting
-  // TODO: once we can remove this deprecated function, inline in constructor.
-  public DatumWriter<T> createDatumWriter() {
-    if (type.equals(GenericRecord.class)) {
-      return new GenericDatumWriter<>(schemaSupplier.get());
-    } else {
-      return new ReflectDatumWriter<>(schemaSupplier.get());
-    }
-  }
-
-  /**
    * Returns the schema used by this coder.
    */
   public Schema getSchema() {

http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6a06972..24e158f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -23,18 +23,19 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
-
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -1041,7 +1042,11 @@ public class AvroIO {
       @SuppressWarnings("deprecation") // uses internal test functionality.
       @Override
       protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec());
+        DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
+            ? new GenericDatumWriter<T>(coder.getSchema())
+            : new ReflectDatumWriter<T>(coder.getSchema());
+
+        dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
         for (Map.Entry<String, Object> entry : metadata.entrySet()) {
           Object v = entry.getValue();
           if (v instanceof String) {

http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index a988d85..3e1c4b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -31,7 +31,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -41,7 +40,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
@@ -49,6 +47,8 @@ import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroIO.Write.Bound;
@@ -321,7 +321,7 @@ public class AvroIOTest {
   public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
 
   @Test
-  @Category({ValidatesRunner.class, UsesTestStream.class })
+  @Category({ValidatesRunner.class, UsesTestStream.class})
   public void testWindowedAvroIOWrite() throws Throwable {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
     final String outputFilePrefix = baseOutputFile.getAbsolutePath();
@@ -393,8 +393,9 @@ public class AvroIOTest {
     for (File outputFile : expectedFiles) {
       assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists());
       try (DataFileReader<GenericClass> reader =
-               new DataFileReader<>(outputFile, AvroCoder.of(
-                   GenericClass.class).createDatumReader())) {
+               new DataFileReader<>(outputFile,
+                   new ReflectDatumReader<GenericClass>(
+                       ReflectData.get().getSchema(GenericClass.class)))) {
         Iterators.addAll(actualElements, reader);
       }
       outputFile.delete();
@@ -504,7 +505,8 @@ public class AvroIOTest {
     for (File outputFile : expectedFiles) {
       assertTrue("Expected output file " + outputFile.getName(), outputFile.exists());
       try (DataFileReader<String> reader =
-          new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) {
+          new DataFileReader<>(outputFile,
+              new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
         Iterators.addAll(actualElements, reader);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index a6b6db5..d6facba 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -42,11 +42,13 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.AvroDefault;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroSource.AvroMetadata;
@@ -98,7 +100,9 @@ public class AvroSourceTest {
     String path = tmpFile.toString();
 
     FileOutputStream os = new FileOutputStream(tmpFile);
-    DatumWriter<T> datumWriter = coder.createDatumWriter();
+    DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
+        ? new GenericDatumWriter<T>(coder.getSchema())
+        : new ReflectDatumWriter<T>(coder.getSchema());
     try (DataFileWriter<T> writer = new DataFileWriter<>(datumWriter)) {
       writer.setCodec(CodecFactory.fromString(codec));
       writer.create(coder.getSchema(), os);


[2/2] beam git commit: [BEAM-1871] Remove deprecated methods from AvroCoder

Posted by lc...@apache.org.
[BEAM-1871] Remove deprecated methods from AvroCoder

This closes #2710


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5cf90c7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5cf90c7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5cf90c7

Branch: refs/heads/master
Commit: c5cf90c7047c06f0d7c5e1d064843f1a26371dcd
Parents: 99cf72e a503d34
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 10:39:59 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 10:39:59 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 55 +++++---------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 11 ++--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++---
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  6 ++-
 4 files changed, 33 insertions(+), 53 deletions(-)
----------------------------------------------------------------------