You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/15 03:25:22 UTC

[beam] branch master updated: [BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders. (#7045)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ab5b0ae  [BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders. (#7045)
ab5b0ae is described below

commit ab5b0aea03bc29779654758d6acd4f8e8b6d86ca
Author: CraigChambersG <45...@users.noreply.github.com>
AuthorDate: Wed Nov 14 19:25:16 2018 -0800

    [BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders. (#7045)
---
 .../core/construction/ModelCoderRegistrar.java     |  4 ++
 .../dataflow/DataflowPipelineTranslator.java       | 10 ++-
 .../util/AvroCoderCloudObjectTranslator.java       |  3 +-
 .../dataflow/util/CloudObjectTranslator.java       |  4 +-
 .../dataflow/util/CloudObjectTranslators.java      | 80 ++++++++++++++--------
 .../beam/runners/dataflow/util/CloudObjects.java   | 19 ++++-
 .../beam/runners/dataflow/util/PropertyNames.java  |  1 +
 .../SerializableCoderCloudObjectTranslator.java    |  3 +-
 .../runners/dataflow/util/CloudObjectsTest.java    | 38 +++++++++-
 ...HarnessCoderCloudObjectTranslatorRegistrar.java |  5 +-
 .../worker/graph/LengthPrefixUnknownCoders.java    |  5 +-
 .../dataflow/worker/util/TimerOrElement.java       |  3 +-
 .../dataflow/worker/AvroByteReaderFactoryTest.java |  6 +-
 .../dataflow/worker/ConcatReaderFactoryTest.java   |  3 +-
 .../dataflow/worker/InMemoryReaderFactoryTest.java |  2 +-
 .../IntrinsicMapTaskExecutorFactoryTest.java       | 11 +--
 .../dataflow/worker/IsmSideInputReaderTest.java    |  5 +-
 .../runners/dataflow/worker/ReaderFactoryTest.java |  5 +-
 .../dataflow/worker/ShuffleReaderFactoryTest.java  |  8 ++-
 .../runners/dataflow/worker/SinkRegistryTest.java  |  2 +-
 .../worker/StreamingDataflowWorkerTest.java        | 46 +++++++++----
 ...eamingPCollectionViewWriterDoFnFactoryTest.java |  3 +-
 .../graph/LengthPrefixUnknownCodersTest.java       | 64 ++++++++++-------
 .../dataflow/worker/util/CloudSourceUtilsTest.java |  2 +-
 .../dataflow/worker/util/TimerOrElementTest.java   |  3 +-
 25 files changed, 234 insertions(+), 101 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index e07a478..cbb22af 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -86,6 +86,10 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
         Coder.class.getSimpleName());
   }
 
+  public static boolean isKnownCoder(Coder<?> coder) {
+    return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass());
+  }
+
   @Override
   public Map<Class<? extends Coder>, String> getCoderURNs() {
     return BEAM_MODEL_CODER_URNS;
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index f5190df..a99bd30 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -568,7 +568,7 @@ public class DataflowPipelineTranslator {
 
     @Override
     public void addEncodingInput(Coder<?> coder) {
-      CloudObject encoding = CloudObjects.asCloudObject(coder);
+      CloudObject encoding = translateCoder(coder, translator);
       addObject(getProperties(), PropertyNames.ENCODING, encoding);
     }
 
@@ -668,7 +668,7 @@ public class DataflowPipelineTranslator {
       if (valueCoder != null) {
         // Verify that encoding can be decoded, in order to catch serialization
         // failures as early as possible.
-        CloudObject encoding = CloudObjects.asCloudObject(valueCoder);
+        CloudObject encoding = translateCoder(valueCoder, translator);
         addObject(outputInfo, PropertyNames.ENCODING, encoding);
         translator.outputCoders.put(value, valueCoder);
       }
@@ -1016,7 +1016,7 @@ public class DataflowPipelineTranslator {
 
             stepContext.addInput(
                 PropertyNames.RESTRICTION_CODER,
-                CloudObjects.asCloudObject(transform.getRestrictionCoder()));
+                translateCoder(transform.getRestrictionCoder(), context));
           }
         });
   }
@@ -1098,4 +1098,8 @@ public class DataflowPipelineTranslator {
       stepContext.addOutput(tag.getId(), (PCollection<?>) taggedOutput.getValue());
     }
   }
+
+  private static CloudObject translateCoder(Coder<?> coder, TranslationContext context) {
+    return CloudObjects.asCloudObject(coder, context.isFnApi() ? context.getSdkComponents() : null);
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
index 9a4047d..a4ce533 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.util;
 
 import org.apache.avro.Schema;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.AvroCoder;
 
 /** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
@@ -26,7 +27,7 @@ class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder>
   private static final String SCHEMA_FIELD = "schema";
 
   @Override
-  public CloudObject toCloudObject(AvroCoder target) {
+  public CloudObject toCloudObject(AvroCoder target, SdkComponents sdkComponents) {
     CloudObject base = CloudObject.forClass(AvroCoder.class);
     Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
     Structs.addString(base, TYPE_FIELD, target.getType().getName());
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index aa4af84..3638eaf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -17,13 +17,15 @@
  */
 package org.apache.beam.runners.dataflow.util;
 
+import org.apache.beam.runners.core.construction.SdkComponents;
+
 /**
  * A translator that takes an object and creates a {@link CloudObject} which can be converted back
  * to the original object.
  */
 public interface CloudObjectTranslator<T> {
   /** Converts the provided object into an equivalent {@link CloudObject}. */
-  CloudObject toCloudObject(T target);
+  CloudObject toCloudObject(T target, SdkComponents sdkComponents);
 
   /** Converts back into the original object from a provided {@link CloudObject}. */
   T fromCloudObject(CloudObject cloudObject);
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index 1d1966d..b6f7388 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -50,11 +51,12 @@ import org.apache.beam.sdk.values.TupleTag;
 class CloudObjectTranslators {
   private CloudObjectTranslators() {}
 
-  private static CloudObject addComponents(CloudObject base, List<? extends Coder<?>> components) {
+  private static CloudObject addComponents(
+      CloudObject base, List<? extends Coder<?>> components, SdkComponents sdkComponents) {
     if (!components.isEmpty()) {
       List<CloudObject> cloudComponents = new ArrayList<>(components.size());
       for (Coder component : components) {
-        cloudComponents.add(CloudObjects.asCloudObject(component));
+        cloudComponents.add(CloudObjects.asCloudObject(component, sdkComponents));
       }
       Structs.addList(base, PropertyNames.COMPONENT_ENCODINGS, cloudComponents);
     }
@@ -79,11 +81,13 @@ class CloudObjectTranslators {
   public static CloudObjectTranslator<KvCoder> pair() {
     return new CloudObjectTranslator<KvCoder>() {
       @Override
-      public CloudObject toCloudObject(KvCoder target) {
+      public CloudObject toCloudObject(KvCoder target, SdkComponents sdkComponents) {
         CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_PAIR);
         Structs.addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
         return addComponents(
-            result, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+            result,
+            ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -112,10 +116,11 @@ class CloudObjectTranslators {
   public static CloudObjectTranslator<IterableCoder> stream() {
     return new CloudObjectTranslator<IterableCoder>() {
       @Override
-      public CloudObject toCloudObject(IterableCoder target) {
+      public CloudObject toCloudObject(IterableCoder target, SdkComponents sdkComponents) {
         CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_STREAM);
         Structs.addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
-        return addComponents(result, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+        return addComponents(
+            result, Collections.<Coder<?>>singletonList(target.getElemCoder()), sdkComponents);
       }
 
       @Override
@@ -144,10 +149,11 @@ class CloudObjectTranslators {
   static CloudObjectTranslator<LengthPrefixCoder> lengthPrefix() {
     return new CloudObjectTranslator<LengthPrefixCoder>() {
       @Override
-      public CloudObject toCloudObject(LengthPrefixCoder target) {
+      public CloudObject toCloudObject(LengthPrefixCoder target, SdkComponents sdkComponents) {
         return addComponents(
             CloudObject.forClassName(CloudObjectKinds.KIND_LENGTH_PREFIX),
-            Collections.<Coder<?>>singletonList(target.getValueCoder()));
+            Collections.<Coder<?>>singletonList(target.getValueCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -176,9 +182,11 @@ class CloudObjectTranslators {
   static CloudObjectTranslator<GlobalWindow.Coder> globalWindow() {
     return new CloudObjectTranslator<GlobalWindow.Coder>() {
       @Override
-      public CloudObject toCloudObject(GlobalWindow.Coder target) {
+      public CloudObject toCloudObject(GlobalWindow.Coder target, SdkComponents sdkComponents) {
         return addComponents(
-            CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW), Collections.emptyList());
+            CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW),
+            Collections.emptyList(),
+            sdkComponents);
       }
 
       @Override
@@ -205,10 +213,11 @@ class CloudObjectTranslators {
   static CloudObjectTranslator<IntervalWindowCoder> intervalWindow() {
     return new CloudObjectTranslator<IntervalWindowCoder>() {
       @Override
-      public CloudObject toCloudObject(IntervalWindowCoder target) {
+      public CloudObject toCloudObject(IntervalWindowCoder target, SdkComponents sdkComponents) {
         return addComponents(
             CloudObject.forClassName(CloudObjectKinds.KIND_INTERVAL_WINDOW),
-            Collections.emptyList());
+            Collections.emptyList(),
+            sdkComponents);
       }
 
       @Override
@@ -235,11 +244,13 @@ class CloudObjectTranslators {
   static CloudObjectTranslator<FullWindowedValueCoder> windowedValue() {
     return new CloudObjectTranslator<FullWindowedValueCoder>() {
       @Override
-      public CloudObject toCloudObject(FullWindowedValueCoder target) {
+      public CloudObject toCloudObject(FullWindowedValueCoder target, SdkComponents sdkComponents) {
         CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_WINDOWED_VALUE);
         Structs.addBoolean(result, PropertyNames.IS_WRAPPER, true);
         return addComponents(
-            result, ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder()));
+            result,
+            ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -270,9 +281,11 @@ class CloudObjectTranslators {
   static CloudObjectTranslator<ByteArrayCoder> bytes() {
     return new CloudObjectTranslator<ByteArrayCoder>() {
       @Override
-      public CloudObject toCloudObject(ByteArrayCoder target) {
+      public CloudObject toCloudObject(ByteArrayCoder target, SdkComponents sdkComponents) {
         return addComponents(
-            CloudObject.forClassName(CloudObjectKinds.KIND_BYTES), Collections.emptyList());
+            CloudObject.forClassName(CloudObjectKinds.KIND_BYTES),
+            Collections.emptyList(),
+            sdkComponents);
       }
 
       @Override
@@ -299,8 +312,9 @@ class CloudObjectTranslators {
   static CloudObjectTranslator<VarLongCoder> varInt() {
     return new CloudObjectTranslator<VarLongCoder>() {
       @Override
-      public CloudObject toCloudObject(VarLongCoder target) {
-        return addComponents(CloudObject.forClass(target.getClass()), Collections.emptyList());
+      public CloudObject toCloudObject(VarLongCoder target, SdkComponents sdkComponents) {
+        return addComponents(
+            CloudObject.forClass(target.getClass()), Collections.emptyList(), sdkComponents);
       }
 
       @Override
@@ -326,7 +340,7 @@ class CloudObjectTranslators {
   public static CloudObjectTranslator<Coder> javaSerialized() {
     return new CloudObjectTranslator<Coder>() {
       @Override
-      public CloudObject toCloudObject(Coder target) {
+      public CloudObject toCloudObject(Coder target, SdkComponents sdkComponents) {
         // CustomCoder is used as the "marker" for a java-serialized coder
         CloudObject cloudObject = CloudObject.forClass(CustomCoder.class);
         Structs.addString(cloudObject, TYPE_FIELD, target.getClass().getName());
@@ -363,7 +377,7 @@ class CloudObjectTranslators {
     InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
     return new CloudObjectTranslator<T>() {
       @Override
-      public CloudObject toCloudObject(T target) {
+      public CloudObject toCloudObject(T target, SdkComponents sdkComponents) {
         return CloudObject.forClass(coderClass);
       }
 
@@ -388,9 +402,10 @@ class CloudObjectTranslators {
       final Class<? extends IterableLikeCoder> clazz) {
     return new CloudObjectTranslator<IterableLikeCoder>() {
       @Override
-      public CloudObject toCloudObject(IterableLikeCoder target) {
+      public CloudObject toCloudObject(IterableLikeCoder target, SdkComponents sdkComponents) {
         CloudObject base = CloudObject.forClass(clazz);
-        return addComponents(base, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+        return addComponents(
+            base, Collections.<Coder<?>>singletonList(target.getElemCoder()), sdkComponents);
       }
 
       @Override
@@ -422,10 +437,12 @@ class CloudObjectTranslators {
   public static CloudObjectTranslator<MapCoder> map() {
     return new CloudObjectTranslator<MapCoder>() {
       @Override
-      public CloudObject toCloudObject(MapCoder target) {
+      public CloudObject toCloudObject(MapCoder target, SdkComponents sdkComponents) {
         CloudObject base = CloudObject.forClass(MapCoder.class);
         return addComponents(
-            base, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+            base,
+            ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()),
+            sdkComponents);
       }
 
       @Override
@@ -454,9 +471,10 @@ class CloudObjectTranslators {
   public static CloudObjectTranslator<NullableCoder> nullable() {
     return new CloudObjectTranslator<NullableCoder>() {
       @Override
-      public CloudObject toCloudObject(NullableCoder target) {
+      public CloudObject toCloudObject(NullableCoder target, SdkComponents sdkComponents) {
         CloudObject base = CloudObject.forClass(NullableCoder.class);
-        return addComponents(base, Collections.<Coder<?>>singletonList(target.getValueCoder()));
+        return addComponents(
+            base, Collections.<Coder<?>>singletonList(target.getValueCoder()), sdkComponents);
       }
 
       @Override
@@ -485,8 +503,9 @@ class CloudObjectTranslators {
   public static CloudObjectTranslator<UnionCoder> union() {
     return new CloudObjectTranslator<UnionCoder>() {
       @Override
-      public CloudObject toCloudObject(UnionCoder target) {
-        return addComponents(CloudObject.forClass(UnionCoder.class), target.getElementCoders());
+      public CloudObject toCloudObject(UnionCoder target, SdkComponents sdkComponents) {
+        return addComponents(
+            CloudObject.forClass(UnionCoder.class), target.getElementCoders(), sdkComponents);
       }
 
       @Override
@@ -510,11 +529,12 @@ class CloudObjectTranslators {
   public static CloudObjectTranslator<CoGbkResultCoder> coGroupByKeyResult() {
     return new CloudObjectTranslator<CoGbkResultCoder>() {
       @Override
-      public CloudObject toCloudObject(CoGbkResultCoder target) {
+      public CloudObject toCloudObject(CoGbkResultCoder target, SdkComponents sdkComponents) {
         CloudObject base = CloudObject.forClass(CoGbkResultCoder.class);
         Structs.addObject(
             base, PropertyNames.CO_GBK_RESULT_SCHEMA, toCloudObject(target.getSchema()));
-        return addComponents(base, Collections.singletonList(target.getUnionCoder()));
+        return addComponents(
+            base, Collections.singletonList(target.getUnionCoder()), sdkComponents);
       }
 
       private CloudObject toCloudObject(CoGbkResultSchema schema) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 4b026df..a96d3dd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -23,6 +23,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
 
@@ -58,11 +61,12 @@ public class CloudObjects {
   }
 
   /** Convert the provided {@link Coder} into a {@link CloudObject}. */
-  public static CloudObject asCloudObject(Coder<?> coder) {
+  public static CloudObject asCloudObject(Coder<?> coder, @Nullable SdkComponents sdkComponents) {
     CloudObjectTranslator<Coder> translator =
         (CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass());
+    CloudObject encoding;
     if (translator != null) {
-      return translator.toCloudObject(coder);
+      encoding = translator.toCloudObject(coder, sdkComponents);
     } else {
       CloudObjectTranslator customCoderTranslator = CODER_TRANSLATORS.get(CustomCoder.class);
       checkNotNull(
@@ -71,8 +75,17 @@ public class CloudObjects {
           CloudObjectTranslator.class.getSimpleName(),
           CustomCoder.class.getSimpleName(),
           DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
-      return customCoderTranslator.toCloudObject(coder);
+      encoding = customCoderTranslator.toCloudObject(coder, sdkComponents);
     }
+    if (sdkComponents != null && !ModelCoderRegistrar.isKnownCoder(coder)) {
+      try {
+        String coderId = sdkComponents.registerCoder(coder);
+        Structs.addString(encoding, PropertyNames.PIPELINE_PROTO_CODER_ID, coderId);
+      } catch (Exception e) {
+        throw new RuntimeException("Unable to register coder " + coder, e);
+      }
+    }
+    return encoding;
   }
 
   public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index bd510c8..e644e0f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -64,4 +64,5 @@ public class PropertyNames {
   public static final String DISPLAY_DATA = "display_data";
   public static final String RESTRICTION_CODER = "restriction_coder";
   public static final String IMPULSE_ELEMENT = "impulse_element";
+  public static final String PIPELINE_PROTO_CODER_ID = "pipeline_proto_coder_id";
 }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
index 8cac585..b1be7a3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.util;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.Serializable;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.SerializableCoder;
 
 /** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */
@@ -27,7 +28,7 @@ class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<Se
   private static final String TYPE_FIELD = "type";
 
   @Override
-  public CloudObject toCloudObject(SerializableCoder target) {
+  public CloudObject toCloudObject(SerializableCoder target, SdkComponents sdkComponents) {
     CloudObject base = CloudObject.forClass(SerializableCoder.class);
     Structs.addString(base, TYPE_FIELD, target.getRecordType().getName());
     return base;
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 4d75ecd..b7e8cc8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -20,7 +20,9 @@ package org.apache.beam.runners.dataflow.util;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
@@ -32,6 +34,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -149,12 +153,44 @@ public class CloudObjectsTest {
 
     @Test
     public void toAndFromCloudObject() throws Exception {
-      CloudObject cloudObject = CloudObjects.asCloudObject(coder);
+      CloudObject cloudObject = CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null);
       Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
 
       assertEquals(coder.getClass(), fromCloudObject.getClass());
       assertEquals(coder, fromCloudObject);
     }
+
+    @Test
+    public void toAndFromCloudObjectWithSdkComponents() throws Exception {
+      SdkComponents sdkComponents = SdkComponents.create();
+      CloudObject cloudObject = CloudObjects.asCloudObject(coder, sdkComponents);
+      Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
+
+      assertEquals(coder.getClass(), fromCloudObject.getClass());
+      assertEquals(coder, fromCloudObject);
+
+      checkPipelineProtoCoderIds(coder, cloudObject, sdkComponents);
+    }
+
+    private static void checkPipelineProtoCoderIds(
+        Coder<?> coder, CloudObject cloudObject, SdkComponents sdkComponents) throws Exception {
+      if (ModelCoderRegistrar.isKnownCoder(coder)) {
+        assertFalse(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
+      } else {
+        assertTrue(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
+        assertEquals(
+            sdkComponents.registerCoder(coder),
+            cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID));
+      }
+      List<? extends Coder<?>> coderArguments = coder.getCoderArguments();
+      Object cloudComponentsObject = cloudObject.get(PropertyNames.COMPONENT_ENCODINGS);
+      assertTrue(cloudComponentsObject instanceof List);
+      List<CloudObject> cloudComponents = (List<CloudObject>) cloudComponentsObject;
+      assertEquals(coderArguments.size(), cloudComponents.size());
+      for (int i = 0; i < coderArguments.size(); i++) {
+        checkPipelineProtoCoderIds(coderArguments.get(i), cloudComponents.get(i), sdkComponents);
+      }
+    }
   }
 
   private static class Record implements Serializable {}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
index 2b2bfd5..de242d1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjectTranslator;
@@ -69,7 +70,7 @@ public class RunnerHarnessCoderCloudObjectTranslatorRegistrar
       implements CloudObjectTranslator<IsmRecordCoder<?>> {
 
     @Override
-    public CloudObject toCloudObject(IsmRecordCoder<?> target) {
+    public CloudObject toCloudObject(IsmRecordCoder<?> target, SdkComponents sdkComponents) {
       throw new UnsupportedOperationException();
     }
 
@@ -99,7 +100,7 @@ public class RunnerHarnessCoderCloudObjectTranslatorRegistrar
     InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
     return new CloudObjectTranslator<T>() {
       @Override
-      public CloudObject toCloudObject(T target) {
+      public CloudObject toCloudObject(T target, SdkComponents sdkComponents) {
         throw new UnsupportedOperationException();
       }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 091ab79..398037e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -229,7 +229,8 @@ public class LengthPrefixUnknownCoders {
     // Handle well known coders.
     if (LENGTH_PREFIX_CODER_TYPE.equals(coderType)) {
       if (replaceWithByteArrayCoder) {
-        return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER);
+        return CloudObjects.asCloudObject(
+            LENGTH_PREFIXED_BYTE_ARRAY_CODER, /*sdkComponents=*/ null);
       }
       return codec;
     } else if (WELL_KNOWN_CODER_TYPES.contains(coderType)) {
@@ -251,7 +252,7 @@ public class LengthPrefixUnknownCoders {
 
     // Wrap unknown coders with length prefix coder.
     if (replaceWithByteArrayCoder) {
-      return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER);
+      return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER, /*sdkComponents=*/ null);
     } else {
       Map<String, Object> prefixedCodec = new HashMap<>();
       prefixedCodec.put(PropertyNames.OBJECT_TYPE_NAME, LENGTH_PREFIX_CODER_TYPE);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
index b0dbb26..9db549e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjectTranslator;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
@@ -70,7 +71,7 @@ public class TimerOrElement {
   private static class TimerOrElementCloudObjectTranslator
       implements CloudObjectTranslator<TimerOrElementCoder> {
     @Override
-    public CloudObject toCloudObject(TimerOrElementCoder target) {
+    public CloudObject toCloudObject(TimerOrElementCoder target, SdkComponents sdkComponents) {
       throw new IllegalArgumentException("Should never be called");
     }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
index 68761fc..6c16ae6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
@@ -73,7 +73,8 @@ public class AvroByteReaderFactoryTest {
     Coder<?> coder =
         WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
     NativeReader<?> reader =
-        runTestCreateAvroReader(pathToAvroFile, null, null, CloudObjects.asCloudObject(coder));
+        runTestCreateAvroReader(
+            pathToAvroFile, null, null, CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
 
     Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class));
     AvroByteReader avroReader = (AvroByteReader) reader;
@@ -88,7 +89,8 @@ public class AvroByteReaderFactoryTest {
     Coder<?> coder =
         WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
     NativeReader<?> reader =
-        runTestCreateAvroReader(pathToAvroFile, 200L, 500L, CloudObjects.asCloudObject(coder));
+        runTestCreateAvroReader(
+            pathToAvroFile, 200L, 500L, CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
 
     Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class));
     AvroByteReader avroReader = (AvroByteReader) reader;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
index a12947c..772500f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
@@ -58,7 +58,8 @@ public class ConcatReaderFactoryTest {
 
       inMemorySourceDictionary.put(PropertyNames.SOURCE_SPEC, inMemorySourceSpec);
 
-      CloudObject textSourceEncoding = CloudObjects.asCloudObject(StringUtf8Coder.of());
+      CloudObject textSourceEncoding =
+          CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null);
       inMemorySourceDictionary.put(PropertyNames.ENCODING, textSourceEncoding);
 
       sourcesList.add(inMemorySourceDictionary);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
index 9d36ac0..cdb3b2b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
@@ -56,7 +56,7 @@ public class InMemoryReaderFactoryTest {
 
     Source cloudSource = new Source();
     cloudSource.setSpec(spec);
-    cloudSource.setCodec(CloudObjects.asCloudObject(coder));
+    cloudSource.setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
 
     return cloudSource;
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 0cfe3f4..972c19a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -120,7 +120,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
           .andThen(new MapTaskToNetworkFunction());
 
   private static final CloudObject windowedStringCoder =
-      CloudObjects.asCloudObject(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));
+      CloudObjects.asCloudObject(
+          WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), /*sdkComponents=*/ null);
 
   private IntrinsicMapTaskExecutorFactory mapTaskExecutorFactory;
   private PipelineOptions options;
@@ -560,13 +561,15 @@ public class IntrinsicMapTaskExecutorFactoryTest {
         CloudObjects.asCloudObject(
             FullWindowedValueCoder.of(
                 KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()),
-                IntervalWindowCoder.of())));
+                IntervalWindowCoder.of()),
+            /*sdkComponents=*/ null));
 
     InstructionOutput output = new InstructionOutput();
     output.setName("pgbk_output_name");
     output.setCodec(
         CloudObjects.asCloudObject(
-            KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(BigEndianIntegerCoder.of()))));
+            KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(BigEndianIntegerCoder.of())),
+            /*sdkComponents=*/ null));
     output.setOriginalName("originalName");
     output.setSystemName("systemName");
 
@@ -691,7 +694,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
 
     InstructionOutput output = new InstructionOutput();
     output.setName("flatten_output_name");
-    output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
     output.setOriginalName("originalName");
     output.setSystemName("systemName");
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
index a07e970..3edf08e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
@@ -1495,7 +1495,7 @@ public class IsmSideInputReaderTest {
   private <T> void verifyList(List<T> expected, List<T> actual) {
     assertEquals(expected.size(), actual.size());
 
-    List<Integer> iterationOrder = new ArrayList<>();
+    List<Integer> iterationOrder = new ArrayList<Integer>();
     Random random = new Random(1892389023490L);
     for (int i = 0; i < expected.size(); ++i) {
       iterationOrder.add(i);
@@ -1770,7 +1770,8 @@ public class IsmSideInputReaderTest {
   private <K, V> Source newIsmSource(IsmRecordCoder<WindowedValue<V>> coder, String tmpFilePath) {
     Source source = new Source();
     source.setCodec(
-        CloudObjects.asCloudObject(WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER)));
+        CloudObjects.asCloudObject(
+            WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER), /*sdkComponents=*/ null));
     source.setSpec(new HashMap<String, Object>());
     source.getSpec().put(PropertyNames.OBJECT_TYPE_NAME, "IsmSource");
     source.getSpec().put(WorkerPropertyNames.FILENAME, tmpFilePath);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
index 80d85a8..34bc386 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
@@ -121,7 +121,8 @@ public class ReaderFactoryTest {
 
     Source cloudSource = new Source();
     cloudSource.setSpec(spec);
-    cloudSource.setCodec(CloudObjects.asCloudObject(BigEndianIntegerCoder.of()));
+    cloudSource.setCodec(
+        CloudObjects.asCloudObject(BigEndianIntegerCoder.of(), /*sdkComponents=*/ null));
 
     PipelineOptions options = PipelineOptionsFactory.create();
     ReaderRegistry registry =
@@ -141,7 +142,7 @@ public class ReaderFactoryTest {
     CloudObject spec = CloudObject.forClassName("UnknownSource");
     Source cloudSource = new Source();
     cloudSource.setSpec(spec);
-    cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
     try {
       PipelineOptions options = PipelineOptionsFactory.create();
       ReaderRegistry.defaultRegistry()
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
index 45022d2..c12cd9e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
@@ -87,7 +87,7 @@ public class ShuffleReaderFactoryTest {
             shuffleReaderConfig,
             start,
             end,
-            CloudObjects.asCloudObject(coder),
+            CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null),
             BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"),
             UngroupedShuffleReader.class,
             "UngroupedShuffleSource");
@@ -114,7 +114,8 @@ public class ShuffleReaderFactoryTest {
             end,
             CloudObjects.asCloudObject(
                 FullWindowedValueCoder.of(
-                    KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), IntervalWindowCoder.of())),
+                    KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), IntervalWindowCoder.of()),
+                /*sdkComponents=*/ null),
             context,
             GroupingShuffleReader.class,
             "GroupingShuffleSource");
@@ -142,7 +143,8 @@ public class ShuffleReaderFactoryTest {
             CloudObjects.asCloudObject(
                 FullWindowedValueCoder.of(
                     KvCoder.of(keyCoder, windowedValueCoder.getValueCoder()),
-                    IntervalWindowCoder.of())),
+                    IntervalWindowCoder.of()),
+                /*sdkComponents=*/ null),
             BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"),
             PartitioningShuffleReader.class,
             "PartitioningShuffleSource");
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
index 099dae4..89c1a69 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
@@ -59,7 +59,7 @@ public class SinkRegistryTest {
     com.google.api.services.dataflow.model.Sink cloudSink =
         new com.google.api.services.dataflow.model.Sink();
     cloudSink.setSpec(spec);
-    cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
     try {
       SinkRegistry.defaultRegistry()
           .create(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index b87ee45..d69e304 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -230,7 +230,8 @@ public class StreamingDataflowWorkerTest {
     CloudObject timerCloudObject =
         CloudObject.forClassName(
             "com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder");
-    List<CloudObject> component = Collections.singletonList(CloudObjects.asCloudObject(coder));
+    List<CloudObject> component =
+        Collections.singletonList(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
     Structs.addList(timerCloudObject, PropertyNames.COMPONENT_ENCODINGS, component);
 
     CloudObject encodedCoder = CloudObject.forClassName("kind:windowed_value");
@@ -238,7 +239,9 @@ public class StreamingDataflowWorkerTest {
     Structs.addList(
         encodedCoder,
         PropertyNames.COMPONENT_ENCODINGS,
-        ImmutableList.of(timerCloudObject, CloudObjects.asCloudObject(IntervalWindowCoder.of())));
+        ImmutableList.of(
+            timerCloudObject,
+            CloudObjects.asCloudObject(IntervalWindowCoder.of(), /*sdkComponents=*/ null)));
 
     return new ParallelInstruction()
         .setSystemName(DEFAULT_SOURCE_SYSTEM_NAME)
@@ -269,7 +272,8 @@ public class StreamingDataflowWorkerTest {
                         .setSpec(CloudObject.forClass(UngroupedWindmillReader.class))
                         .setCodec(
                             CloudObjects.asCloudObject(
-                                WindowedValue.getFullCoder(coder, IntervalWindow.getCoder())))))
+                                WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()),
+                                /*sdkComponents=*/ null))))
         .setOutputs(
             Arrays.asList(
                 new InstructionOutput()
@@ -278,7 +282,8 @@ public class StreamingDataflowWorkerTest {
                     .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
                     .setCodec(
                         CloudObjects.asCloudObject(
-                            WindowedValue.getFullCoder(coder, IntervalWindow.getCoder())))));
+                            WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()),
+                            /*sdkComponents=*/ null))));
   }
 
   private ParallelInstruction makeDoFnInstruction(
@@ -321,7 +326,8 @@ public class StreamingDataflowWorkerTest {
                     .setCodec(
                         CloudObjects.asCloudObject(
                             WindowedValue.getFullCoder(
-                                outputCoder, windowingStrategy.getWindowFn().windowCoder())))));
+                                outputCoder, windowingStrategy.getWindowFn().windowCoder()),
+                            /*sdkComponents=*/ null))));
   }
 
   private ParallelInstruction makeDoFnInstruction(
@@ -356,7 +362,8 @@ public class StreamingDataflowWorkerTest {
                         .setSpec(spec)
                         .setCodec(
                             CloudObjects.asCloudObject(
-                                WindowedValue.getFullCoder(coder, windowCoder)))));
+                                WindowedValue.getFullCoder(coder, windowCoder),
+                                /*sdkComponents=*/ null))));
   }
 
   private ParallelInstruction makeSinkInstruction(
@@ -1088,7 +1095,8 @@ public class StreamingDataflowWorkerTest {
                         .setCodec(
                             CloudObjects.asCloudObject(
                                 WindowedValue.getFullCoder(
-                                    StringUtf8Coder.of(), IntervalWindow.getCoder())))));
+                                    StringUtf8Coder.of(), IntervalWindow.getCoder()),
+                                /*sdkComponents=*/ null))));
 
     List<ParallelInstruction> instructions =
         Arrays.asList(
@@ -1190,7 +1198,10 @@ public class StreamingDataflowWorkerTest {
                         .withTimestampCombiner(TimestampCombiner.EARLIEST),
                     sdkComponents)
                 .toByteArray()));
-    addObject(spec, WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedKvCoder));
+    addObject(
+        spec,
+        WorkerPropertyNames.INPUT_CODER,
+        CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null));
 
     ParallelInstruction mergeWindowsInstruction =
         new ParallelInstruction()
@@ -1208,7 +1219,9 @@ public class StreamingDataflowWorkerTest {
                         .setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME)
                         .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
                         .setName("output")
-                        .setCodec(CloudObjects.asCloudObject(windowedGroupedCoder))));
+                        .setCodec(
+                            CloudObjects.asCloudObject(
+                                windowedGroupedCoder, /*sdkComponents=*/ null))));
 
     List<ParallelInstruction> instructions =
         Arrays.asList(
@@ -1492,7 +1505,10 @@ public class StreamingDataflowWorkerTest {
                         .withAllowedLateness(Duration.standardMinutes(60)),
                     sdkComponents)
                 .toByteArray()));
-    addObject(spec, WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedKvCoder));
+    addObject(
+        spec,
+        WorkerPropertyNames.INPUT_CODER,
+        CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null));
 
     ParallelInstruction mergeWindowsInstruction =
         new ParallelInstruction()
@@ -1510,7 +1526,9 @@ public class StreamingDataflowWorkerTest {
                         .setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME)
                         .setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
                         .setName("output")
-                        .setCodec(CloudObjects.asCloudObject(windowedGroupedCoder))));
+                        .setCodec(
+                            CloudObjects.asCloudObject(
+                                windowedGroupedCoder, /*sdkComponents=*/ null))));
 
     List<ParallelInstruction> instructions =
         Arrays.asList(
@@ -1650,7 +1668,8 @@ public class StreamingDataflowWorkerTest {
             WindowedValue.getFullCoder(
                 ValueWithRecordId.ValueWithRecordIdCoder.of(
                     KvCoder.of(VarIntCoder.of(), VarIntCoder.of())),
-                GlobalWindow.Coder.INSTANCE));
+                GlobalWindow.Coder.INSTANCE),
+            /*sdkComponents=*/ null);
 
     return Arrays.asList(
         new ParallelInstruction()
@@ -2074,7 +2093,8 @@ public class StreamingDataflowWorkerTest {
               WindowedValue.getFullCoder(
                   ValueWithRecordId.ValueWithRecordIdCoder.of(
                       KvCoder.of(VarIntCoder.of(), VarIntCoder.of())),
-                  GlobalWindow.Coder.INSTANCE));
+                  GlobalWindow.Coder.INSTANCE),
+              /*sdkComponents=*/ null);
 
       TestCountingSource counter = new TestCountingSource(3).withThrowOnFirstSnapshot(true);
       List<ParallelInstruction> instructions =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
index 9871ce0..f920a23 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
@@ -49,7 +49,8 @@ public class StreamingPCollectionViewWriterDoFnFactoryTest {
 
     CloudObject coder =
         CloudObjects.asCloudObject(
-            WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE));
+            WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE),
+            /*sdkComponents=*/ null);
     ParDoFn parDoFn =
         new StreamingPCollectionViewWriterDoFnFactory()
             .create(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index befc876..661e394 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -105,9 +105,10 @@ public class LengthPrefixUnknownCodersTest {
   @Test
   public void testLengthPrefixUnknownCoders() throws Exception {
     Map<String, Object> lengthPrefixedCoderCloudObject =
-        forCodec(CloudObjects.asCloudObject(windowedValueCoder), false);
+        forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder), lengthPrefixedCoderCloudObject);
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
+        lengthPrefixedCoderCloudObject);
   }
 
   /** Test bypassing unknown coders that are already wrapped with {@code LengthPrefixCoder} */
@@ -119,7 +120,7 @@ public class LengthPrefixUnknownCodersTest {
             GlobalWindow.Coder.INSTANCE);
 
     Map<String, Object> lengthPrefixedCoderCloudObject =
-        forCodec(CloudObjects.asCloudObject(windowedValueCoder), false);
+        forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), false);
 
     Coder<WindowedValue<KV<String, Integer>>> expectedCoder =
         WindowedValue.getFullCoder(
@@ -127,7 +128,9 @@ public class LengthPrefixUnknownCodersTest {
                 LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())),
             GlobalWindow.Coder.INSTANCE);
 
-    assertEquals(CloudObjects.asCloudObject(expectedCoder), lengthPrefixedCoderCloudObject);
+    assertEquals(
+        CloudObjects.asCloudObject(expectedCoder, /*sdkComponents=*/ null),
+        lengthPrefixedCoderCloudObject);
   }
 
   /** Test replacing unknown coders with {@code LengthPrefixCoder<ByteArray>} */
@@ -139,71 +142,81 @@ public class LengthPrefixUnknownCodersTest {
             GlobalWindow.Coder.INSTANCE);
 
     Map<String, Object> lengthPrefixedCoderCloudObject =
-        forCodec(CloudObjects.asCloudObject(windowedValueCoder), true);
+        forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), true);
 
     assertEquals(
-        CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder, /*sdkComponents=*/ null),
         lengthPrefixedCoderCloudObject);
   }
 
   @Test
   public void testLengthPrefixInstructionOutputCoder() throws Exception {
     InstructionOutput output = new InstructionOutput();
-    output.setCodec(CloudObjects.asCloudObject(windowedValueCoder));
+    output.setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
     output.setFactory(new JacksonFactory());
 
     InstructionOutput prefixedOutput = forInstructionOutput(output, false);
-    assertEquals(CloudObjects.asCloudObject(prefixedWindowedValueCoder), prefixedOutput.getCodec());
+    assertEquals(
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
+        prefixedOutput.getCodec());
     // Should not mutate the instruction.
-    assertEquals(output.getCodec(), CloudObjects.asCloudObject(windowedValueCoder));
+    assertEquals(
+        output.getCodec(), CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
   }
 
   @Test
   public void testLengthPrefixReadInstructionCoder() throws Exception {
     ReadInstruction readInstruction = new ReadInstruction();
     readInstruction.setSource(
-        new Source().setCodec(CloudObjects.asCloudObject(windowedValueCoder)));
+        new Source()
+            .setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)));
     instruction.setRead(readInstruction);
 
     ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
         prefixedInstruction.getRead().getSource().getCodec());
     // Should not mutate the instruction.
     assertEquals(
-        readInstruction.getSource().getCodec(), CloudObjects.asCloudObject(windowedValueCoder));
+        readInstruction.getSource().getCodec(),
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
   }
 
   @Test
   public void testLengthPrefixWriteInstructionCoder() throws Exception {
     WriteInstruction writeInstruction = new WriteInstruction();
-    writeInstruction.setSink(new Sink().setCodec(CloudObjects.asCloudObject(windowedValueCoder)));
+    writeInstruction.setSink(
+        new Sink()
+            .setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)));
     instruction.setWrite(writeInstruction);
 
     ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
         prefixedInstruction.getWrite().getSink().getCodec());
     // Should not mutate the instruction.
     assertEquals(
-        CloudObjects.asCloudObject(windowedValueCoder), writeInstruction.getSink().getCodec());
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null),
+        writeInstruction.getSink().getCodec());
   }
 
   @Test
   public void testLengthPrefixParDoInstructionCoder() throws Exception {
     ParDoInstruction parDo = new ParDoInstruction();
     CloudObject spec = CloudObject.forClassName(MERGE_BUCKETS_DO_FN);
-    spec.put(WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedValueCoder));
+    spec.put(
+        WorkerPropertyNames.INPUT_CODER,
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
     parDo.setUserFn(spec);
     instruction.setParDo(parDo);
 
     ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false);
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
         prefixedInstruction.getParDo().getUserFn().get(WorkerPropertyNames.INPUT_CODER));
     // Should not mutate the instruction.
     assertEquals(
-        CloudObjects.asCloudObject(windowedValueCoder),
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null),
         parDo.getUserFn().get(WorkerPropertyNames.INPUT_CODER));
   }
 
@@ -256,7 +269,7 @@ public class LengthPrefixUnknownCodersTest {
     network.addNode(grpcPortNode);
     network.addEdge(grpcPortNode, instructionOutputNode, DefaultEdge.create());
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
         ((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode))
             .getInstructionOutput()
             .getCodec());
@@ -269,7 +282,7 @@ public class LengthPrefixUnknownCodersTest {
     network.addNode(grpcPortNode);
     network.addEdge(instructionOutputNode, grpcPortNode, DefaultEdge.create());
     assertEquals(
-        CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+        CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
         ((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode))
             .getInstructionOutput()
             .getCodec());
@@ -283,7 +296,7 @@ public class LengthPrefixUnknownCodersTest {
     network.addNode(readNode);
     network.addEdge(readNode, instructionOutputNode, DefaultEdge.create());
     assertEquals(
-        CloudObjects.asCloudObject(windowedValueCoder),
+        CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null),
         ((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode))
             .getInstructionOutput()
             .getCodec());
@@ -317,7 +330,8 @@ public class LengthPrefixUnknownCodersTest {
     SideInputInfo sideInputInfo = new SideInputInfo().setSources(new ArrayList<>());
     sideInputInfo.setFactory(new JacksonFactory());
     for (Coder<?> coder : coders) {
-      Source source = new Source().setCodec(CloudObjects.asCloudObject(coder));
+      Source source =
+          new Source().setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
       source.setFactory(new JacksonFactory());
       sideInputInfo.getSources().add(source);
     }
@@ -340,7 +354,7 @@ public class LengthPrefixUnknownCodersTest {
                 new ReadInstruction()
                     .setSource(
                         new Source()
-                            .setCodec(CloudObjects.asCloudObject(coder))
+                            .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null))
                             .setSpec(CloudObject.forClassName(readClassName))));
 
     parallelInstruction.setFactory(new JacksonFactory());
@@ -349,7 +363,9 @@ public class LengthPrefixUnknownCodersTest {
 
   private static InstructionOutputNode createInstructionOutputNode(String name, Coder<?> coder) {
     InstructionOutput instructionOutput =
-        new InstructionOutput().setName(name).setCodec(CloudObjects.asCloudObject(coder));
+        new InstructionOutput()
+            .setName(name)
+            .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
     instructionOutput.setFactory(new JacksonFactory());
     return InstructionOutputNode.create(instructionOutput);
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
index 9814bb3..43baf71 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
@@ -62,7 +62,7 @@ public class CloudSourceUtilsTest {
     source.getBaseSpecs().add(grandparent);
     source.getBaseSpecs().add(parent);
     source.setSpec(child);
-    source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+    source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
 
     Source flat = CloudSourceUtils.flattenBaseSpecs(source);
     assertNull(flat.getBaseSpecs());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
index a7387ca..b4ae74a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
@@ -55,7 +55,8 @@ public class TimerOrElementTest {
             "com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder");
     List<CloudObject> component =
         Collections.singletonList(
-            CloudObjects.asCloudObject(KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of())));
+            CloudObjects.asCloudObject(
+                KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()), /*sdkComponents=*/ null));
     Structs.addList(cloudObject, PropertyNames.COMPONENT_ENCODINGS, component);
 
     Coder<?> decoded = CloudObjects.coderFromCloudObject(cloudObject);