You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2019/12/02 19:07:05 UTC

[beam] branch release-2.17.0 updated: [BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder

This is an automated email from the ASF dual-hosted git repository.

mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.17.0 by this push:
     new 01925d2  [BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder
     new 014c740  Merge pull request #10241 from mxm/release-2.17.0
01925d2 is described below

commit 01925d238144b3c16212fd11c48dcca018eb7309
Author: Piotr Szczepanik <pi...@allegro.pl>
AuthorDate: Thu Nov 28 14:34:49 2019 +0100

    [BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder
    
    Backport of #10218
---
 ...gistrar.java => AvroGenericCoderRegistrar.java} | 12 +++++-----
 ...slator.java => AvroGenericCoderTranslator.java} | 14 +++++------
 .../core/construction/CoderTranslationTest.java    |  5 ++++
 .../java/org/apache/beam/sdk/coders/AvroCoder.java |  7 +++---
 .../apache/beam/sdk/coders/AvroGenericCoder.java   | 28 +++++++---------------
 sdks/python/apache_beam/coders/avro_record.py      |  2 +-
 sdks/python/apache_beam/coders/coders.py           | 12 +++++-----
 sdks/python/apache_beam/coders/coders_test.py      |  2 +-
 .../apache_beam/coders/coders_test_common.py       |  2 +-
 .../io/external/xlang_parquetio_test.py            |  2 +-
 10 files changed, 40 insertions(+), 46 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java
similarity index 76%
rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java
index 565bdbf..0cfda94 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java
@@ -19,22 +19,22 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.auto.service.AutoService;
 import java.util.Map;
-import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
-/** Coder registrar for AvroCoder. */
+/** Coder registrar for AvroGenericCoder. */
 @AutoService(CoderTranslatorRegistrar.class)
-public class AvroCoderRegistrar implements CoderTranslatorRegistrar {
-  public static final String AVRO_CODER_URN = "beam:coder:avro:v1";
+public class AvroGenericCoderRegistrar implements CoderTranslatorRegistrar {
+  public static final String AVRO_CODER_URN = "beam:coder:avro:generic:v1";
 
   @Override
   public Map<Class<? extends Coder>, String> getCoderURNs() {
-    return ImmutableMap.of(AvroCoder.class, AVRO_CODER_URN);
+    return ImmutableMap.of(AvroGenericCoder.class, AVRO_CODER_URN);
   }
 
   @Override
   public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() {
-    return ImmutableMap.of(AvroCoder.class, new AvroCoderTranslator());
+    return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator());
   }
 }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java
similarity index 74%
copy from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
copy to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java
index 93fca3d..564a8d3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java
@@ -20,25 +20,25 @@ package org.apache.beam.runners.core.construction;
 import java.util.Collections;
 import java.util.List;
 import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 
-/** Coder translator for AvroCoder. */
-public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> {
+/** Coder translator for AvroGenericCoder. */
+public class AvroGenericCoderTranslator implements CoderTranslator<AvroGenericCoder> {
   @Override
-  public List<? extends Coder<?>> getComponents(AvroCoder from) {
+  public List<? extends Coder<?>> getComponents(AvroGenericCoder from) {
     return Collections.emptyList();
   }
 
   @Override
-  public byte[] getPayload(AvroCoder from) {
+  public byte[] getPayload(AvroGenericCoder from) {
     return from.getSchema().toString().getBytes(Charsets.UTF_8);
   }
 
   @Override
-  public AvroCoder fromComponents(List<Coder<?>> components, byte[] payload) {
+  public AvroGenericCoder fromComponents(List<Coder<?>> components, byte[] payload) {
     Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8));
-    return AvroCoder.of(schema);
+    return AvroGenericCoder.of(schema);
   }
 }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index dc28b79..20b276a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -132,6 +132,11 @@ public class CoderTranslationTest {
               KvCoder.of(
                   new RecordCoder(),
                   AvroCoder.of(SchemaBuilder.record("record").fields().endRecord())))
+          .add(
+              StringUtf8Coder.of(),
+              SerializableCoder.of(Record.class),
+              new RecordCoder(),
+              KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
           .build();
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index b044165..27863ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -122,10 +122,11 @@ public class AvroCoder<T> extends CustomCoder<T> {
   }
 
   /**
-   * Returns an {@code AvroCoder} instance for the Avro schema. The implicit type is GenericRecord.
+   * Returns an {@code AvroGenericCoder} instance for the Avro schema. The implicit type is
+   * GenericRecord.
    */
-  public static AvroCoder<GenericRecord> of(Schema schema) {
-    return new AvroCoder<>(GenericRecord.class, schema);
+  public static AvroGenericCoder of(Schema schema) {
+    return AvroGenericCoder.of(schema);
   }
 
   /**
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
similarity index 50%
rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
index 93fca3d..be726cc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
@@ -15,30 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.core.construction;
+package org.apache.beam.sdk.coders;
 
-import java.util.Collections;
-import java.util.List;
 import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.avro.generic.GenericRecord;
 
-/** Coder translator for AvroCoder. */
-public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> {
-  @Override
-  public List<? extends Coder<?>> getComponents(AvroCoder from) {
-    return Collections.emptyList();
+/** AvroCoder specialisation for GenericRecord. */
+public class AvroGenericCoder extends AvroCoder<GenericRecord> {
+  AvroGenericCoder(Schema schema) {
+    super(GenericRecord.class, schema);
   }
 
-  @Override
-  public byte[] getPayload(AvroCoder from) {
-    return from.getSchema().toString().getBytes(Charsets.UTF_8);
-  }
-
-  @Override
-  public AvroCoder fromComponents(List<Coder<?>> components, byte[] payload) {
-    Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8));
-    return AvroCoder.of(schema);
+  public static AvroGenericCoder of(Schema schema) {
+    return new AvroGenericCoder(schema);
   }
 }
diff --git a/sdks/python/apache_beam/coders/avro_record.py b/sdks/python/apache_beam/coders/avro_record.py
index e65057b..a5b8b60 100644
--- a/sdks/python/apache_beam/coders/avro_record.py
+++ b/sdks/python/apache_beam/coders/avro_record.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-"""AvroRecord for AvroCoder."""
+"""AvroRecord for AvroGenericCoder."""
 
 from __future__ import absolute_import
 
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 35020b6..449e959 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -60,7 +60,7 @@ except ImportError:
 
 __all__ = [
     'Coder',
-    'AvroCoder', 'BooleanCoder', 'BytesCoder', 'DillCoder',
+    'AvroGenericCoder', 'BooleanCoder', 'BytesCoder', 'DillCoder',
     'FastPrimitivesCoder', 'FloatCoder', 'IterableCoder', 'PickleCoder',
     'ProtoCoder', 'SingletonCoder', 'StrUtf8Coder', 'TimestampCoder',
     'TupleCoder', 'TupleSequenceCoder', 'VarIntCoder',
@@ -819,10 +819,10 @@ class DeterministicProtoCoder(ProtoCoder):
     return self
 
 
-AVRO_CODER_URN = "beam:coder:avro:v1"
+AVRO_GENERIC_CODER_URN = "beam:coder:avro:generic:v1"
 
 
-class AvroCoder(FastCoder):
+class AvroGenericCoder(FastCoder):
   """A coder used for AvroRecord values."""
 
   def __init__(self, schema):
@@ -846,11 +846,11 @@ class AvroCoder(FastCoder):
     return AvroRecord
 
   def to_runner_api_parameter(self, context):
-    return AVRO_CODER_URN, self.schema, ()
+    return AVRO_GENERIC_CODER_URN, self.schema, ()
 
-  @Coder.register_urn(AVRO_CODER_URN, bytes)
+  @Coder.register_urn(AVRO_GENERIC_CODER_URN, bytes)
   def from_runner_api_parameter(payload, unused_components, unused_context):
-    return AvroCoder(payload)
+    return AvroGenericCoder(payload)
 
 
 class TupleCoder(FastCoder):
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index 74d1dce..9b39962 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -113,7 +113,7 @@ class DeterministicProtoCoderTest(unittest.TestCase):
       self.assertEqual(coder.encode(mm_forward), coder.encode(mm_reverse))
 
 
-class AvroTestCoder(coders.AvroCoder):
+class AvroTestCoder(coders.AvroGenericCoder):
   SCHEMA = """
   {
     "type": "record", "name": "testrecord",
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 1b40b64..d6cb27b 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -67,7 +67,7 @@ class CodersTest(unittest.TestCase):
                    if isinstance(c, type) and issubclass(c, coders.Coder) and
                    'Base' not in c.__name__)
     standard -= set([coders.Coder,
-                     coders.AvroCoder,
+                     coders.AvroGenericCoder,
                      coders.DeterministicProtoCoder,
                      coders.FastCoder,
                      coders.ProtoCoder,
diff --git a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
index 0c948ca..434bb3b 100644
--- a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
@@ -69,7 +69,7 @@ class XlangParquetIOTest(unittest.TestCase):
         raise e
 
 
-class AvroTestCoder(coders.AvroCoder):
+class AvroTestCoder(coders.AvroGenericCoder):
   SCHEMA = """
   {
     "type": "record", "name": "testrecord",