You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/01 21:16:41 UTC
[1/2] beam git commit: This closes #2679
Repository: beam
Updated Branches:
refs/heads/master 3fff52d43 -> 3ae944130
This closes #2679
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ae94413
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ae94413
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ae94413
Branch: refs/heads/master
Commit: 3ae94413000299908af06c712d9238023b099526
Parents: 3fff52d 21d7458
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 14:16:28 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 14:16:28 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/util/CloudObjectKinds.java | 33 ++
.../dataflow/util/CloudObjectTranslator.java | 13 +-
.../dataflow/util/CloudObjectTranslators.java | 375 +++++++++++++++++++
.../runners/dataflow/util/CloudObjects.java | 28 +-
...aultCoderCloudObjectTranslatorRegistrar.java | 83 +++-
.../runners/dataflow/util/CloudObjectsTest.java | 119 ++++--
.../beam/sdk/coders/LengthPrefixCoder.java | 11 +-
7 files changed, 614 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Add Cloud Object Translators for Coders
Posted by tg...@apache.org.
Add Cloud Object Translators for Coders
This consists of translators for most of the coders that were formerly AtomicCoders,
plus some coders with a known structure, and a generic "CustomCoder" translator.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21d7458d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21d7458d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21d7458d
Branch: refs/heads/master
Commit: 21d7458d005c12242c6e63e26dd4fd3a2d495165
Parents: 3fff52d
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 18:02:30 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 14:16:28 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/util/CloudObjectKinds.java | 33 ++
.../dataflow/util/CloudObjectTranslator.java | 13 +-
.../dataflow/util/CloudObjectTranslators.java | 375 +++++++++++++++++++
.../runners/dataflow/util/CloudObjects.java | 28 +-
...aultCoderCloudObjectTranslatorRegistrar.java | 83 +++-
.../runners/dataflow/util/CloudObjectsTest.java | 119 ++++--
.../beam/sdk/coders/LengthPrefixCoder.java | 11 +-
7 files changed, 614 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
new file mode 100644
index 0000000..1499f17
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.beam.sdk.util.CloudObject;
+
+/**
+ * Known kinds of {@link CloudObject}.
+ */
+class CloudObjectKinds {
+ static final String KIND_GLOBAL_WINDOW = "kind:global_window";
+ static final String KIND_INTERVAL_WINDOW = "kind:interval_window";
+ static final String KIND_LENGTH_PREFIX = "kind:length_prefix";
+ static final String KIND_PAIR = "kind:pair";
+ static final String KIND_STREAM = "kind:stream";
+ static final String KIND_WINDOWED_VALUE = "kind:windowed_value";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index d0b111f..534370f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.dataflow.util;
import org.apache.beam.sdk.util.CloudObject;
/**
- * An translator that takes an object and creates a {@link CloudObject} which can be converted back
+ * A translator that takes an object and creates a {@link CloudObject} which can be converted back
* to the original object.
*/
public interface CloudObjectTranslator<T> {
@@ -34,4 +34,15 @@ public interface CloudObjectTranslator<T> {
* Converts back into the original object from a provided {@link CloudObject}.
*/
T fromCloudObject(CloudObject cloudObject);
+
+ /**
+ * Gets the class this {@link CloudObjectTranslator} is capable of converting.
+ */
+ Class<? extends T> getSupportedClass();
+
+ /**
+ * Gets the class name that will represent the {@link CloudObject} created by this {@link
+ * CloudObjectTranslator}.
+ */
+ String cloudObjectClassName();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
new file mode 100644
index 0000000..7a95a9e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.StringUtils;
+import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/** Utilities for creating {@link CloudObjectTranslator} instances for {@link Coder Coders}. */
+class CloudObjectTranslators {
+ private CloudObjectTranslators() {}
+
+ private static CloudObject addComponents(CloudObject base, List<? extends Coder<?>> components) {
+ if (!components.isEmpty()) {
+ List<CloudObject> cloudComponents = new ArrayList<>(components.size());
+ for (Coder component : components) {
+ cloudComponents.add(CloudObjects.asCloudObject(component));
+ }
+ Structs.addList(base, PropertyNames.COMPONENT_ENCODINGS, cloudComponents);
+ }
+
+ return base;
+ }
+
+ private static List<Coder<?>> getComponents(CloudObject target) {
+ List<Map<String, Object>> cloudComponents =
+ Structs.getListOfMaps(
+ target,
+ PropertyNames.COMPONENT_ENCODINGS,
+ Collections.<Map<String, Object>>emptyList());
+ List<Coder<?>> components = new ArrayList<>();
+ for (Map<String, Object> cloudComponent : cloudComponents) {
+ components.add(CloudObjects.coderFromCloudObject(CloudObject.fromSpec(cloudComponent)));
+ }
+ return components;
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+ * "pair".
+ */
+ public static CloudObjectTranslator<KvCoder> pair() {
+ return new CloudObjectTranslator<KvCoder>() {
+ @Override
+ public CloudObject toCloudObject(KvCoder target) {
+ CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_PAIR);
+ Structs.addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
+ return addComponents(
+ result, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+ }
+
+ @Override
+ public KvCoder fromCloudObject(CloudObject object) {
+ return KvCoder.of(getComponents(object));
+ }
+
+ @Override
+ public Class<KvCoder> getSupportedClass() {
+ return KvCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObjectKinds.KIND_PAIR;
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+ * "stream".
+ */
+ public static CloudObjectTranslator<IterableCoder> stream() {
+ return new CloudObjectTranslator<IterableCoder>() {
+ @Override
+ public CloudObject toCloudObject(IterableCoder target) {
+ CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_STREAM);
+ Structs.addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
+ return addComponents(
+ result, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+ }
+
+ @Override
+ public IterableCoder fromCloudObject(CloudObject object) {
+ return IterableCoder.of(getComponents(object));
+ }
+
+ @Override
+ public Class<? extends IterableCoder> getSupportedClass() {
+ return IterableCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObjectKinds.KIND_STREAM;
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+ * "length_prefix".
+ */
+ static CloudObjectTranslator<LengthPrefixCoder> lengthPrefix() {
+ return new CloudObjectTranslator<LengthPrefixCoder>() {
+ @Override
+ public CloudObject toCloudObject(LengthPrefixCoder target) {
+ return addComponents(
+ CloudObject.forClassName(CloudObjectKinds.KIND_LENGTH_PREFIX),
+ Collections.<Coder<?>>singletonList(target.getValueCoder()));
+ }
+
+ @Override
+ public LengthPrefixCoder fromCloudObject(CloudObject object) {
+ return LengthPrefixCoder.of(getComponents(object));
+ }
+
+ @Override
+ public Class<? extends LengthPrefixCoder> getSupportedClass() {
+ return LengthPrefixCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObjectKinds.KIND_LENGTH_PREFIX;
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+ * "global_window".
+ */
+ static CloudObjectTranslator<GlobalWindow.Coder> globalWindow() {
+ return new CloudObjectTranslator<GlobalWindow.Coder>() {
+ @Override
+ public CloudObject toCloudObject(GlobalWindow.Coder target) {
+ return addComponents(
+ CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW),
+ Collections.<Coder<?>>emptyList());
+ }
+
+ @Override
+ public GlobalWindow.Coder fromCloudObject(CloudObject object) {
+ return GlobalWindow.Coder.INSTANCE;
+ }
+
+ @Override
+ public Class<? extends GlobalWindow.Coder> getSupportedClass() {
+ return GlobalWindow.Coder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObjectKinds.KIND_GLOBAL_WINDOW;
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+ * "interval_window".
+ */
+ static CloudObjectTranslator<IntervalWindowCoder> intervalWindow() {
+ return new CloudObjectTranslator<IntervalWindowCoder>() {
+ @Override
+ public CloudObject toCloudObject(IntervalWindowCoder target) {
+ return addComponents(
+ CloudObject.forClassName(CloudObjectKinds.KIND_INTERVAL_WINDOW),
+ Collections.<Coder<?>>emptyList());
+ }
+
+ @Override
+ public IntervalWindowCoder fromCloudObject(CloudObject object) {
+ return IntervalWindowCoder.of();
+ }
+
+ @Override
+ public Class<? extends IntervalWindowCoder> getSupportedClass() {
+ return IntervalWindowCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObjectKinds.KIND_INTERVAL_WINDOW;
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+ * "windowed_value".
+ */
+ static CloudObjectTranslator<FullWindowedValueCoder> windowedValue() {
+ return new CloudObjectTranslator<FullWindowedValueCoder>() {
+ @Override
+ public CloudObject toCloudObject(FullWindowedValueCoder target) {
+ CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_WINDOWED_VALUE);
+ Structs.addBoolean(result, PropertyNames.IS_WRAPPER, true);
+ return addComponents(
+ result, ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder()));
+ }
+
+ @Override
+ public FullWindowedValueCoder fromCloudObject(CloudObject object) {
+ return FullWindowedValueCoder.of(getComponents(object));
+ }
+
+ @Override
+ public Class<? extends FullWindowedValueCoder> getSupportedClass() {
+ return FullWindowedValueCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObjectKinds.KIND_WINDOWED_VALUE;
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} for {@link ByteArrayCoder} that produces a {@link
+ * CloudObject} named after the coder class rather than a {@code kind:bytes} constant.
+ */
+ static CloudObjectTranslator<ByteArrayCoder> bytes() {
+ return new CloudObjectTranslator<ByteArrayCoder>() {
+ @Override
+ public CloudObject toCloudObject(ByteArrayCoder target) {
+ return addComponents(
+ CloudObject.forClass(target.getClass()), Collections.<Coder<?>>emptyList());
+ }
+
+ @Override
+ public ByteArrayCoder fromCloudObject(CloudObject object) {
+ return ByteArrayCoder.of();
+ }
+
+ @Override
+ public Class<? extends ByteArrayCoder> getSupportedClass() {
+ return ByteArrayCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(ByteArrayCoder.class).getClassName();
+ }
+
+ };
+ }
+
+ /**
+ * Returns a {@link CloudObjectTranslator} for {@link VarLongCoder} that produces a {@link
+ * CloudObject} named after the coder class rather than a {@code kind:varint} constant.
+ */
+ static CloudObjectTranslator<VarLongCoder> varInt() {
+ return new CloudObjectTranslator<VarLongCoder>() {
+ @Override
+ public CloudObject toCloudObject(VarLongCoder target) {
+ return addComponents(
+ CloudObject.forClass(target.getClass()), Collections.<Coder<?>>emptyList());
+ }
+
+ @Override
+ public VarLongCoder fromCloudObject(CloudObject object) {
+ return VarLongCoder.of();
+ }
+
+ @Override
+ public Class<? extends VarLongCoder> getSupportedClass() {
+ return VarLongCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(VarLongCoder.class).getClassName();
+ }
+ };
+ }
+
+ private static final String CODER_FIELD = "serialized_coder";
+ private static final String TYPE_FIELD = "type";
+ public static CloudObjectTranslator<? extends CustomCoder> custom() {
+ return new CloudObjectTranslator<CustomCoder>() {
+ @Override
+ public CloudObject toCloudObject(CustomCoder target) {
+ CloudObject cloudObject = CloudObject.forClass(CustomCoder.class);
+ Structs.addString(cloudObject, TYPE_FIELD, target.getClass().getName());
+ Structs.addString(
+ cloudObject,
+ CODER_FIELD,
+ StringUtils.byteArrayToJsonString(SerializableUtils.serializeToByteArray(target)));
+ return cloudObject;
+ }
+
+ @Override
+ public CustomCoder fromCloudObject(CloudObject cloudObject) {
+ String serializedCoder = Structs.getString(cloudObject, CODER_FIELD);
+ String type = Structs.getString(cloudObject, TYPE_FIELD);
+ return (CustomCoder<?>)
+ SerializableUtils.deserializeFromByteArray(
+ StringUtils.jsonStringToByteArray(serializedCoder), type);
+ }
+
+ @Override
+ public Class<? extends CustomCoder> getSupportedClass() {
+ return CustomCoder.class;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(CustomCoder.class).getClassName();
+ }
+ };
+ }
+
+ public static <T extends Coder> CloudObjectTranslator<T> atomic(final Class<T> coderClass) {
+ // Make sure that the instance will be instantiable from the class.
+ InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
+ return new CloudObjectTranslator<T>() {
+ @Override
+ public CloudObject toCloudObject(T target) {
+ return CloudObject.forClass(coderClass);
+ }
+
+ @Override
+ public T fromCloudObject(CloudObject cloudObject) {
+ return InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
+ }
+
+ @Override
+ public Class<? extends T> getSupportedClass() {
+ return coderClass;
+ }
+
+ @Override
+ public String cloudObjectClassName() {
+ return CloudObject.forClass(coderClass).getClassName();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index bc0cc75..a55d10c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -19,6 +19,7 @@
package org.apache.beam.runners.dataflow.util;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
@@ -26,9 +27,6 @@ import java.util.ServiceLoader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.StringUtils;
-import org.apache.beam.sdk.util.Structs;
/** Utilities for converting an object to a {@link CloudObject}. */
public class CloudObjects {
@@ -43,8 +41,8 @@ public class CloudObjects {
populateCoderTranslators() {
ImmutableMap.Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
ImmutableMap.builder();
- for (CoderCloudObjectTranslatorRegistrar coderRegistrar :
- ServiceLoader.load(CoderCloudObjectTranslatorRegistrar.class)) {
+ for (CoderCloudObjectTranslatorRegistrar coderRegistrar : ServiceLoader.load(
+ CoderCloudObjectTranslatorRegistrar.class)) {
builder.putAll(coderRegistrar.classesToTranslators());
}
return builder.build();
@@ -70,24 +68,20 @@ public class CloudObjects {
if (translator != null) {
return translator.toCloudObject(coder);
} else if (coder instanceof CustomCoder) {
- return customCoderAsCloudObject((CustomCoder<?>) coder);
+ CloudObjectTranslator customCoderTranslator = CODER_TRANSLATORS.get(CustomCoder.class);
+ checkNotNull(
+ customCoderTranslator,
+ "No %s registered for %s, but it is in the %s",
+ CloudObjectTranslator.class.getSimpleName(),
+ CustomCoder.class.getSimpleName(),
+ DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
+ return customCoderTranslator.toCloudObject(coder);
}
throw new IllegalArgumentException(
String.format(
"Non-Custom %s with no registered %s", Coder.class, CloudObjectTranslator.class));
}
- private static CloudObject customCoderAsCloudObject(CustomCoder<?> coder) {
- CloudObject result = CloudObject.forClass(CustomCoder.class);
- Structs.addString(result, "type", coder.getClass().getName());
- Structs.addString(
- result,
- "serialized_coder",
- StringUtils.byteArrayToJsonString(SerializableUtils.serializeToByteArray(coder)));
-
- return result;
- }
-
public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
CloudObjectTranslator<? extends Coder> translator =
CLOUD_OBJECT_CLASS_NAME_TRANSLATORS.get(cloudObject.getClassName());
http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 3d7b534..3b9fa95 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -19,9 +19,34 @@
package org.apache.beam.runners.dataflow.util;
import com.google.auto.service.AutoService;
-import java.util.Collections;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.FooterCoder;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmShardCoder;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefixCoder;
+import org.apache.beam.runners.dataflow.util.RandomAccessData.RandomAccessDataCoder;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.BigIntegerCoder;
+import org.apache.beam.sdk.coders.BitSetCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.DurationCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TextualIntegerCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
/**
* The {@link CoderCloudObjectTranslatorRegistrar} containing the default collection of
@@ -30,15 +55,63 @@ import org.apache.beam.sdk.coders.Coder;
@AutoService(CoderCloudObjectTranslatorRegistrar.class)
public class DefaultCoderCloudObjectTranslatorRegistrar
implements CoderCloudObjectTranslatorRegistrar {
+ private static final List<CloudObjectTranslator<? extends Coder>> DEFAULT_TRANSLATORS =
+ ImmutableList.<CloudObjectTranslator<? extends Coder>>of(
+ CloudObjectTranslators.globalWindow(),
+ CloudObjectTranslators.intervalWindow(),
+ CloudObjectTranslators.bytes(),
+ CloudObjectTranslators.varInt(),
+ CloudObjectTranslators.lengthPrefix(),
+ CloudObjectTranslators.stream(),
+ CloudObjectTranslators.pair(),
+ CloudObjectTranslators.windowedValue(),
+ CloudObjectTranslators.custom());
+ @VisibleForTesting
+ static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS =
+ ImmutableSet.<Class<? extends Coder>>of(
+ BigDecimalCoder.class,
+ BigEndianIntegerCoder.class,
+ BigEndianLongCoder.class,
+ BigIntegerCoder.class,
+ BitSetCoder.class,
+ ByteCoder.class,
+ DoubleCoder.class,
+ DurationCoder.class,
+ FileResultCoder.class,
+ FooterCoder.class,
+ InstantCoder.class,
+ IsmShardCoder.class,
+ KeyPrefixCoder.class,
+ RandomAccessDataCoder.class,
+ StringUtf8Coder.class,
+ TableDestinationCoder.class,
+ TableRowJsonCoder.class,
+ TextualIntegerCoder.class,
+ VarIntCoder.class,
+ VoidCoder.class);
+
+ @Override
public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() {
- // TODO: Add translators
- return Collections.emptyMap();
+ ImmutableMap.Builder<String, CloudObjectTranslator<? extends Coder>> nameToTranslators =
+ ImmutableMap.builder();
+ for (CloudObjectTranslator<? extends Coder> translator : classesToTranslators().values()) {
+ nameToTranslators.put(translator.cloudObjectClassName(), translator);
+ }
+ return nameToTranslators.build();
}
@Override
public Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
classesToTranslators() {
- // TODO: Add translato
- return Collections.emptyMap();
+ Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
+ ImmutableMap.<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>builder();
+ for (CloudObjectTranslator<? extends Coder> defaultTranslator : DEFAULT_TRANSLATORS) {
+ builder.put(defaultTranslator.getSupportedClass(), defaultTranslator);
+ }
+ for (Class<? extends Coder> atomicCoder : KNOWN_ATOMIC_CODERS) {
+ builder.put(atomicCoder, CloudObjectTranslators.atomic(atomicCoder));
+ }
+ return builder
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 7562322..fdea285 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -18,19 +18,36 @@
package org.apache.beam.runners.dataflow.util;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.WindowedValue;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@@ -38,37 +55,97 @@ import org.junit.runners.Parameterized.Parameters;
/**
* Tests for {@link CloudObjects}.
*/
-@RunWith(Parameterized.class)
+@RunWith(Enclosed.class)
public class CloudObjectsTest {
- @Parameters(name = "{index}: {0}")
- public static Iterable<Coder<?>> data() {
- // TODO: Implement when translators are registered with the CoderCloudObjectTranslatorRegsitrar
- return ImmutableList.<Coder<?>>builder().build();
+ /**
+ * Tests that all of the Default Coders are tested.
+ */
+ @RunWith(JUnit4.class)
+ public static class DefaultsPresentTest {
+ @Test
+ public void defaultCodersAllTested() {
+ Set<Class<? extends Coder>> defaultCoderTranslators =
+ new DefaultCoderCloudObjectTranslatorRegistrar().classesToTranslators().keySet();
+ Set<Class<? extends Coder>> testedClasses = new HashSet<>();
+ for (Coder<?> tested : DefaultCoders.data()) {
+ if (tested instanceof ObjectCoder) {
+ testedClasses.add(CustomCoder.class);
+ assertThat(defaultCoderTranslators, hasItem(CustomCoder.class));
+ } else {
+ testedClasses.add(tested.getClass());
+ assertThat(defaultCoderTranslators, hasItem(tested.getClass()));
+ }
+ }
+ Set<Class<? extends Coder>> missing = new HashSet<>();
+ missing.addAll(defaultCoderTranslators);
+ missing.removeAll(testedClasses);
+ assertThat(missing, emptyIterable());
+ }
+
+ @Test
+ public void defaultCodersIncludesCustomCoder() {
+ Set<Class<? extends Coder>> defaultCoders =
+ new DefaultCoderCloudObjectTranslatorRegistrar().classesToTranslators().keySet();
+ assertThat(defaultCoders, hasItem(CustomCoder.class));
+ }
}
- @Parameter(0)
- public Coder<?> coder;
- @Test
- public void toAndFromCloudObject() throws Exception {
- CloudObject cloudObject = CloudObjects.asCloudObject(coder);
- Coder<?> reconstructed = CloudObjects.coderFromCloudObject(cloudObject);
+ /**
+ * Tests that all of the registered coders in {@link DefaultCoderCloudObjectTranslatorRegistrar}
+ * can be serialized and deserialized with {@link CloudObjects}.
+ */
+ @RunWith(Parameterized.class)
+ public static class DefaultCoders {
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<Coder<?>> data() {
+ Builder<Coder<?>> dataBuilder =
+ ImmutableList.<Coder<?>>builder()
+ .add(new ObjectCoder())
+ .add(GlobalWindow.Coder.INSTANCE)
+ .add(IntervalWindow.getCoder())
+ .add(LengthPrefixCoder.of(VarLongCoder.of()))
+ .add(IterableCoder.of(VarLongCoder.of()))
+ .add(KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))
+ .add(
+ WindowedValue.getFullCoder(
+ KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()),
+ IntervalWindow.getCoder()))
+ .add(VarLongCoder.of())
+ .add(ByteArrayCoder.of());
+ for (Class<? extends Coder> atomicCoder :
+ DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
+ dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());
+ }
+ return dataBuilder
+ .build();
+ }
- assertEquals(coder.getClass(), reconstructed.getClass());
- assertEquals(coder, reconstructed);
- }
+ @Parameter(0) public Coder<?> coder;
- static class Record implements Serializable {}
+ @Test
+ public void toAndFromCloudObject() throws Exception {
+ CloudObject cloudObject = CloudObjects.asCloudObject(coder);
+ Coder<?> reconstructed = Serializer.deserialize(cloudObject, Coder.class);
+ Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
- private static class RecordCoder extends CustomCoder<Record> {
+ assertEquals(coder.getClass(), reconstructed.getClass());
+ assertEquals(coder.getClass(), fromCloudObject.getClass());
+ assertEquals(coder, reconstructed);
+ assertEquals(coder, fromCloudObject);
+ }
+ }
+
+ private static class ObjectCoder extends CustomCoder<Object> {
@Override
- public void encode(Record value, OutputStream outStream, Context context)
- throws CoderException, IOException {}
+ public void encode(Object value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ }
@Override
- public Record decode(InputStream inStream, Context context)
+ public Object decode(InputStream inStream, Context context)
throws CoderException, IOException {
- return new Record();
+ return new Object();
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 6a1f8ed..b73fb7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -86,16 +86,19 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
}
- public Coder<?> getValueCoder() {
- return valueCoder;
- }
-
@Override
public List<? extends Coder<?>> getCoderArguments() {
return ImmutableList.of(valueCoder);
}
/**
+ * Gets the value coder that will be prefixed by the length.
+ */
+ public Coder<T> getValueCoder() {
+ return valueCoder;
+ }
+
+ /**
* {@code LengthPrefixCoder} is deterministic if the nested {@code Coder} is.
*
* {@inheritDoc}