You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/01 23:45:56 UTC
[1/2] beam git commit: Add CloudObjectTranslators for Avro,
Serializable
Repository: beam
Updated Branches:
refs/heads/master cd813fba0 -> 4867c995f
Add CloudObjectTranslators for Avro, Serializable
This means these coders can be sent to Dataflow with a stable
serialization.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ce97566
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ce97566
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ce97566
Branch: refs/heads/master
Commit: 2ce97566be76dacd9d529bde090e1f66f9ae819b
Parents: cd813fb
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 20:05:09 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 16:45:17 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 5 ++
.../util/AvroCoderCloudObjectTranslator.java | 62 +++++++++++++++++++
...aultCoderCloudObjectTranslatorRegistrar.java | 2 +
.../SerializableCoderCloudObjectTranslator.java | 65 ++++++++++++++++++++
.../runners/dataflow/util/CloudObjectsTest.java | 13 ++--
.../sdk/extensions/protobuf/ProtoCoder.java | 4 ++
6 files changed, 146 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index cb0fa7f..e75abbd 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -289,6 +289,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
<!-- Note: when relocating guava, ensure guava-testlib is not also relocated by
excluding com.google.common.**.testing.* -->
<groupId>com.google.guava</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
new file mode 100644
index 0000000..444849d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.avro.Schema;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Structs;
+
+/** A {@link CloudObjectTranslator} for {@link AvroCoder}; the coder is represented by its schema string and its type's class name. */
+class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder> {
+ private static final String TYPE_FIELD = "type"; // key holding the coder type's class name
+ private static final String SCHEMA_FIELD = "schema"; // key holding the schema's toString() form
+ /** Builds a {@link CloudObject} for {@link AvroCoder} carrying the target's schema string and type name. */
+ @Override
+ public CloudObject toCloudObject(AvroCoder target) {
+ CloudObject base = CloudObject.forClass(AvroCoder.class);
+ Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
+ Structs.addString(base, TYPE_FIELD, target.getType().getName());
+ return base;
+ }
+ /** Rebuilds the coder from the stored type name and schema string; throws IllegalArgumentException if the named class cannot be loaded. */
+ @Override
+ public AvroCoder<?> fromCloudObject(CloudObject cloudObject) {
+ Schema.Parser parser = new Schema.Parser();
+ String className = Structs.getString(cloudObject, TYPE_FIELD);
+ String schemaString = Structs.getString(cloudObject, SCHEMA_FIELD);
+ try {
+ Class<?> type = Class.forName(className); // the class named in the CloudObject must be loadable here
+ Schema schema = parser.parse(schemaString);
+ return AvroCoder.of(type, schema);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e); // original exception is kept as the cause
+ }
+ }
+ /** Returns the coder class this translator handles. */
+ @Override
+ public Class<? extends AvroCoder> getSupportedClass() {
+ return AvroCoder.class;
+ }
+ /** Returns the class name of the {@link CloudObject} created for {@link AvroCoder}, as used in toCloudObject. */
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(AvroCoder.class).getClassName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 3b9fa95..29f047f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -65,6 +65,8 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
CloudObjectTranslators.stream(),
CloudObjectTranslators.pair(),
CloudObjectTranslators.windowedValue(),
+ new AvroCoderCloudObjectTranslator(),
+ new SerializableCoderCloudObjectTranslator(),
CloudObjectTranslators.custom());
@VisibleForTesting
static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS =
http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
new file mode 100644
index 0000000..67c021c
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.Structs;
+
+/** A {@link CloudObjectTranslator} for {@link SerializableCoder}; the coder is represented by the class name of its record type. */
+class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<SerializableCoder> {
+ private static final String TYPE_FIELD = "type"; // key holding the record type's class name
+ /** Builds a {@link CloudObject} for {@link SerializableCoder} carrying the target's record type name. */
+ @Override
+ public CloudObject toCloudObject(SerializableCoder target) {
+ CloudObject base = CloudObject.forClass(SerializableCoder.class);
+ Structs.addString(base, TYPE_FIELD, target.getRecordType().getName());
+ return base;
+ }
+ /** Rebuilds the coder from the stored class name; throws IllegalArgumentException if the class cannot be loaded or is not Serializable. */
+ @Override
+ public SerializableCoder<?> fromCloudObject(CloudObject cloudObject) {
+ String className = Structs.getString(cloudObject, TYPE_FIELD);
+ try {
+ Class<? extends Serializable> targetClass =
+ (Class<? extends Serializable>) Class.forName(className); // unchecked generic cast: it verifies nothing at runtime
+ checkArgument( // this explicit check is what actually enforces that the loaded class is Serializable
+ Serializable.class.isAssignableFrom(targetClass),
+ "Target class %s does not extend %s",
+ targetClass.getName(),
+ Serializable.class.getSimpleName());
+ return SerializableCoder.of(targetClass);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e); // original exception is kept as the cause
+ }
+ }
+ /** Returns the coder class this translator handles. */
+ @Override
+ public Class<SerializableCoder> getSupportedClass() {
+ return SerializableCoder.class;
+ }
+ /** Returns the class name of the {@link CloudObject} created for {@link SerializableCoder}, as used in toCloudObject. */
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(SerializableCoder.class).getClassName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index fdea285..a6a3f25 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -28,8 +28,10 @@ import com.google.common.collect.ImmutableList.Builder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
+import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -37,12 +39,12 @@ import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.Serializer;
import org.apache.beam.sdk.util.WindowedValue;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
@@ -112,7 +114,9 @@ public class CloudObjectsTest {
KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()),
IntervalWindow.getCoder()))
.add(VarLongCoder.of())
- .add(ByteArrayCoder.of());
+ .add(ByteArrayCoder.of())
+ .add(SerializableCoder.of(Record.class))
+ .add(AvroCoder.of(Record.class));
for (Class<? extends Coder> atomicCoder :
DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());
@@ -126,16 +130,15 @@ public class CloudObjectsTest {
@Test
public void toAndFromCloudObject() throws Exception {
CloudObject cloudObject = CloudObjects.asCloudObject(coder);
- Coder<?> reconstructed = Serializer.deserialize(cloudObject, Coder.class);
Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
- assertEquals(coder.getClass(), reconstructed.getClass());
assertEquals(coder.getClass(), fromCloudObject.getClass());
- assertEquals(coder, reconstructed);
assertEquals(coder, fromCloudObject);
}
}
+ private static class Record implements Serializable {}
+
private static class ObjectCoder extends CustomCoder<Object> {
@Override
public void encode(Object value, OutputStream outStream, Context context)
http://git-wip-us.apache.org/repos/asf/beam/blob/2ce97566/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 9ec7aec..8e90a5f 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -229,6 +229,10 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
return protoMessageClass;
}
+ public Set<Class<?>> getExtensionHosts() { // accessor: hands back extensionHostClasses as-is; no copy is made in this method
+ return extensionHostClasses;
+ }
+
/**
* Returns the {@link ExtensionRegistry} listing all known Protocol Buffers extension messages
* to {@code T} registered with this {@link ProtoCoder}.
[2/2] beam git commit: [BEAM-2020] Add CloudObjectTranslators for
Avro, Serializable
Posted by lc...@apache.org.
[BEAM-2020] Add CloudObjectTranslators for Avro, Serializable
This closes #2727
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4867c995
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4867c995
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4867c995
Branch: refs/heads/master
Commit: 4867c995fac2918fed241e2335cf7ea47585ffe4
Parents: cd813fb 2ce9756
Author: Luke Cwik <lc...@google.com>
Authored: Mon May 1 16:45:42 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon May 1 16:45:42 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 5 ++
.../util/AvroCoderCloudObjectTranslator.java | 62 +++++++++++++++++++
...aultCoderCloudObjectTranslatorRegistrar.java | 2 +
.../SerializableCoderCloudObjectTranslator.java | 65 ++++++++++++++++++++
.../runners/dataflow/util/CloudObjectsTest.java | 13 ++--
.../sdk/extensions/protobuf/ProtoCoder.java | 4 ++
6 files changed, 146 insertions(+), 5 deletions(-)
----------------------------------------------------------------------