You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/01 21:16:41 UTC

[1/2] beam git commit: This closes #2679

Repository: beam
Updated Branches:
  refs/heads/master 3fff52d43 -> 3ae944130


This closes #2679


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3ae94413
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3ae94413
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3ae94413

Branch: refs/heads/master
Commit: 3ae94413000299908af06c712d9238023b099526
Parents: 3fff52d 21d7458
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 14:16:28 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 14:16:28 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/util/CloudObjectKinds.java |  33 ++
 .../dataflow/util/CloudObjectTranslator.java    |  13 +-
 .../dataflow/util/CloudObjectTranslators.java   | 375 +++++++++++++++++++
 .../runners/dataflow/util/CloudObjects.java     |  28 +-
 ...aultCoderCloudObjectTranslatorRegistrar.java |  83 +++-
 .../runners/dataflow/util/CloudObjectsTest.java | 119 ++++--
 .../beam/sdk/coders/LengthPrefixCoder.java      |  11 +-
 7 files changed, 614 insertions(+), 48 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Add Cloud Object Translators for Coders

Posted by tg...@apache.org.
Add Cloud Object Translators for Coders

This covers most of the coders that were formerly AtomicCoders, plus some
coders with a known structure and a generic "CustomCoder" translator.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21d7458d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21d7458d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21d7458d

Branch: refs/heads/master
Commit: 21d7458d005c12242c6e63e26dd4fd3a2d495165
Parents: 3fff52d
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 18:02:30 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 14:16:28 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/util/CloudObjectKinds.java |  33 ++
 .../dataflow/util/CloudObjectTranslator.java    |  13 +-
 .../dataflow/util/CloudObjectTranslators.java   | 375 +++++++++++++++++++
 .../runners/dataflow/util/CloudObjects.java     |  28 +-
 ...aultCoderCloudObjectTranslatorRegistrar.java |  83 +++-
 .../runners/dataflow/util/CloudObjectsTest.java | 119 ++++--
 .../beam/sdk/coders/LengthPrefixCoder.java      |  11 +-
 7 files changed, 614 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
new file mode 100644
index 0000000..1499f17
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectKinds.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import org.apache.beam.sdk.util.CloudObject;
+
+/**
+ * Known kinds of {@link CloudObject}.
+ */
+class CloudObjectKinds {
+  static final String KIND_GLOBAL_WINDOW = "kind:global_window";
+  static final String KIND_INTERVAL_WINDOW = "kind:interval_window";
+  static final String KIND_LENGTH_PREFIX = "kind:length_prefix";
+  static final String KIND_PAIR = "kind:pair";
+  static final String KIND_STREAM = "kind:stream";
+  static final String KIND_WINDOWED_VALUE = "kind:windowed_value";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index d0b111f..534370f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -21,7 +21,7 @@ package org.apache.beam.runners.dataflow.util;
 import org.apache.beam.sdk.util.CloudObject;
 
 /**
- * An translator that takes an object and creates a {@link CloudObject} which can be converted back
+ * A translator that takes an object and creates a {@link CloudObject} which can be converted back
  * to the original object.
  */
 public interface CloudObjectTranslator<T> {
@@ -34,4 +34,15 @@ public interface CloudObjectTranslator<T> {
    * Converts back into the original object from a provided {@link CloudObject}.
    */
   T fromCloudObject(CloudObject cloudObject);
+
+  /**
+   * Gets the class this {@link CloudObjectTranslator} is capable of converting.
+   */
+  Class<? extends T> getSupportedClass();
+
+  /**
+   * Gets the class name that will represent the {@link CloudObject} created by this {@link
+   * CloudObjectTranslator}.
+   */
+  String cloudObjectClassName();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
new file mode 100644
index 0000000..7a95a9e
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow.util;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.StringUtils;
+import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/** Utilities for creating {@link CloudObjectTranslator} instances for {@link Coder Coders}. */
+class CloudObjectTranslators {
+  private CloudObjectTranslators() {}
+
+  private static CloudObject addComponents(CloudObject base, List<? extends Coder<?>> components) {
+    if (!components.isEmpty()) {
+      List<CloudObject> cloudComponents = new ArrayList<>(components.size());
+      for (Coder component : components) {
+        cloudComponents.add(CloudObjects.asCloudObject(component));
+      }
+      Structs.addList(base, PropertyNames.COMPONENT_ENCODINGS, cloudComponents);
+    }
+
+    return base;
+  }
+
+  private static List<Coder<?>> getComponents(CloudObject target) {
+    List<Map<String, Object>> cloudComponents =
+        Structs.getListOfMaps(
+            target,
+            PropertyNames.COMPONENT_ENCODINGS,
+            Collections.<Map<String, Object>>emptyList());
+    List<Coder<?>> components = new ArrayList<>();
+    for (Map<String, Object> cloudComponent : cloudComponents) {
+      components.add(CloudObjects.coderFromCloudObject(CloudObject.fromSpec(cloudComponent)));
+    }
+    return components;
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+   * "pair".
+   */
+  public static CloudObjectTranslator<KvCoder> pair() {
+    return new CloudObjectTranslator<KvCoder>() {
+      @Override
+      public CloudObject toCloudObject(KvCoder target) {
+        CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_PAIR);
+        Structs.addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
+        return addComponents(
+            result, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+      }
+
+      @Override
+      public KvCoder fromCloudObject(CloudObject object) {
+        return KvCoder.of(getComponents(object));
+      }
+
+      @Override
+      public Class<KvCoder> getSupportedClass() {
+        return KvCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObjectKinds.KIND_PAIR;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+   * "stream".
+   */
+  public static CloudObjectTranslator<IterableCoder> stream() {
+    return new CloudObjectTranslator<IterableCoder>() {
+      @Override
+      public CloudObject toCloudObject(IterableCoder target) {
+        CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_STREAM);
+        Structs.addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
+        return addComponents(
+            result, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+      }
+
+      @Override
+      public IterableCoder fromCloudObject(CloudObject object) {
+        return IterableCoder.of(getComponents(object));
+      }
+
+      @Override
+      public Class<? extends IterableCoder> getSupportedClass() {
+        return IterableCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObjectKinds.KIND_STREAM;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+   * "length_prefix".
+   */
+  static CloudObjectTranslator<LengthPrefixCoder> lengthPrefix() {
+    return new CloudObjectTranslator<LengthPrefixCoder>() {
+      @Override
+      public CloudObject toCloudObject(LengthPrefixCoder target) {
+        return addComponents(
+            CloudObject.forClassName(CloudObjectKinds.KIND_LENGTH_PREFIX),
+            Collections.<Coder<?>>singletonList(target.getValueCoder()));
+      }
+
+      @Override
+      public LengthPrefixCoder fromCloudObject(CloudObject object) {
+        return LengthPrefixCoder.of(getComponents(object));
+      }
+
+      @Override
+      public Class<? extends LengthPrefixCoder> getSupportedClass() {
+        return LengthPrefixCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObjectKinds.KIND_LENGTH_PREFIX;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+   * "global_window".
+   */
+  static CloudObjectTranslator<GlobalWindow.Coder> globalWindow() {
+    return new CloudObjectTranslator<GlobalWindow.Coder>() {
+      @Override
+      public CloudObject toCloudObject(GlobalWindow.Coder target) {
+        return addComponents(
+            CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW),
+            Collections.<Coder<?>>emptyList());
+      }
+
+      @Override
+      public GlobalWindow.Coder fromCloudObject(CloudObject object) {
+        return GlobalWindow.Coder.INSTANCE;
+      }
+
+      @Override
+      public Class<? extends GlobalWindow.Coder> getSupportedClass() {
+        return GlobalWindow.Coder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObjectKinds.KIND_GLOBAL_WINDOW;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+   * "interval_window".
+   */
+  static CloudObjectTranslator<IntervalWindowCoder> intervalWindow() {
+    return new CloudObjectTranslator<IntervalWindowCoder>() {
+      @Override
+      public CloudObject toCloudObject(IntervalWindowCoder target) {
+        return addComponents(
+            CloudObject.forClassName(CloudObjectKinds.KIND_INTERVAL_WINDOW),
+            Collections.<Coder<?>>emptyList());
+      }
+
+      @Override
+      public IntervalWindowCoder fromCloudObject(CloudObject object) {
+        return IntervalWindowCoder.of();
+      }
+
+      @Override
+      public Class<? extends IntervalWindowCoder> getSupportedClass() {
+        return IntervalWindowCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObjectKinds.KIND_INTERVAL_WINDOW;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} that produces a {@link CloudObject} that is of kind
+   * "windowed_value".
+   */
+  static CloudObjectTranslator<FullWindowedValueCoder> windowedValue() {
+    return new CloudObjectTranslator<FullWindowedValueCoder>() {
+      @Override
+      public CloudObject toCloudObject(FullWindowedValueCoder target) {
+        CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_WINDOWED_VALUE);
+        Structs.addBoolean(result, PropertyNames.IS_WRAPPER, true);
+        return addComponents(
+            result, ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder()));
+      }
+
+      @Override
+      public FullWindowedValueCoder fromCloudObject(CloudObject object) {
+        return FullWindowedValueCoder.of(getComponents(object));
+      }
+
+      @Override
+      public Class<? extends FullWindowedValueCoder> getSupportedClass() {
+        return FullWindowedValueCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObjectKinds.KIND_WINDOWED_VALUE;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} for {@link ByteArrayCoder} that produces a {@link
+   * CloudObject} created with {@link CloudObject#forClass} rather than a well-known kind.
+   */
+  static CloudObjectTranslator<ByteArrayCoder> bytes() {
+    return new CloudObjectTranslator<ByteArrayCoder>() {
+      @Override
+      public CloudObject toCloudObject(ByteArrayCoder target) {
+        return addComponents(
+            CloudObject.forClass(target.getClass()), Collections.<Coder<?>>emptyList());
+      }
+
+      @Override
+      public ByteArrayCoder fromCloudObject(CloudObject object) {
+        return ByteArrayCoder.of();
+      }
+
+      @Override
+      public Class<? extends ByteArrayCoder> getSupportedClass() {
+        return ByteArrayCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(ByteArrayCoder.class).getClassName();
+      }
+
+    };
+  }
+
+  /**
+   * Returns a {@link CloudObjectTranslator} for {@link VarLongCoder} that produces a {@link
+   * CloudObject} created with {@link CloudObject#forClass} rather than a well-known kind.
+   */
+  static CloudObjectTranslator<VarLongCoder> varInt() {
+    return new CloudObjectTranslator<VarLongCoder>() {
+      @Override
+      public CloudObject toCloudObject(VarLongCoder target) {
+        return addComponents(
+            CloudObject.forClass(target.getClass()), Collections.<Coder<?>>emptyList());
+      }
+
+      @Override
+      public VarLongCoder fromCloudObject(CloudObject object) {
+        return VarLongCoder.of();
+      }
+
+      @Override
+      public Class<? extends VarLongCoder> getSupportedClass() {
+        return VarLongCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(VarLongCoder.class).getClassName();
+      }
+    };
+  }
+
+  private static final String CODER_FIELD = "serialized_coder";
+  private static final String TYPE_FIELD = "type";
+  public static CloudObjectTranslator<? extends CustomCoder> custom() {
+    return new CloudObjectTranslator<CustomCoder>() {
+      @Override
+      public CloudObject toCloudObject(CustomCoder target) {
+        CloudObject cloudObject = CloudObject.forClass(CustomCoder.class);
+        Structs.addString(cloudObject, TYPE_FIELD, target.getClass().getName());
+        Structs.addString(
+            cloudObject,
+            CODER_FIELD,
+            StringUtils.byteArrayToJsonString(SerializableUtils.serializeToByteArray(target)));
+        return cloudObject;
+      }
+
+      @Override
+      public CustomCoder fromCloudObject(CloudObject cloudObject) {
+        String serializedCoder = Structs.getString(cloudObject, CODER_FIELD);
+        String type = Structs.getString(cloudObject, TYPE_FIELD);
+        return (CustomCoder<?>)
+            SerializableUtils.deserializeFromByteArray(
+                StringUtils.jsonStringToByteArray(serializedCoder), type);
+      }
+
+      @Override
+      public Class<? extends CustomCoder> getSupportedClass() {
+        return CustomCoder.class;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(CustomCoder.class).getClassName();
+      }
+    };
+  }
+
+  public static <T extends Coder> CloudObjectTranslator<T> atomic(final Class<T> coderClass) {
+    // Make sure that the instance will be instantiable from the class.
+    InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
+    return new CloudObjectTranslator<T>() {
+      @Override
+      public CloudObject toCloudObject(T target) {
+        return CloudObject.forClass(coderClass);
+      }
+
+      @Override
+      public T fromCloudObject(CloudObject cloudObject) {
+        return InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
+      }
+
+      @Override
+      public Class<? extends T> getSupportedClass() {
+        return coderClass;
+      }
+
+      @Override
+      public String cloudObjectClassName() {
+        return CloudObject.forClass(coderClass).getClassName();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index bc0cc75..a55d10c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.dataflow.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
@@ -26,9 +27,6 @@ import java.util.ServiceLoader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.StringUtils;
-import org.apache.beam.sdk.util.Structs;
 
 /** Utilities for converting an object to a {@link CloudObject}. */
 public class CloudObjects {
@@ -43,8 +41,8 @@ public class CloudObjects {
       populateCoderTranslators() {
     ImmutableMap.Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
         ImmutableMap.builder();
-    for (CoderCloudObjectTranslatorRegistrar coderRegistrar :
-        ServiceLoader.load(CoderCloudObjectTranslatorRegistrar.class)) {
+    for (CoderCloudObjectTranslatorRegistrar coderRegistrar : ServiceLoader.load(
+        CoderCloudObjectTranslatorRegistrar.class)) {
       builder.putAll(coderRegistrar.classesToTranslators());
     }
     return builder.build();
@@ -70,24 +68,20 @@ public class CloudObjects {
     if (translator != null) {
       return translator.toCloudObject(coder);
     } else if (coder instanceof CustomCoder) {
-      return customCoderAsCloudObject((CustomCoder<?>) coder);
+      CloudObjectTranslator customCoderTranslator = CODER_TRANSLATORS.get(CustomCoder.class);
+      checkNotNull(
+          customCoderTranslator,
+          "No %s registered for %s, but it is in the %s",
+          CloudObjectTranslator.class.getSimpleName(),
+          CustomCoder.class.getSimpleName(),
+          DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
+      return customCoderTranslator.toCloudObject(coder);
     }
     throw new IllegalArgumentException(
         String.format(
             "Non-Custom %s with no registered %s", Coder.class, CloudObjectTranslator.class));
   }
 
-  private static CloudObject customCoderAsCloudObject(CustomCoder<?> coder) {
-    CloudObject result = CloudObject.forClass(CustomCoder.class);
-    Structs.addString(result, "type", coder.getClass().getName());
-    Structs.addString(
-        result,
-        "serialized_coder",
-        StringUtils.byteArrayToJsonString(SerializableUtils.serializeToByteArray(coder)));
-
-    return result;
-  }
-
   public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
     CloudObjectTranslator<? extends Coder> translator =
         CLOUD_OBJECT_CLASS_NAME_TRANSLATORS.get(cloudObject.getClassName());

http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 3d7b534..3b9fa95 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -19,9 +19,34 @@
 package org.apache.beam.runners.dataflow.util;
 
 import com.google.auto.service.AutoService;
-import java.util.Collections;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.ImmutableSet;
+import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.FooterCoder;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmShardCoder;
+import org.apache.beam.runners.dataflow.internal.IsmFormat.KeyPrefixCoder;
+import org.apache.beam.runners.dataflow.util.RandomAccessData.RandomAccessDataCoder;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.BigIntegerCoder;
+import org.apache.beam.sdk.coders.BitSetCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.DurationCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TextualIntegerCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
 
 /**
  * The {@link CoderCloudObjectTranslatorRegistrar} containing the default collection of
@@ -30,15 +55,63 @@ import org.apache.beam.sdk.coders.Coder;
 @AutoService(CoderCloudObjectTranslatorRegistrar.class)
 public class DefaultCoderCloudObjectTranslatorRegistrar
     implements CoderCloudObjectTranslatorRegistrar {
+  private static final List<CloudObjectTranslator<? extends Coder>> DEFAULT_TRANSLATORS =
+      ImmutableList.<CloudObjectTranslator<? extends Coder>>of(
+          CloudObjectTranslators.globalWindow(),
+          CloudObjectTranslators.intervalWindow(),
+          CloudObjectTranslators.bytes(),
+          CloudObjectTranslators.varInt(),
+          CloudObjectTranslators.lengthPrefix(),
+          CloudObjectTranslators.stream(),
+          CloudObjectTranslators.pair(),
+          CloudObjectTranslators.windowedValue(),
+          CloudObjectTranslators.custom());
+  @VisibleForTesting
+  static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS =
+      ImmutableSet.<Class<? extends Coder>>of(
+          BigDecimalCoder.class,
+          BigEndianIntegerCoder.class,
+          BigEndianLongCoder.class,
+          BigIntegerCoder.class,
+          BitSetCoder.class,
+          ByteCoder.class,
+          DoubleCoder.class,
+          DurationCoder.class,
+          FileResultCoder.class,
+          FooterCoder.class,
+          InstantCoder.class,
+          IsmShardCoder.class,
+          KeyPrefixCoder.class,
+          RandomAccessDataCoder.class,
+          StringUtf8Coder.class,
+          TableDestinationCoder.class,
+          TableRowJsonCoder.class,
+          TextualIntegerCoder.class,
+          VarIntCoder.class,
+          VoidCoder.class);
+
+  @Override
   public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() {
-    // TODO: Add translators
-    return Collections.emptyMap();
+    ImmutableMap.Builder<String, CloudObjectTranslator<? extends Coder>> nameToTranslators =
+        ImmutableMap.builder();
+    for (CloudObjectTranslator<? extends Coder> translator : classesToTranslators().values()) {
+      nameToTranslators.put(translator.cloudObjectClassName(), translator);
+    }
+    return nameToTranslators.build();
   }
 
   @Override
   public Map<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>
       classesToTranslators() {
-    // TODO: Add translato
-    return Collections.emptyMap();
+    Builder<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>> builder =
+        ImmutableMap.<Class<? extends Coder>, CloudObjectTranslator<? extends Coder>>builder();
+    for (CloudObjectTranslator<? extends Coder> defaultTranslator : DEFAULT_TRANSLATORS) {
+      builder.put(defaultTranslator.getSupportedClass(), defaultTranslator);
+    }
+    for (Class<? extends Coder> atomicCoder : KNOWN_ATOMIC_CODERS) {
+      builder.put(atomicCoder, CloudObjectTranslators.atomic(atomicCoder));
+    }
+    return builder
+        .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 7562322..fdea285 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -18,19 +18,36 @@
 
 package org.apache.beam.runners.dataflow.util;
 
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
@@ -38,37 +55,97 @@ import org.junit.runners.Parameterized.Parameters;
 /**
  * Tests for {@link CloudObjects}.
  */
-@RunWith(Parameterized.class)
+@RunWith(Enclosed.class)
 public class CloudObjectsTest {
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<Coder<?>> data() {
-    // TODO: Implement when translators are registered with the CoderCloudObjectTranslatorRegsitrar
-    return ImmutableList.<Coder<?>>builder().build();
+  /**
+   * Verifies that every coder with a default translator is covered by {@link DefaultCoders}.
+   */
+  @RunWith(JUnit4.class)
+  public static class DefaultsPresentTest {
+    @Test
+    public void defaultCodersAllTested() {
+      Set<Class<? extends Coder>> defaultCoderTranslators =
+          new DefaultCoderCloudObjectTranslatorRegistrar().classesToTranslators().keySet();
+      Set<Class<? extends Coder>> testedClasses = new HashSet<>();
+      for (Coder<?> tested : DefaultCoders.data()) {
+        if (tested instanceof ObjectCoder) {
+          testedClasses.add(CustomCoder.class);
+          assertThat(defaultCoderTranslators, hasItem(CustomCoder.class));
+        } else {
+          testedClasses.add(tested.getClass());
+          assertThat(defaultCoderTranslators, hasItem(tested.getClass()));
+        }
+      }
+      Set<Class<? extends Coder>> missing = new HashSet<>();
+      missing.addAll(defaultCoderTranslators);
+      missing.removeAll(testedClasses);
+      assertThat(missing, emptyIterable());
+    }
+
+    @Test
+    public void defaultCodersIncludesCustomCoder() {
+      Set<Class<? extends Coder>> defaultCoders =
+          new DefaultCoderCloudObjectTranslatorRegistrar().classesToTranslators().keySet();
+      assertThat(defaultCoders, hasItem(CustomCoder.class));
+    }
   }
 
-  @Parameter(0)
-  public Coder<?> coder;
 
-  @Test
-  public void toAndFromCloudObject() throws Exception {
-    CloudObject cloudObject = CloudObjects.asCloudObject(coder);
-    Coder<?> reconstructed = CloudObjects.coderFromCloudObject(cloudObject);
+  /**
+   * Tests that all of the registered coders in {@link DefaultCoderCloudObjectTranslatorRegistrar}
+   * can be serialized and deserialized with {@link CloudObjects}.
+   */
+  @RunWith(Parameterized.class)
+  public static class DefaultCoders {
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<Coder<?>> data() {
+      Builder<Coder<?>> dataBuilder =
+          ImmutableList.<Coder<?>>builder()
+              .add(new ObjectCoder())
+              .add(GlobalWindow.Coder.INSTANCE)
+              .add(IntervalWindow.getCoder())
+              .add(LengthPrefixCoder.of(VarLongCoder.of()))
+              .add(IterableCoder.of(VarLongCoder.of()))
+              .add(KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()))
+              .add(
+                  WindowedValue.getFullCoder(
+                      KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()),
+                      IntervalWindow.getCoder()))
+              .add(VarLongCoder.of())
+              .add(ByteArrayCoder.of());
+      for (Class<? extends Coder> atomicCoder :
+          DefaultCoderCloudObjectTranslatorRegistrar.KNOWN_ATOMIC_CODERS) {
+        dataBuilder.add(InstanceBuilder.ofType(atomicCoder).fromFactoryMethod("of").build());
+      }
+      return dataBuilder
+          .build();
+    }
 
-    assertEquals(coder.getClass(), reconstructed.getClass());
-    assertEquals(coder, reconstructed);
-  }
+    @Parameter(0) public Coder<?> coder;
 
-  static class Record implements Serializable {}
+    @Test
+    public void toAndFromCloudObject() throws Exception {
+      CloudObject cloudObject = CloudObjects.asCloudObject(coder);
+      Coder<?> reconstructed = Serializer.deserialize(cloudObject, Coder.class);
+      Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
 
-  private static class RecordCoder extends CustomCoder<Record> {
+      assertEquals(coder.getClass(), reconstructed.getClass());
+      assertEquals(coder.getClass(), fromCloudObject.getClass());
+      assertEquals(coder, reconstructed);
+      assertEquals(coder, fromCloudObject);
+    }
+  }
+
+  private static class ObjectCoder extends CustomCoder<Object> {
     @Override
-    public void encode(Record value, OutputStream outStream, Context context)
-        throws CoderException, IOException {}
+    public void encode(Object value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+    }
 
     @Override
-    public Record decode(InputStream inStream, Context context)
+    public Object decode(InputStream inStream, Context context)
         throws CoderException, IOException {
-      return new Record();
+      return new Object();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/21d7458d/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 6a1f8ed..b73fb7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -86,16 +86,19 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
     return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
   }
 
-  public Coder<?> getValueCoder() {
-    return valueCoder;
-  }
-
   @Override
   public List<? extends Coder<?>> getCoderArguments() {
     return ImmutableList.of(valueCoder);
   }
 
   /**
+   * Gets the coder for the values that this coder encodes with a length prefix.
+   */
+  public Coder<T> getValueCoder() {
+    return valueCoder;
+  }
+
+  /**
    * {@code LengthPrefixCoder} is deterministic if the nested {@code Coder} is.
    *
    * {@inheritDoc}