You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/15 03:25:22 UTC
[beam] branch master updated: [BEAM-6067] Specify pipeline_coder_id
property in non-Beam-standard CloudObject coders. (#7045)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ab5b0ae [BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders. (#7045)
ab5b0ae is described below
commit ab5b0aea03bc29779654758d6acd4f8e8b6d86ca
Author: CraigChambersG <45...@users.noreply.github.com>
AuthorDate: Wed Nov 14 19:25:16 2018 -0800
[BEAM-6067] Specify pipeline_coder_id property in non-Beam-standard CloudObject coders. (#7045)
---
.../core/construction/ModelCoderRegistrar.java | 4 ++
.../dataflow/DataflowPipelineTranslator.java | 10 ++-
.../util/AvroCoderCloudObjectTranslator.java | 3 +-
.../dataflow/util/CloudObjectTranslator.java | 4 +-
.../dataflow/util/CloudObjectTranslators.java | 80 ++++++++++++++--------
.../beam/runners/dataflow/util/CloudObjects.java | 19 ++++-
.../beam/runners/dataflow/util/PropertyNames.java | 1 +
.../SerializableCoderCloudObjectTranslator.java | 3 +-
.../runners/dataflow/util/CloudObjectsTest.java | 38 +++++++++-
...HarnessCoderCloudObjectTranslatorRegistrar.java | 5 +-
.../worker/graph/LengthPrefixUnknownCoders.java | 5 +-
.../dataflow/worker/util/TimerOrElement.java | 3 +-
.../dataflow/worker/AvroByteReaderFactoryTest.java | 6 +-
.../dataflow/worker/ConcatReaderFactoryTest.java | 3 +-
.../dataflow/worker/InMemoryReaderFactoryTest.java | 2 +-
.../IntrinsicMapTaskExecutorFactoryTest.java | 11 +--
.../dataflow/worker/IsmSideInputReaderTest.java | 5 +-
.../runners/dataflow/worker/ReaderFactoryTest.java | 5 +-
.../dataflow/worker/ShuffleReaderFactoryTest.java | 8 ++-
.../runners/dataflow/worker/SinkRegistryTest.java | 2 +-
.../worker/StreamingDataflowWorkerTest.java | 46 +++++++++----
...eamingPCollectionViewWriterDoFnFactoryTest.java | 3 +-
.../graph/LengthPrefixUnknownCodersTest.java | 64 ++++++++++-------
.../dataflow/worker/util/CloudSourceUtilsTest.java | 2 +-
.../dataflow/worker/util/TimerOrElementTest.java | 3 +-
25 files changed, 234 insertions(+), 101 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index e07a478..cbb22af 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -86,6 +86,10 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
Coder.class.getSimpleName());
}
+ public static boolean isKnownCoder(Coder<?> coder) {
+ return BEAM_MODEL_CODER_URNS.containsKey(coder.getClass());
+ }
+
@Override
public Map<Class<? extends Coder>, String> getCoderURNs() {
return BEAM_MODEL_CODER_URNS;
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index f5190df..a99bd30 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -568,7 +568,7 @@ public class DataflowPipelineTranslator {
@Override
public void addEncodingInput(Coder<?> coder) {
- CloudObject encoding = CloudObjects.asCloudObject(coder);
+ CloudObject encoding = translateCoder(coder, translator);
addObject(getProperties(), PropertyNames.ENCODING, encoding);
}
@@ -668,7 +668,7 @@ public class DataflowPipelineTranslator {
if (valueCoder != null) {
// Verify that encoding can be decoded, in order to catch serialization
// failures as early as possible.
- CloudObject encoding = CloudObjects.asCloudObject(valueCoder);
+ CloudObject encoding = translateCoder(valueCoder, translator);
addObject(outputInfo, PropertyNames.ENCODING, encoding);
translator.outputCoders.put(value, valueCoder);
}
@@ -1016,7 +1016,7 @@ public class DataflowPipelineTranslator {
stepContext.addInput(
PropertyNames.RESTRICTION_CODER,
- CloudObjects.asCloudObject(transform.getRestrictionCoder()));
+ translateCoder(transform.getRestrictionCoder(), context));
}
});
}
@@ -1098,4 +1098,8 @@ public class DataflowPipelineTranslator {
stepContext.addOutput(tag.getId(), (PCollection<?>) taggedOutput.getValue());
}
}
+
+ private static CloudObject translateCoder(Coder<?> coder, TranslationContext context) {
+ return CloudObjects.asCloudObject(coder, context.isFnApi() ? context.getSdkComponents() : null);
+ }
}
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
index 9a4047d..a4ce533 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/AvroCoderCloudObjectTranslator.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.util;
import org.apache.avro.Schema;
+import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
@@ -26,7 +27,7 @@ class AvroCoderCloudObjectTranslator implements CloudObjectTranslator<AvroCoder>
private static final String SCHEMA_FIELD = "schema";
@Override
- public CloudObject toCloudObject(AvroCoder target) {
+ public CloudObject toCloudObject(AvroCoder target, SdkComponents sdkComponents) {
CloudObject base = CloudObject.forClass(AvroCoder.class);
Structs.addString(base, SCHEMA_FIELD, target.getSchema().toString());
Structs.addString(base, TYPE_FIELD, target.getType().getName());
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
index aa4af84..3638eaf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslator.java
@@ -17,13 +17,15 @@
*/
package org.apache.beam.runners.dataflow.util;
+import org.apache.beam.runners.core.construction.SdkComponents;
+
/**
* A translator that takes an object and creates a {@link CloudObject} which can be converted back
* to the original object.
*/
public interface CloudObjectTranslator<T> {
/** Converts the provided object into an equivalent {@link CloudObject}. */
- CloudObject toCloudObject(T target);
+ CloudObject toCloudObject(T target, SdkComponents sdkComponents);
/** Converts back into the original object from a provided {@link CloudObject}. */
T fromCloudObject(CloudObject cloudObject);
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index 1d1966d..b6f7388 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -50,11 +51,12 @@ import org.apache.beam.sdk.values.TupleTag;
class CloudObjectTranslators {
private CloudObjectTranslators() {}
- private static CloudObject addComponents(CloudObject base, List<? extends Coder<?>> components) {
+ private static CloudObject addComponents(
+ CloudObject base, List<? extends Coder<?>> components, SdkComponents sdkComponents) {
if (!components.isEmpty()) {
List<CloudObject> cloudComponents = new ArrayList<>(components.size());
for (Coder component : components) {
- cloudComponents.add(CloudObjects.asCloudObject(component));
+ cloudComponents.add(CloudObjects.asCloudObject(component, sdkComponents));
}
Structs.addList(base, PropertyNames.COMPONENT_ENCODINGS, cloudComponents);
}
@@ -79,11 +81,13 @@ class CloudObjectTranslators {
public static CloudObjectTranslator<KvCoder> pair() {
return new CloudObjectTranslator<KvCoder>() {
@Override
- public CloudObject toCloudObject(KvCoder target) {
+ public CloudObject toCloudObject(KvCoder target, SdkComponents sdkComponents) {
CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_PAIR);
Structs.addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
return addComponents(
- result, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+ result,
+ ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()),
+ sdkComponents);
}
@Override
@@ -112,10 +116,11 @@ class CloudObjectTranslators {
public static CloudObjectTranslator<IterableCoder> stream() {
return new CloudObjectTranslator<IterableCoder>() {
@Override
- public CloudObject toCloudObject(IterableCoder target) {
+ public CloudObject toCloudObject(IterableCoder target, SdkComponents sdkComponents) {
CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_STREAM);
Structs.addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
- return addComponents(result, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+ return addComponents(
+ result, Collections.<Coder<?>>singletonList(target.getElemCoder()), sdkComponents);
}
@Override
@@ -144,10 +149,11 @@ class CloudObjectTranslators {
static CloudObjectTranslator<LengthPrefixCoder> lengthPrefix() {
return new CloudObjectTranslator<LengthPrefixCoder>() {
@Override
- public CloudObject toCloudObject(LengthPrefixCoder target) {
+ public CloudObject toCloudObject(LengthPrefixCoder target, SdkComponents sdkComponents) {
return addComponents(
CloudObject.forClassName(CloudObjectKinds.KIND_LENGTH_PREFIX),
- Collections.<Coder<?>>singletonList(target.getValueCoder()));
+ Collections.<Coder<?>>singletonList(target.getValueCoder()),
+ sdkComponents);
}
@Override
@@ -176,9 +182,11 @@ class CloudObjectTranslators {
static CloudObjectTranslator<GlobalWindow.Coder> globalWindow() {
return new CloudObjectTranslator<GlobalWindow.Coder>() {
@Override
- public CloudObject toCloudObject(GlobalWindow.Coder target) {
+ public CloudObject toCloudObject(GlobalWindow.Coder target, SdkComponents sdkComponents) {
return addComponents(
- CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW), Collections.emptyList());
+ CloudObject.forClassName(CloudObjectKinds.KIND_GLOBAL_WINDOW),
+ Collections.emptyList(),
+ sdkComponents);
}
@Override
@@ -205,10 +213,11 @@ class CloudObjectTranslators {
static CloudObjectTranslator<IntervalWindowCoder> intervalWindow() {
return new CloudObjectTranslator<IntervalWindowCoder>() {
@Override
- public CloudObject toCloudObject(IntervalWindowCoder target) {
+ public CloudObject toCloudObject(IntervalWindowCoder target, SdkComponents sdkComponents) {
return addComponents(
CloudObject.forClassName(CloudObjectKinds.KIND_INTERVAL_WINDOW),
- Collections.emptyList());
+ Collections.emptyList(),
+ sdkComponents);
}
@Override
@@ -235,11 +244,13 @@ class CloudObjectTranslators {
static CloudObjectTranslator<FullWindowedValueCoder> windowedValue() {
return new CloudObjectTranslator<FullWindowedValueCoder>() {
@Override
- public CloudObject toCloudObject(FullWindowedValueCoder target) {
+ public CloudObject toCloudObject(FullWindowedValueCoder target, SdkComponents sdkComponents) {
CloudObject result = CloudObject.forClassName(CloudObjectKinds.KIND_WINDOWED_VALUE);
Structs.addBoolean(result, PropertyNames.IS_WRAPPER, true);
return addComponents(
- result, ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder()));
+ result,
+ ImmutableList.<Coder<?>>of(target.getValueCoder(), target.getWindowCoder()),
+ sdkComponents);
}
@Override
@@ -270,9 +281,11 @@ class CloudObjectTranslators {
static CloudObjectTranslator<ByteArrayCoder> bytes() {
return new CloudObjectTranslator<ByteArrayCoder>() {
@Override
- public CloudObject toCloudObject(ByteArrayCoder target) {
+ public CloudObject toCloudObject(ByteArrayCoder target, SdkComponents sdkComponents) {
return addComponents(
- CloudObject.forClassName(CloudObjectKinds.KIND_BYTES), Collections.emptyList());
+ CloudObject.forClassName(CloudObjectKinds.KIND_BYTES),
+ Collections.emptyList(),
+ sdkComponents);
}
@Override
@@ -299,8 +312,9 @@ class CloudObjectTranslators {
static CloudObjectTranslator<VarLongCoder> varInt() {
return new CloudObjectTranslator<VarLongCoder>() {
@Override
- public CloudObject toCloudObject(VarLongCoder target) {
- return addComponents(CloudObject.forClass(target.getClass()), Collections.emptyList());
+ public CloudObject toCloudObject(VarLongCoder target, SdkComponents sdkComponents) {
+ return addComponents(
+ CloudObject.forClass(target.getClass()), Collections.emptyList(), sdkComponents);
}
@Override
@@ -326,7 +340,7 @@ class CloudObjectTranslators {
public static CloudObjectTranslator<Coder> javaSerialized() {
return new CloudObjectTranslator<Coder>() {
@Override
- public CloudObject toCloudObject(Coder target) {
+ public CloudObject toCloudObject(Coder target, SdkComponents sdkComponents) {
// CustomCoder is used as the "marker" for a java-serialized coder
CloudObject cloudObject = CloudObject.forClass(CustomCoder.class);
Structs.addString(cloudObject, TYPE_FIELD, target.getClass().getName());
@@ -363,7 +377,7 @@ class CloudObjectTranslators {
InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
return new CloudObjectTranslator<T>() {
@Override
- public CloudObject toCloudObject(T target) {
+ public CloudObject toCloudObject(T target, SdkComponents sdkComponents) {
return CloudObject.forClass(coderClass);
}
@@ -388,9 +402,10 @@ class CloudObjectTranslators {
final Class<? extends IterableLikeCoder> clazz) {
return new CloudObjectTranslator<IterableLikeCoder>() {
@Override
- public CloudObject toCloudObject(IterableLikeCoder target) {
+ public CloudObject toCloudObject(IterableLikeCoder target, SdkComponents sdkComponents) {
CloudObject base = CloudObject.forClass(clazz);
- return addComponents(base, Collections.<Coder<?>>singletonList(target.getElemCoder()));
+ return addComponents(
+ base, Collections.<Coder<?>>singletonList(target.getElemCoder()), sdkComponents);
}
@Override
@@ -422,10 +437,12 @@ class CloudObjectTranslators {
public static CloudObjectTranslator<MapCoder> map() {
return new CloudObjectTranslator<MapCoder>() {
@Override
- public CloudObject toCloudObject(MapCoder target) {
+ public CloudObject toCloudObject(MapCoder target, SdkComponents sdkComponents) {
CloudObject base = CloudObject.forClass(MapCoder.class);
return addComponents(
- base, ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()));
+ base,
+ ImmutableList.<Coder<?>>of(target.getKeyCoder(), target.getValueCoder()),
+ sdkComponents);
}
@Override
@@ -454,9 +471,10 @@ class CloudObjectTranslators {
public static CloudObjectTranslator<NullableCoder> nullable() {
return new CloudObjectTranslator<NullableCoder>() {
@Override
- public CloudObject toCloudObject(NullableCoder target) {
+ public CloudObject toCloudObject(NullableCoder target, SdkComponents sdkComponents) {
CloudObject base = CloudObject.forClass(NullableCoder.class);
- return addComponents(base, Collections.<Coder<?>>singletonList(target.getValueCoder()));
+ return addComponents(
+ base, Collections.<Coder<?>>singletonList(target.getValueCoder()), sdkComponents);
}
@Override
@@ -485,8 +503,9 @@ class CloudObjectTranslators {
public static CloudObjectTranslator<UnionCoder> union() {
return new CloudObjectTranslator<UnionCoder>() {
@Override
- public CloudObject toCloudObject(UnionCoder target) {
- return addComponents(CloudObject.forClass(UnionCoder.class), target.getElementCoders());
+ public CloudObject toCloudObject(UnionCoder target, SdkComponents sdkComponents) {
+ return addComponents(
+ CloudObject.forClass(UnionCoder.class), target.getElementCoders(), sdkComponents);
}
@Override
@@ -510,11 +529,12 @@ class CloudObjectTranslators {
public static CloudObjectTranslator<CoGbkResultCoder> coGroupByKeyResult() {
return new CloudObjectTranslator<CoGbkResultCoder>() {
@Override
- public CloudObject toCloudObject(CoGbkResultCoder target) {
+ public CloudObject toCloudObject(CoGbkResultCoder target, SdkComponents sdkComponents) {
CloudObject base = CloudObject.forClass(CoGbkResultCoder.class);
Structs.addObject(
base, PropertyNames.CO_GBK_RESULT_SCHEMA, toCloudObject(target.getSchema()));
- return addComponents(base, Collections.singletonList(target.getUnionCoder()));
+ return addComponents(
+ base, Collections.singletonList(target.getUnionCoder()), sdkComponents);
}
private CloudObject toCloudObject(CoGbkResultSchema schema) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index 4b026df..a96d3dd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -23,6 +23,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
+import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -58,11 +61,12 @@ public class CloudObjects {
}
/** Convert the provided {@link Coder} into a {@link CloudObject}. */
- public static CloudObject asCloudObject(Coder<?> coder) {
+ public static CloudObject asCloudObject(Coder<?> coder, @Nullable SdkComponents sdkComponents) {
CloudObjectTranslator<Coder> translator =
(CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass());
+ CloudObject encoding;
if (translator != null) {
- return translator.toCloudObject(coder);
+ encoding = translator.toCloudObject(coder, sdkComponents);
} else {
CloudObjectTranslator customCoderTranslator = CODER_TRANSLATORS.get(CustomCoder.class);
checkNotNull(
@@ -71,8 +75,17 @@ public class CloudObjects {
CloudObjectTranslator.class.getSimpleName(),
CustomCoder.class.getSimpleName(),
DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
- return customCoderTranslator.toCloudObject(coder);
+ encoding = customCoderTranslator.toCloudObject(coder, sdkComponents);
}
+ if (sdkComponents != null && !ModelCoderRegistrar.isKnownCoder(coder)) {
+ try {
+ String coderId = sdkComponents.registerCoder(coder);
+ Structs.addString(encoding, PropertyNames.PIPELINE_PROTO_CODER_ID, coderId);
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to register coder " + coder, e);
+ }
+ }
+ return encoding;
}
public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index bd510c8..e644e0f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -64,4 +64,5 @@ public class PropertyNames {
public static final String DISPLAY_DATA = "display_data";
public static final String RESTRICTION_CODER = "restriction_coder";
public static final String IMPULSE_ELEMENT = "impulse_element";
+ public static final String PIPELINE_PROTO_CODER_ID = "pipeline_proto_coder_id";
}
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
index 8cac585..b1be7a3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SerializableCoderCloudObjectTranslator.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.util;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
+import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.SerializableCoder;
/** A {@link CloudObjectTranslator} for {@link SerializableCoder}. */
@@ -27,7 +28,7 @@ class SerializableCoderCloudObjectTranslator implements CloudObjectTranslator<Se
private static final String TYPE_FIELD = "type";
@Override
- public CloudObject toCloudObject(SerializableCoder target) {
+ public CloudObject toCloudObject(SerializableCoder target, SdkComponents sdkComponents) {
CloudObject base = CloudObject.forClass(SerializableCoder.class);
Structs.addString(base, TYPE_FIELD, target.getRecordType().getName());
return base;
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index 4d75ecd..b7e8cc8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -20,7 +20,9 @@ package org.apache.beam.runners.dataflow.util;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
@@ -32,6 +34,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.beam.runners.core.construction.ModelCoderRegistrar;
+import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -149,12 +153,44 @@ public class CloudObjectsTest {
@Test
public void toAndFromCloudObject() throws Exception {
- CloudObject cloudObject = CloudObjects.asCloudObject(coder);
+ CloudObject cloudObject = CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null);
Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
assertEquals(coder.getClass(), fromCloudObject.getClass());
assertEquals(coder, fromCloudObject);
}
+
+ @Test
+ public void toAndFromCloudObjectWithSdkComponents() throws Exception {
+ SdkComponents sdkComponents = SdkComponents.create();
+ CloudObject cloudObject = CloudObjects.asCloudObject(coder, sdkComponents);
+ Coder<?> fromCloudObject = CloudObjects.coderFromCloudObject(cloudObject);
+
+ assertEquals(coder.getClass(), fromCloudObject.getClass());
+ assertEquals(coder, fromCloudObject);
+
+ checkPipelineProtoCoderIds(coder, cloudObject, sdkComponents);
+ }
+
+ private static void checkPipelineProtoCoderIds(
+ Coder<?> coder, CloudObject cloudObject, SdkComponents sdkComponents) throws Exception {
+ if (ModelCoderRegistrar.isKnownCoder(coder)) {
+ assertFalse(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
+ } else {
+ assertTrue(cloudObject.containsKey(PropertyNames.PIPELINE_PROTO_CODER_ID));
+ assertEquals(
+ sdkComponents.registerCoder(coder),
+ cloudObject.get(PropertyNames.PIPELINE_PROTO_CODER_ID));
+ }
+ List<? extends Coder<?>> coderArguments = coder.getCoderArguments();
+ Object cloudComponentsObject = cloudObject.get(PropertyNames.COMPONENT_ENCODINGS);
+ assertTrue(cloudComponentsObject instanceof List);
+ List<CloudObject> cloudComponents = (List<CloudObject>) cloudComponentsObject;
+ assertEquals(coderArguments.size(), cloudComponents.size());
+ for (int i = 0; i < coderArguments.size(); i++) {
+ checkPipelineProtoCoderIds(coderArguments.get(i), cloudComponents.get(i), sdkComponents);
+ }
+ }
}
private static class Record implements Serializable {}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
index 2b2bfd5..de242d1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/RunnerHarnessCoderCloudObjectTranslatorRegistrar.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjectTranslator;
@@ -69,7 +70,7 @@ public class RunnerHarnessCoderCloudObjectTranslatorRegistrar
implements CloudObjectTranslator<IsmRecordCoder<?>> {
@Override
- public CloudObject toCloudObject(IsmRecordCoder<?> target) {
+ public CloudObject toCloudObject(IsmRecordCoder<?> target, SdkComponents sdkComponents) {
throw new UnsupportedOperationException();
}
@@ -99,7 +100,7 @@ public class RunnerHarnessCoderCloudObjectTranslatorRegistrar
InstanceBuilder.ofType(coderClass).fromFactoryMethod("of").build();
return new CloudObjectTranslator<T>() {
@Override
- public CloudObject toCloudObject(T target) {
+ public CloudObject toCloudObject(T target, SdkComponents sdkComponents) {
throw new UnsupportedOperationException();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 091ab79..398037e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -229,7 +229,8 @@ public class LengthPrefixUnknownCoders {
// Handle well known coders.
if (LENGTH_PREFIX_CODER_TYPE.equals(coderType)) {
if (replaceWithByteArrayCoder) {
- return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER);
+ return CloudObjects.asCloudObject(
+ LENGTH_PREFIXED_BYTE_ARRAY_CODER, /*sdkComponents=*/ null);
}
return codec;
} else if (WELL_KNOWN_CODER_TYPES.contains(coderType)) {
@@ -251,7 +252,7 @@ public class LengthPrefixUnknownCoders {
// Wrap unknown coders with length prefix coder.
if (replaceWithByteArrayCoder) {
- return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER);
+ return CloudObjects.asCloudObject(LENGTH_PREFIXED_BYTE_ARRAY_CODER, /*sdkComponents=*/ null);
} else {
Map<String, Object> prefixedCodec = new HashMap<>();
prefixedCodec.put(PropertyNames.OBJECT_TYPE_NAME, LENGTH_PREFIX_CODER_TYPE);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
index b0dbb26..9db549e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElement.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjectTranslator;
import org.apache.beam.runners.dataflow.util.CloudObjects;
@@ -70,7 +71,7 @@ public class TimerOrElement {
private static class TimerOrElementCloudObjectTranslator
implements CloudObjectTranslator<TimerOrElementCoder> {
@Override
- public CloudObject toCloudObject(TimerOrElementCoder target) {
+ public CloudObject toCloudObject(TimerOrElementCoder target, SdkComponents sdkComponents) {
throw new IllegalArgumentException("Should never be called");
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
index 68761fc..6c16ae6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/AvroByteReaderFactoryTest.java
@@ -73,7 +73,8 @@ public class AvroByteReaderFactoryTest {
Coder<?> coder =
WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
NativeReader<?> reader =
- runTestCreateAvroReader(pathToAvroFile, null, null, CloudObjects.asCloudObject(coder));
+ runTestCreateAvroReader(
+ pathToAvroFile, null, null, CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class));
AvroByteReader avroReader = (AvroByteReader) reader;
@@ -88,7 +89,8 @@ public class AvroByteReaderFactoryTest {
Coder<?> coder =
WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
NativeReader<?> reader =
- runTestCreateAvroReader(pathToAvroFile, 200L, 500L, CloudObjects.asCloudObject(coder));
+ runTestCreateAvroReader(
+ pathToAvroFile, 200L, 500L, CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
Assert.assertThat(reader, new IsInstanceOf(AvroByteReader.class));
AvroByteReader avroReader = (AvroByteReader) reader;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
index a12947c..772500f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ConcatReaderFactoryTest.java
@@ -58,7 +58,8 @@ public class ConcatReaderFactoryTest {
inMemorySourceDictionary.put(PropertyNames.SOURCE_SPEC, inMemorySourceSpec);
- CloudObject textSourceEncoding = CloudObjects.asCloudObject(StringUtf8Coder.of());
+ CloudObject textSourceEncoding =
+ CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null);
inMemorySourceDictionary.put(PropertyNames.ENCODING, textSourceEncoding);
sourcesList.add(inMemorySourceDictionary);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
index 9d36ac0..cdb3b2b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/InMemoryReaderFactoryTest.java
@@ -56,7 +56,7 @@ public class InMemoryReaderFactoryTest {
Source cloudSource = new Source();
cloudSource.setSpec(spec);
- cloudSource.setCodec(CloudObjects.asCloudObject(coder));
+ cloudSource.setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
return cloudSource;
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 0cfe3f4..972c19a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -120,7 +120,8 @@ public class IntrinsicMapTaskExecutorFactoryTest {
.andThen(new MapTaskToNetworkFunction());
private static final CloudObject windowedStringCoder =
- CloudObjects.asCloudObject(WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()));
+ CloudObjects.asCloudObject(
+ WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), /*sdkComponents=*/ null);
private IntrinsicMapTaskExecutorFactory mapTaskExecutorFactory;
private PipelineOptions options;
@@ -560,13 +561,15 @@ public class IntrinsicMapTaskExecutorFactoryTest {
CloudObjects.asCloudObject(
FullWindowedValueCoder.of(
KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()),
- IntervalWindowCoder.of())));
+ IntervalWindowCoder.of()),
+ /*sdkComponents=*/ null));
InstructionOutput output = new InstructionOutput();
output.setName("pgbk_output_name");
output.setCodec(
CloudObjects.asCloudObject(
- KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(BigEndianIntegerCoder.of()))));
+ KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(BigEndianIntegerCoder.of())),
+ /*sdkComponents=*/ null));
output.setOriginalName("originalName");
output.setSystemName("systemName");
@@ -691,7 +694,7 @@ public class IntrinsicMapTaskExecutorFactoryTest {
InstructionOutput output = new InstructionOutput();
output.setName("flatten_output_name");
- output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+ output.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
output.setOriginalName("originalName");
output.setSystemName("systemName");
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
index a07e970..3edf08e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReaderTest.java
@@ -1495,7 +1495,7 @@ public class IsmSideInputReaderTest {
private <T> void verifyList(List<T> expected, List<T> actual) {
assertEquals(expected.size(), actual.size());
- List<Integer> iterationOrder = new ArrayList<>();
+ List<Integer> iterationOrder = new ArrayList<Integer>();
Random random = new Random(1892389023490L);
for (int i = 0; i < expected.size(); ++i) {
iterationOrder.add(i);
@@ -1770,7 +1770,8 @@ public class IsmSideInputReaderTest {
private <K, V> Source newIsmSource(IsmRecordCoder<WindowedValue<V>> coder, String tmpFilePath) {
Source source = new Source();
source.setCodec(
- CloudObjects.asCloudObject(WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER)));
+ CloudObjects.asCloudObject(
+ WindowedValue.getFullCoder(coder, GLOBAL_WINDOW_CODER), /*sdkComponents=*/ null));
source.setSpec(new HashMap<String, Object>());
source.getSpec().put(PropertyNames.OBJECT_TYPE_NAME, "IsmSource");
source.getSpec().put(WorkerPropertyNames.FILENAME, tmpFilePath);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
index 80d85a8..34bc386 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ReaderFactoryTest.java
@@ -121,7 +121,8 @@ public class ReaderFactoryTest {
Source cloudSource = new Source();
cloudSource.setSpec(spec);
- cloudSource.setCodec(CloudObjects.asCloudObject(BigEndianIntegerCoder.of()));
+ cloudSource.setCodec(
+ CloudObjects.asCloudObject(BigEndianIntegerCoder.of(), /*sdkComponents=*/ null));
PipelineOptions options = PipelineOptionsFactory.create();
ReaderRegistry registry =
@@ -141,7 +142,7 @@ public class ReaderFactoryTest {
CloudObject spec = CloudObject.forClassName("UnknownSource");
Source cloudSource = new Source();
cloudSource.setSpec(spec);
- cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+ cloudSource.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
try {
PipelineOptions options = PipelineOptionsFactory.create();
ReaderRegistry.defaultRegistry()
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
index 45022d2..c12cd9e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/ShuffleReaderFactoryTest.java
@@ -87,7 +87,7 @@ public class ShuffleReaderFactoryTest {
shuffleReaderConfig,
start,
end,
- CloudObjects.asCloudObject(coder),
+ CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null),
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"),
UngroupedShuffleReader.class,
"UngroupedShuffleSource");
@@ -114,7 +114,8 @@ public class ShuffleReaderFactoryTest {
end,
CloudObjects.asCloudObject(
FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), IntervalWindowCoder.of())),
+ KvCoder.of(keyCoder, IterableCoder.of(valueCoder)), IntervalWindowCoder.of()),
+ /*sdkComponents=*/ null),
context,
GroupingShuffleReader.class,
"GroupingShuffleSource");
@@ -142,7 +143,8 @@ public class ShuffleReaderFactoryTest {
CloudObjects.asCloudObject(
FullWindowedValueCoder.of(
KvCoder.of(keyCoder, windowedValueCoder.getValueCoder()),
- IntervalWindowCoder.of())),
+ IntervalWindowCoder.of()),
+ /*sdkComponents=*/ null),
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"),
PartitioningShuffleReader.class,
"PartitioningShuffleSource");
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
index 099dae4..89c1a69 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SinkRegistryTest.java
@@ -59,7 +59,7 @@ public class SinkRegistryTest {
com.google.api.services.dataflow.model.Sink cloudSink =
new com.google.api.services.dataflow.model.Sink();
cloudSink.setSpec(spec);
- cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+ cloudSink.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
try {
SinkRegistry.defaultRegistry()
.create(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index b87ee45..d69e304 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -230,7 +230,8 @@ public class StreamingDataflowWorkerTest {
CloudObject timerCloudObject =
CloudObject.forClassName(
"com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder");
- List<CloudObject> component = Collections.singletonList(CloudObjects.asCloudObject(coder));
+ List<CloudObject> component =
+ Collections.singletonList(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
Structs.addList(timerCloudObject, PropertyNames.COMPONENT_ENCODINGS, component);
CloudObject encodedCoder = CloudObject.forClassName("kind:windowed_value");
@@ -238,7 +239,9 @@ public class StreamingDataflowWorkerTest {
Structs.addList(
encodedCoder,
PropertyNames.COMPONENT_ENCODINGS,
- ImmutableList.of(timerCloudObject, CloudObjects.asCloudObject(IntervalWindowCoder.of())));
+ ImmutableList.of(
+ timerCloudObject,
+ CloudObjects.asCloudObject(IntervalWindowCoder.of(), /*sdkComponents=*/ null)));
return new ParallelInstruction()
.setSystemName(DEFAULT_SOURCE_SYSTEM_NAME)
@@ -269,7 +272,8 @@ public class StreamingDataflowWorkerTest {
.setSpec(CloudObject.forClass(UngroupedWindmillReader.class))
.setCodec(
CloudObjects.asCloudObject(
- WindowedValue.getFullCoder(coder, IntervalWindow.getCoder())))))
+ WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()),
+ /*sdkComponents=*/ null))))
.setOutputs(
Arrays.asList(
new InstructionOutput()
@@ -278,7 +282,8 @@ public class StreamingDataflowWorkerTest {
.setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
.setCodec(
CloudObjects.asCloudObject(
- WindowedValue.getFullCoder(coder, IntervalWindow.getCoder())))));
+ WindowedValue.getFullCoder(coder, IntervalWindow.getCoder()),
+ /*sdkComponents=*/ null))));
}
private ParallelInstruction makeDoFnInstruction(
@@ -321,7 +326,8 @@ public class StreamingDataflowWorkerTest {
.setCodec(
CloudObjects.asCloudObject(
WindowedValue.getFullCoder(
- outputCoder, windowingStrategy.getWindowFn().windowCoder())))));
+ outputCoder, windowingStrategy.getWindowFn().windowCoder()),
+ /*sdkComponents=*/ null))));
}
private ParallelInstruction makeDoFnInstruction(
@@ -356,7 +362,8 @@ public class StreamingDataflowWorkerTest {
.setSpec(spec)
.setCodec(
CloudObjects.asCloudObject(
- WindowedValue.getFullCoder(coder, windowCoder)))));
+ WindowedValue.getFullCoder(coder, windowCoder),
+ /*sdkComponents=*/ null))));
}
private ParallelInstruction makeSinkInstruction(
@@ -1088,7 +1095,8 @@ public class StreamingDataflowWorkerTest {
.setCodec(
CloudObjects.asCloudObject(
WindowedValue.getFullCoder(
- StringUtf8Coder.of(), IntervalWindow.getCoder())))));
+ StringUtf8Coder.of(), IntervalWindow.getCoder()),
+ /*sdkComponents=*/ null))));
List<ParallelInstruction> instructions =
Arrays.asList(
@@ -1190,7 +1198,10 @@ public class StreamingDataflowWorkerTest {
.withTimestampCombiner(TimestampCombiner.EARLIEST),
sdkComponents)
.toByteArray()));
- addObject(spec, WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedKvCoder));
+ addObject(
+ spec,
+ WorkerPropertyNames.INPUT_CODER,
+ CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null));
ParallelInstruction mergeWindowsInstruction =
new ParallelInstruction()
@@ -1208,7 +1219,9 @@ public class StreamingDataflowWorkerTest {
.setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME)
.setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
.setName("output")
- .setCodec(CloudObjects.asCloudObject(windowedGroupedCoder))));
+ .setCodec(
+ CloudObjects.asCloudObject(
+ windowedGroupedCoder, /*sdkComponents=*/ null))));
List<ParallelInstruction> instructions =
Arrays.asList(
@@ -1492,7 +1505,10 @@ public class StreamingDataflowWorkerTest {
.withAllowedLateness(Duration.standardMinutes(60)),
sdkComponents)
.toByteArray()));
- addObject(spec, WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedKvCoder));
+ addObject(
+ spec,
+ WorkerPropertyNames.INPUT_CODER,
+ CloudObjects.asCloudObject(windowedKvCoder, /*sdkComponents=*/ null));
ParallelInstruction mergeWindowsInstruction =
new ParallelInstruction()
@@ -1510,7 +1526,9 @@ public class StreamingDataflowWorkerTest {
.setOriginalName(DEFAULT_OUTPUT_ORIGINAL_NAME)
.setSystemName(DEFAULT_OUTPUT_SYSTEM_NAME)
.setName("output")
- .setCodec(CloudObjects.asCloudObject(windowedGroupedCoder))));
+ .setCodec(
+ CloudObjects.asCloudObject(
+ windowedGroupedCoder, /*sdkComponents=*/ null))));
List<ParallelInstruction> instructions =
Arrays.asList(
@@ -1650,7 +1668,8 @@ public class StreamingDataflowWorkerTest {
WindowedValue.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(
KvCoder.of(VarIntCoder.of(), VarIntCoder.of())),
- GlobalWindow.Coder.INSTANCE));
+ GlobalWindow.Coder.INSTANCE),
+ /*sdkComponents=*/ null);
return Arrays.asList(
new ParallelInstruction()
@@ -2074,7 +2093,8 @@ public class StreamingDataflowWorkerTest {
WindowedValue.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(
KvCoder.of(VarIntCoder.of(), VarIntCoder.of())),
- GlobalWindow.Coder.INSTANCE));
+ GlobalWindow.Coder.INSTANCE),
+ /*sdkComponents=*/ null);
TestCountingSource counter = new TestCountingSource(3).withThrowOnFirstSnapshot(true);
List<ParallelInstruction> instructions =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
index 9871ce0..f920a23 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingPCollectionViewWriterDoFnFactoryTest.java
@@ -49,7 +49,8 @@ public class StreamingPCollectionViewWriterDoFnFactoryTest {
CloudObject coder =
CloudObjects.asCloudObject(
- WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE));
+ WindowedValue.getFullCoder(BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE),
+ /*sdkComponents=*/ null);
ParDoFn parDoFn =
new StreamingPCollectionViewWriterDoFnFactory()
.create(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index befc876..661e394 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -105,9 +105,10 @@ public class LengthPrefixUnknownCodersTest {
@Test
public void testLengthPrefixUnknownCoders() throws Exception {
Map<String, Object> lengthPrefixedCoderCloudObject =
- forCodec(CloudObjects.asCloudObject(windowedValueCoder), false);
+ forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), false);
assertEquals(
- CloudObjects.asCloudObject(prefixedWindowedValueCoder), lengthPrefixedCoderCloudObject);
+ CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
+ lengthPrefixedCoderCloudObject);
}
/** Test bypassing unknown coders that are already wrapped with {@code LengthPrefixCoder} */
@@ -119,7 +120,7 @@ public class LengthPrefixUnknownCodersTest {
GlobalWindow.Coder.INSTANCE);
Map<String, Object> lengthPrefixedCoderCloudObject =
- forCodec(CloudObjects.asCloudObject(windowedValueCoder), false);
+ forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), false);
Coder<WindowedValue<KV<String, Integer>>> expectedCoder =
WindowedValue.getFullCoder(
@@ -127,7 +128,9 @@ public class LengthPrefixUnknownCodersTest {
LengthPrefixCoder.of(StringUtf8Coder.of()), LengthPrefixCoder.of(VarIntCoder.of())),
GlobalWindow.Coder.INSTANCE);
- assertEquals(CloudObjects.asCloudObject(expectedCoder), lengthPrefixedCoderCloudObject);
+ assertEquals(
+ CloudObjects.asCloudObject(expectedCoder, /*sdkComponents=*/ null),
+ lengthPrefixedCoderCloudObject);
}
/** Test replacing unknown coders with {@code LengthPrefixCoder<ByteArray>} */
@@ -139,71 +142,81 @@ public class LengthPrefixUnknownCodersTest {
GlobalWindow.Coder.INSTANCE);
Map<String, Object> lengthPrefixedCoderCloudObject =
- forCodec(CloudObjects.asCloudObject(windowedValueCoder), true);
+ forCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null), true);
assertEquals(
- CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder),
+ CloudObjects.asCloudObject(prefixedAndReplacedWindowedValueCoder, /*sdkComponents=*/ null),
lengthPrefixedCoderCloudObject);
}
@Test
public void testLengthPrefixInstructionOutputCoder() throws Exception {
InstructionOutput output = new InstructionOutput();
- output.setCodec(CloudObjects.asCloudObject(windowedValueCoder));
+ output.setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
output.setFactory(new JacksonFactory());
InstructionOutput prefixedOutput = forInstructionOutput(output, false);
- assertEquals(CloudObjects.asCloudObject(prefixedWindowedValueCoder), prefixedOutput.getCodec());
+ assertEquals(
+ CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
+ prefixedOutput.getCodec());
// Should not mutate the instruction.
- assertEquals(output.getCodec(), CloudObjects.asCloudObject(windowedValueCoder));
+ assertEquals(
+ output.getCodec(), CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
}
@Test
public void testLengthPrefixReadInstructionCoder() throws Exception {
ReadInstruction readInstruction = new ReadInstruction();
readInstruction.setSource(
- new Source().setCodec(CloudObjects.asCloudObject(windowedValueCoder)));
+ new Source()
+ .setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)));
instruction.setRead(readInstruction);
ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false);
assertEquals(
- CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+ CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
prefixedInstruction.getRead().getSource().getCodec());
// Should not mutate the instruction.
assertEquals(
- readInstruction.getSource().getCodec(), CloudObjects.asCloudObject(windowedValueCoder));
+ readInstruction.getSource().getCodec(),
+ CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
}
@Test
public void testLengthPrefixWriteInstructionCoder() throws Exception {
WriteInstruction writeInstruction = new WriteInstruction();
- writeInstruction.setSink(new Sink().setCodec(CloudObjects.asCloudObject(windowedValueCoder)));
+ writeInstruction.setSink(
+ new Sink()
+ .setCodec(CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null)));
instruction.setWrite(writeInstruction);
ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false);
assertEquals(
- CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+ CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
prefixedInstruction.getWrite().getSink().getCodec());
// Should not mutate the instruction.
assertEquals(
- CloudObjects.asCloudObject(windowedValueCoder), writeInstruction.getSink().getCodec());
+ CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null),
+ writeInstruction.getSink().getCodec());
}
@Test
public void testLengthPrefixParDoInstructionCoder() throws Exception {
ParDoInstruction parDo = new ParDoInstruction();
CloudObject spec = CloudObject.forClassName(MERGE_BUCKETS_DO_FN);
- spec.put(WorkerPropertyNames.INPUT_CODER, CloudObjects.asCloudObject(windowedValueCoder));
+ spec.put(
+ WorkerPropertyNames.INPUT_CODER,
+ CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null));
parDo.setUserFn(spec);
instruction.setParDo(parDo);
ParallelInstruction prefixedInstruction = forParallelInstruction(instruction, false);
assertEquals(
- CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+ CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
prefixedInstruction.getParDo().getUserFn().get(WorkerPropertyNames.INPUT_CODER));
// Should not mutate the instruction.
assertEquals(
- CloudObjects.asCloudObject(windowedValueCoder),
+ CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null),
parDo.getUserFn().get(WorkerPropertyNames.INPUT_CODER));
}
@@ -256,7 +269,7 @@ public class LengthPrefixUnknownCodersTest {
network.addNode(grpcPortNode);
network.addEdge(grpcPortNode, instructionOutputNode, DefaultEdge.create());
assertEquals(
- CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+ CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode))
.getInstructionOutput()
.getCodec());
@@ -269,7 +282,7 @@ public class LengthPrefixUnknownCodersTest {
network.addNode(grpcPortNode);
network.addEdge(instructionOutputNode, grpcPortNode, DefaultEdge.create());
assertEquals(
- CloudObjects.asCloudObject(prefixedWindowedValueCoder),
+ CloudObjects.asCloudObject(prefixedWindowedValueCoder, /*sdkComponents=*/ null),
((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode))
.getInstructionOutput()
.getCodec());
@@ -283,7 +296,7 @@ public class LengthPrefixUnknownCodersTest {
network.addNode(readNode);
network.addEdge(readNode, instructionOutputNode, DefaultEdge.create());
assertEquals(
- CloudObjects.asCloudObject(windowedValueCoder),
+ CloudObjects.asCloudObject(windowedValueCoder, /*sdkComponents=*/ null),
((InstructionOutputNode) forInstructionOutputNode(network).apply(instructionOutputNode))
.getInstructionOutput()
.getCodec());
@@ -317,7 +330,8 @@ public class LengthPrefixUnknownCodersTest {
SideInputInfo sideInputInfo = new SideInputInfo().setSources(new ArrayList<>());
sideInputInfo.setFactory(new JacksonFactory());
for (Coder<?> coder : coders) {
- Source source = new Source().setCodec(CloudObjects.asCloudObject(coder));
+ Source source =
+ new Source().setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
source.setFactory(new JacksonFactory());
sideInputInfo.getSources().add(source);
}
@@ -340,7 +354,7 @@ public class LengthPrefixUnknownCodersTest {
new ReadInstruction()
.setSource(
new Source()
- .setCodec(CloudObjects.asCloudObject(coder))
+ .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null))
.setSpec(CloudObject.forClassName(readClassName))));
parallelInstruction.setFactory(new JacksonFactory());
@@ -349,7 +363,9 @@ public class LengthPrefixUnknownCodersTest {
private static InstructionOutputNode createInstructionOutputNode(String name, Coder<?> coder) {
InstructionOutput instructionOutput =
- new InstructionOutput().setName(name).setCodec(CloudObjects.asCloudObject(coder));
+ new InstructionOutput()
+ .setName(name)
+ .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ null));
instructionOutput.setFactory(new JacksonFactory());
return InstructionOutputNode.create(instructionOutput);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
index 9814bb3..43baf71 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/CloudSourceUtilsTest.java
@@ -62,7 +62,7 @@ public class CloudSourceUtilsTest {
source.getBaseSpecs().add(grandparent);
source.getBaseSpecs().add(parent);
source.setSpec(child);
- source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of()));
+ source.setCodec(CloudObjects.asCloudObject(StringUtf8Coder.of(), /*sdkComponents=*/ null));
Source flat = CloudSourceUtils.flattenBaseSpecs(source);
assertNull(flat.getBaseSpecs());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
index a7387ca..b4ae74a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/TimerOrElementTest.java
@@ -55,7 +55,8 @@ public class TimerOrElementTest {
"com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder");
List<CloudObject> component =
Collections.singletonList(
- CloudObjects.asCloudObject(KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of())));
+ CloudObjects.asCloudObject(
+ KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()), /*sdkComponents=*/ null));
Structs.addList(cloudObject, PropertyNames.COMPONENT_ENCODINGS, component);
Coder<?> decoded = CloudObjects.coderFromCloudObject(cloudObject);