You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/04 00:36:20 UTC
[4/6] beam git commit: Swap to use initializeCloudObject as
customization point for CloudObjects. Hide StandardCoder#getComponents() and
have coders only rely on Coder#getCoderArguments()
Swap to use initializeCloudObject as customization point for CloudObjects.
Hide StandardCoder#getComponents() and have coders only rely on Coder#getCoderArguments()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b76d3dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b76d3dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b76d3dc
Branch: refs/heads/master
Commit: 1b76d3dc18a1367d2530fc870e8cb3046cdc714f
Parents: 3de4108
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 13:39:45 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/internal/IsmFormat.java | 14 +++++++-------
.../beam/runners/spark/coders/WritableCoder.java | 4 ++--
.../org/apache/beam/sdk/coders/AtomicCoder.java | 2 +-
.../org/apache/beam/sdk/coders/AvroCoder.java | 4 ++--
.../org/apache/beam/sdk/coders/CustomCoder.java | 18 +-----------------
.../org/apache/beam/sdk/coders/IterableCoder.java | 4 ++--
.../org/apache/beam/sdk/coders/JAXBCoder.java | 4 ++--
.../java/org/apache/beam/sdk/coders/KvCoder.java | 4 ++--
.../apache/beam/sdk/coders/LengthPrefixCoder.java | 2 ++
.../apache/beam/sdk/coders/SerializableCoder.java | 4 ++--
.../org/apache/beam/sdk/coders/StandardCoder.java | 12 ++++++++++--
.../beam/sdk/coders/protobuf/ProtoCoder.java | 8 ++++----
.../org/apache/beam/sdk/transforms/Combine.java | 11 +++--------
.../beam/sdk/transforms/join/CoGbkResult.java | 13 +++----------
.../org/apache/beam/sdk/util/WindowedValue.java | 12 ++++++------
.../org/apache/beam/sdk/coders/KvCoderTest.java | 5 +++++
.../beam/sdk/util/SerializableUtilsTest.java | 4 ++--
.../apache/beam/sdk/io/hdfs/AvroWrapperCoder.java | 4 ++--
.../apache/beam/sdk/io/hdfs/WritableCoder.java | 4 ++--
19 files changed, 60 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 6a244b0..5b733c8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -125,7 +125,7 @@ public class IsmFormat {
checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
checkArgument(!isMetadataKey(keyComponents),
"Expected key components to not contain metadata key.");
- return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
+ return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, value, null);
}
public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
@@ -133,7 +133,7 @@ public class IsmFormat {
checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
checkArgument(isMetadataKey(keyComponents),
"Expected key components to contain metadata key.");
- return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata);
+ return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, null, metadata);
}
/** Returns the list of key components. */
@@ -379,11 +379,11 @@ public class IsmFormat {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject cloudObject = super.asCloudObject();
- addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
- addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
- return cloudObject;
+ protected CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
+ addLong(result, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
+ addLong(result, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
+ return result;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
index e63c660..40c2627 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
@@ -107,8 +107,8 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ protected CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
result.put("type", type.getName());
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 60908fa..c024f89 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -32,7 +32,7 @@ public abstract class AtomicCoder<T> extends DeterministicStandardCoder<T> {
protected AtomicCoder() { }
@Override
- public List<Coder<?>> getCoderArguments() {
+ public final List<Coder<?>> getCoderArguments() {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 41afdc6..eee0906 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -327,8 +327,8 @@ public class AvroCoder<T> extends StandardCoder<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ protected CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
addString(result, "type", type.getName());
addString(result, "schema", schemaSupplier.get().toString());
return result;
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 2614cc1..59d29de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -17,17 +17,12 @@
*/
package org.apache.beam.sdk.coders;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.addStringList;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
import java.io.Serializable;
-import java.util.Collection;
import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
@@ -72,7 +67,7 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
* @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}.
*/
@Override
- public CloudObject asCloudObject() {
+ public CloudObject initializeCloudObject() {
// N.B. We use the CustomCoder class, not the derived class, since during
// deserialization we will be using the CustomCoder's static factory method
// to construct an instance of the derived class.
@@ -82,17 +77,6 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
StringUtils.byteArrayToJsonString(
SerializableUtils.serializeToByteArray(this)));
- String encodingId = getEncodingId();
- checkNotNull(encodingId, "Coder.getEncodingId() must not return null.");
- if (!encodingId.isEmpty()) {
- addString(result, PropertyNames.ENCODING_ID, encodingId);
- }
-
- Collection<String> allowedEncodings = getAllowedEncodings();
- if (!allowedEncodings.isEmpty()) {
- addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings));
- }
-
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 11fb172..cc6b970 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -68,8 +68,8 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ protected CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 748b07d..7afd225 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -167,8 +167,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ protected CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index c0d3aa4..1e70a30 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -122,8 +122,8 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ protected CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index dd9af32..7319200 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -65,6 +65,8 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
this.valueCoder = valueCoder;
}
+
+
@Override
public void encode(T value, OutputStream outStream, Context context)
throws CoderException, IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index 46777b9..de7cea8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -144,8 +144,8 @@ public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
result.put("type", type.getName());
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index 0e57ed2..c17c376 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -120,9 +120,13 @@ public abstract class StandardCoder<T> implements Coder<T> {
return builder.toString();
}
+ /**
+ * {@link StandardCoder} implementations should override {@link #initializeCloudObject}
+ * if the default {@link CloudObject} representation needs to change.
+ */
@Override
- public CloudObject asCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
+ public final CloudObject asCloudObject() {
+ CloudObject result = initializeCloudObject();
List<? extends Coder<?>> components = getComponents();
if (!components.isEmpty()) {
@@ -147,6 +151,10 @@ public abstract class StandardCoder<T> implements Coder<T> {
return result;
}
+ protected CloudObject initializeCloudObject() {
+ return CloudObject.forClass(getClass());
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
index 9bba42b..a5f53ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
@@ -124,7 +124,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
* Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
*/
public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
- return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of());
+ return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
}
/**
@@ -162,7 +162,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
}
}
- return new ProtoCoder<T>(
+ return new ProtoCoder<>(
protoMessageClass,
new ImmutableSet.Builder<Class<?>>()
.addAll(extensionHostClasses)
@@ -337,8 +337,8 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
List<CloudObject> extensionHostClassNames = Lists.newArrayList();
for (String className : getSortedExtensionClasses()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 92c04ca..98a7bec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -43,6 +42,7 @@ import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -654,11 +654,6 @@ public class Combine {
}
@Override
- public List<Coder<?>> getCoderArguments() {
- return Arrays.<Coder<?>>asList(valueCoder);
- }
-
- @Override
public void encode(Holder<V> accumulator, OutputStream outStream, Context context)
throws CoderException, IOException {
if (accumulator.present) {
@@ -2225,11 +2220,11 @@ public class Combine {
}
public static <InputT, AccumT> InputOrAccum<InputT, AccumT> input(InputT input) {
- return new InputOrAccum<InputT, AccumT>(input, null);
+ return new InputOrAccum<>(input, null);
}
public static <InputT, AccumT> InputOrAccum<InputT, AccumT> accum(AccumT aggr) {
- return new InputOrAccum<InputT, AccumT>(null, aggr);
+ return new InputOrAccum<>(null, aggr);
}
private static class InputOrAccumCoder<InputT, AccumT>
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 7b849e7..9e0a011 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -242,20 +241,14 @@ public class CoGbkResult {
this.unionCoder = unionCoder;
}
-
@Override
public List<? extends Coder<?>> getCoderArguments() {
- return null;
- }
-
- @Override
- public List<? extends Coder<?>> getComponents() {
- return Arrays.<Coder<?>>asList(unionCoder);
+ return ImmutableList.of(unionCoder);
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
addObject(result, PropertyNames.CO_GBK_RESULT_SCHEMA, schema.asCloudObject());
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1b3e648..ce13317 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -131,7 +131,7 @@ public abstract class WindowedValue<T> {
*/
@Deprecated
public static <T> WindowedValue<T> valueInEmptyWindows(T value) {
- return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING);
+ return new ValueInEmptyWindows<>(value, PaneInfo.NO_FIRING);
}
/**
@@ -143,7 +143,7 @@ public abstract class WindowedValue<T> {
*/
@Deprecated
public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo pane) {
- return new ValueInEmptyWindows<T>(value, pane);
+ return new ValueInEmptyWindows<>(value, pane);
}
/**
@@ -696,8 +696,8 @@ public abstract class WindowedValue<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
addBoolean(result, PropertyNames.IS_WRAPPER, true);
return result;
}
@@ -770,8 +770,8 @@ public abstract class WindowedValue<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
addBoolean(result, PropertyNames.IS_WRAPPER, true);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 436e227..4c07c83 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -95,6 +95,11 @@ public class KvCoderTest {
private static final Coder<KV<String, Integer>> TEST_CODER =
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+ @Test
+ public void testEnc() {
+ System.out.println(TEST_CODER.asCloudObject());
+ }
+
private static final List<KV<String, Integer>> TEST_VALUES =
Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE));
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 5435a45..9f86ed2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -129,8 +129,8 @@ public class SerializableUtilsTest {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
result.put("unserializableField", unserializableField);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
index 45a8037..7e01846 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
@@ -100,8 +100,8 @@ public class AvroWrapperCoder<WrapperT extends AvroWrapper<DatumT>, DatumT>
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
result.put("wrapperType", wrapperType.getName());
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
index 96ba87a..637e686 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -101,8 +101,8 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
}
@Override
- public CloudObject asCloudObject() {
- CloudObject result = super.asCloudObject();
+ public CloudObject initializeCloudObject() {
+ CloudObject result = CloudObject.forClass(getClass());
result.put("type", type.getName());
return result;
}