You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mi...@apache.org on 2019/12/02 19:07:05 UTC
[beam] branch release-2.17.0 updated: [BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder
This is an automated email from the ASF dual-hosted git repository.
mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.17.0 by this push:
new 01925d2 [BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder
new 014c740 Merge pull request #10241 from mxm/release-2.17.0
01925d2 is described below
commit 01925d238144b3c16212fd11c48dcca018eb7309
Author: Piotr Szczepanik <pi...@allegro.pl>
AuthorDate: Thu Nov 28 14:34:49 2019 +0100
[BEAM-8819] Fix AvroCoder serialisation by introducing AvroGenericCoder
Backport of #10218
---
...gistrar.java => AvroGenericCoderRegistrar.java} | 12 +++++-----
...slator.java => AvroGenericCoderTranslator.java} | 14 +++++------
.../core/construction/CoderTranslationTest.java | 5 ++++
.../java/org/apache/beam/sdk/coders/AvroCoder.java | 7 +++---
.../apache/beam/sdk/coders/AvroGenericCoder.java | 28 +++++++---------------
sdks/python/apache_beam/coders/avro_record.py | 2 +-
sdks/python/apache_beam/coders/coders.py | 12 +++++-----
sdks/python/apache_beam/coders/coders_test.py | 2 +-
.../apache_beam/coders/coders_test_common.py | 2 +-
.../io/external/xlang_parquetio_test.py | 2 +-
10 files changed, 40 insertions(+), 46 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java
similarity index 76%
rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java
index 565bdbf..0cfda94 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderRegistrar.java
@@ -19,22 +19,22 @@ package org.apache.beam.runners.core.construction;
import com.google.auto.service.AutoService;
import java.util.Map;
-import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-/** Coder registrar for AvroCoder. */
+/** Coder registrar for AvroGenericCoder. */
@AutoService(CoderTranslatorRegistrar.class)
-public class AvroCoderRegistrar implements CoderTranslatorRegistrar {
- public static final String AVRO_CODER_URN = "beam:coder:avro:v1";
+public class AvroGenericCoderRegistrar implements CoderTranslatorRegistrar {
+ public static final String AVRO_CODER_URN = "beam:coder:avro:generic:v1";
@Override
public Map<Class<? extends Coder>, String> getCoderURNs() {
- return ImmutableMap.of(AvroCoder.class, AVRO_CODER_URN);
+ return ImmutableMap.of(AvroGenericCoder.class, AVRO_CODER_URN);
}
@Override
public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() {
- return ImmutableMap.of(AvroCoder.class, new AvroCoderTranslator());
+ return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator());
}
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java
similarity index 74%
copy from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
copy to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java
index 93fca3d..564a8d3 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroGenericCoderTranslator.java
@@ -20,25 +20,25 @@ package org.apache.beam.runners.core.construction;
import java.util.Collections;
import java.util.List;
import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
-/** Coder translator for AvroCoder. */
-public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> {
+/** Coder translator for AvroGenericCoder. */
+public class AvroGenericCoderTranslator implements CoderTranslator<AvroGenericCoder> {
@Override
- public List<? extends Coder<?>> getComponents(AvroCoder from) {
+ public List<? extends Coder<?>> getComponents(AvroGenericCoder from) {
return Collections.emptyList();
}
@Override
- public byte[] getPayload(AvroCoder from) {
+ public byte[] getPayload(AvroGenericCoder from) {
return from.getSchema().toString().getBytes(Charsets.UTF_8);
}
@Override
- public AvroCoder fromComponents(List<Coder<?>> components, byte[] payload) {
+ public AvroGenericCoder fromComponents(List<Coder<?>> components, byte[] payload) {
Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8));
- return AvroCoder.of(schema);
+ return AvroGenericCoder.of(schema);
}
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index dc28b79..20b276a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -132,6 +132,11 @@ public class CoderTranslationTest {
KvCoder.of(
new RecordCoder(),
AvroCoder.of(SchemaBuilder.record("record").fields().endRecord())))
+ .add(
+ StringUtf8Coder.of(),
+ SerializableCoder.of(Record.class),
+ new RecordCoder(),
+ KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
.build();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index b044165..27863ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -122,10 +122,11 @@ public class AvroCoder<T> extends CustomCoder<T> {
}
/**
- * Returns an {@code AvroCoder} instance for the Avro schema. The implicit type is GenericRecord.
+ * Returns an {@code AvroGenericCoder} instance for the Avro schema. The implicit type is
+ * GenericRecord.
*/
- public static AvroCoder<GenericRecord> of(Schema schema) {
- return new AvroCoder<>(GenericRecord.class, schema);
+ public static AvroGenericCoder of(Schema schema) {
+ return AvroGenericCoder.of(schema);
}
/**
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
similarity index 50%
rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
index 93fca3d..be726cc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
@@ -15,30 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.core.construction;
+package org.apache.beam.sdk.coders;
-import java.util.Collections;
-import java.util.List;
import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.avro.generic.GenericRecord;
-/** Coder translator for AvroCoder. */
-public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> {
- @Override
- public List<? extends Coder<?>> getComponents(AvroCoder from) {
- return Collections.emptyList();
+/** AvroCoder specialisation for GenericRecord. */
+public class AvroGenericCoder extends AvroCoder<GenericRecord> {
+ AvroGenericCoder(Schema schema) {
+ super(GenericRecord.class, schema);
}
- @Override
- public byte[] getPayload(AvroCoder from) {
- return from.getSchema().toString().getBytes(Charsets.UTF_8);
- }
-
- @Override
- public AvroCoder fromComponents(List<Coder<?>> components, byte[] payload) {
- Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8));
- return AvroCoder.of(schema);
+ public static AvroGenericCoder of(Schema schema) {
+ return new AvroGenericCoder(schema);
}
}
diff --git a/sdks/python/apache_beam/coders/avro_record.py b/sdks/python/apache_beam/coders/avro_record.py
index e65057b..a5b8b60 100644
--- a/sdks/python/apache_beam/coders/avro_record.py
+++ b/sdks/python/apache_beam/coders/avro_record.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-"""AvroRecord for AvroCoder."""
+"""AvroRecord for AvroGenericCoder."""
from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 35020b6..449e959 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -60,7 +60,7 @@ except ImportError:
__all__ = [
'Coder',
- 'AvroCoder', 'BooleanCoder', 'BytesCoder', 'DillCoder',
+ 'AvroGenericCoder', 'BooleanCoder', 'BytesCoder', 'DillCoder',
'FastPrimitivesCoder', 'FloatCoder', 'IterableCoder', 'PickleCoder',
'ProtoCoder', 'SingletonCoder', 'StrUtf8Coder', 'TimestampCoder',
'TupleCoder', 'TupleSequenceCoder', 'VarIntCoder',
@@ -819,10 +819,10 @@ class DeterministicProtoCoder(ProtoCoder):
return self
-AVRO_CODER_URN = "beam:coder:avro:v1"
+AVRO_GENERIC_CODER_URN = "beam:coder:avro:generic:v1"
-class AvroCoder(FastCoder):
+class AvroGenericCoder(FastCoder):
"""A coder used for AvroRecord values."""
def __init__(self, schema):
@@ -846,11 +846,11 @@ class AvroCoder(FastCoder):
return AvroRecord
def to_runner_api_parameter(self, context):
- return AVRO_CODER_URN, self.schema, ()
+ return AVRO_GENERIC_CODER_URN, self.schema, ()
- @Coder.register_urn(AVRO_CODER_URN, bytes)
+ @Coder.register_urn(AVRO_GENERIC_CODER_URN, bytes)
def from_runner_api_parameter(payload, unused_components, unused_context):
- return AvroCoder(payload)
+ return AvroGenericCoder(payload)
class TupleCoder(FastCoder):
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index 74d1dce..9b39962 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -113,7 +113,7 @@ class DeterministicProtoCoderTest(unittest.TestCase):
self.assertEqual(coder.encode(mm_forward), coder.encode(mm_reverse))
-class AvroTestCoder(coders.AvroCoder):
+class AvroTestCoder(coders.AvroGenericCoder):
SCHEMA = """
{
"type": "record", "name": "testrecord",
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 1b40b64..d6cb27b 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -67,7 +67,7 @@ class CodersTest(unittest.TestCase):
if isinstance(c, type) and issubclass(c, coders.Coder) and
'Base' not in c.__name__)
standard -= set([coders.Coder,
- coders.AvroCoder,
+ coders.AvroGenericCoder,
coders.DeterministicProtoCoder,
coders.FastCoder,
coders.ProtoCoder,
diff --git a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
index 0c948ca..434bb3b 100644
--- a/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_parquetio_test.py
@@ -69,7 +69,7 @@ class XlangParquetIOTest(unittest.TestCase):
raise e
-class AvroTestCoder(coders.AvroCoder):
+class AvroTestCoder(coders.AvroGenericCoder):
SCHEMA = """
{
"type": "record", "name": "testrecord",