You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/29 02:08:07 UTC
[1/2] beam git commit: Rename StandardCoder to StructuredCoder
Repository: beam
Updated Branches:
refs/heads/master d233a240e -> 8cc4d59c2
Rename StandardCoder to StructuredCoder
StandardCoder has improper connotations - mainly, "Standard" as in
"Standardized" as opposed to "Standard" as in "normal". StructuredCoder
communicates the important part of the class, which is that the coder
has some meaningful structure, and that structure can be used by a
runner.
Update Dataflow Worker Version
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d13bacf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d13bacf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d13bacf
Branch: refs/heads/master
Commit: 2d13bacf9880801fb8398bc5f214e6518e62cce8
Parents: d233a24
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 26 16:30:57 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 18:43:20 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/Coders.java | 12 +-
.../runners/core/construction/CodersTest.java | 14 +-
...aultCoderCloudObjectTranslatorRegistrar.java | 1 -
.../apache/beam/sdk/coders/ByteArrayCoder.java | 2 +-
.../org/apache/beam/sdk/coders/CustomCoder.java | 4 +-
.../apache/beam/sdk/coders/DelegateCoder.java | 2 +-
.../beam/sdk/coders/IterableLikeCoder.java | 2 +-
.../org/apache/beam/sdk/coders/KvCoder.java | 2 +-
.../beam/sdk/coders/LengthPrefixCoder.java | 12 +-
.../apache/beam/sdk/coders/NullableCoder.java | 12 +-
.../beam/sdk/coders/SerializableCoder.java | 2 +-
.../apache/beam/sdk/coders/StandardCoder.java | 231 ------------------
.../apache/beam/sdk/coders/StructuredCoder.java | 231 ++++++++++++++++++
.../apache/beam/sdk/coders/VarLongCoder.java | 2 +-
.../sdk/transforms/windowing/GlobalWindow.java | 4 +-
.../transforms/windowing/IntervalWindow.java | 4 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 4 +-
.../beam/sdk/coders/LengthPrefixCoderTest.java | 3 +-
.../beam/sdk/coders/StandardCoderTest.java | 238 -------------------
.../beam/sdk/coders/StructuredCoderTest.java | 238 +++++++++++++++++++
20 files changed, 510 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 6fe5dc9..8793df4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -36,7 +36,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
@@ -59,8 +59,8 @@ public class Coders {
// The URNs for coders which are shared across languages
@VisibleForTesting
- static final BiMap<Class<? extends StandardCoder>, String> KNOWN_CODER_URNS =
- ImmutableBiMap.<Class<? extends StandardCoder>, String>builder()
+ static final BiMap<Class<? extends StructuredCoder>, String> KNOWN_CODER_URNS =
+ ImmutableBiMap.<Class<? extends StructuredCoder>, String>builder()
.put(ByteArrayCoder.class, "urn:beam:coders:bytes:0.1")
.put(KvCoder.class, "urn:beam:coders:kv:0.1")
.put(VarLongCoder.class, "urn:beam:coders:varint:0.1")
@@ -82,13 +82,13 @@ public class Coders {
private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
throws IOException {
checkArgument(
- coder instanceof StandardCoder,
+ coder instanceof StructuredCoder,
"A Known %s must implement %s, but %s of class %s does not",
Coder.class.getSimpleName(),
- StandardCoder.class.getSimpleName(),
+ StructuredCoder.class.getSimpleName(),
coder,
coder.getClass().getName());
- StandardCoder<?> stdCoder = (StandardCoder<?>) coder;
+ StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder;
List<String> componentIds = new ArrayList<>();
for (Coder<?> componentCoder : stdCoder.getComponents()) {
componentIds.add(components.registerCoder(componentCoder));
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index c9d32ee..ca0fdc9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
@@ -60,8 +60,8 @@ import org.junit.runners.Parameterized.Parameters;
/** Tests for {@link Coders}. */
@RunWith(Enclosed.class)
public class CodersTest {
- private static final Set<StandardCoder<?>> KNOWN_CODERS =
- ImmutableSet.<StandardCoder<?>>builder()
+ private static final Set<StructuredCoder<?>> KNOWN_CODERS =
+ ImmutableSet.<StructuredCoder<?>>builder()
.add(ByteArrayCoder.of())
.add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
.add(VarLongCoder.of())
@@ -85,12 +85,12 @@ public class CodersTest {
// Validates that every known coder in the Coders class is represented in a "Known Coder"
// tests, which demonstrates that they are serialized via components and specified URNs rather
// than java serialized
- Set<Class<? extends StandardCoder>> knownCoderClasses = Coders.KNOWN_CODER_URNS.keySet();
- Set<Class<? extends StandardCoder>> knownCoderTests = new HashSet<>();
- for (StandardCoder<?> coder : KNOWN_CODERS) {
+ Set<Class<? extends StructuredCoder>> knownCoderClasses = Coders.KNOWN_CODER_URNS.keySet();
+ Set<Class<? extends StructuredCoder>> knownCoderTests = new HashSet<>();
+ for (StructuredCoder<?> coder : KNOWN_CODERS) {
knownCoderTests.add(coder.getClass());
}
- Set<Class<? extends StandardCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses);
+ Set<Class<? extends StructuredCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses);
missingKnownCoders.removeAll(knownCoderTests);
checkState(
missingKnownCoders.isEmpty(),
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 72fd9ce..3d7b534 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.coders.Coder;
@AutoService(CoderCloudObjectTranslatorRegistrar.class)
public class DefaultCoderCloudObjectTranslatorRegistrar
implements CoderCloudObjectTranslatorRegistrar {
- @Override
public Map<String, CloudObjectTranslator<? extends Coder>> classNamesToTranslators() {
// TODO: Add translators
return Collections.emptyMap();
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index dd34f28..cba8d49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* encoded via a {@link VarIntCoder}.</li>
* </ul>
*/
-public class ByteArrayCoder extends StandardCoder<byte[]> {
+public class ByteArrayCoder extends StructuredCoder<byte[]> {
@JsonCreator
public static ByteArrayCoder of() {
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 55ec2aa..1627f8a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.util.StringUtils;
*
* @param <T> the type of elements handled by this coder
*/
-public abstract class CustomCoder<T> extends StandardCoder<T>
+public abstract class CustomCoder<T> extends StructuredCoder<T>
implements Serializable {
@JsonCreator
@@ -116,5 +116,5 @@ public abstract class CustomCoder<T> extends StandardCoder<T>
// This coder inherits isRegisterByteSizeObserverCheap,
// getEncodedElementByteSize and registerByteSizeObserver
- // from StandardCoder. Override if we can do better.
+ // from StructuredCoder. Override if we can do better.
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
index de9659b..86077eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
@@ -170,7 +170,7 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
private final CodingFunction<IntermediateT, T> fromFn;
// null unless the user explicitly provides a TypeDescriptor.
- // If null, then the machinery from the superclass (StandardCoder) will be used
+ // If null, then the machinery from the superclass (StructuredCoder) will be used
// to try to deduce a good type descriptor.
@Nullable private final TypeDescriptor<T> typeDescriptor;
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 61402ac..8e10ca2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -58,7 +58,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
* @param <IterableT> the type of the Iterables being transcoded
*/
public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
- extends StandardCoder<IterableT> {
+ extends StructuredCoder<IterableT> {
public Coder<T> getElemCoder() {
return elementCoder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index fcb906c..3d813b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeParameter;
* @param <K> the type of the keys of the KVs being transcoded
* @param <V> the type of the values of the KVs being transcoded
*/
-public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
+public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
public static <K, V> KvCoder<K, V> of(Coder<K> keyCoder,
Coder<V> valueCoder) {
return new KvCoder<>(keyCoder, valueCoder);
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index d123a38..0972b1e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -41,7 +41,7 @@ import org.apache.beam.sdk.util.VarInt;
*
* @param <T> the type of the values being transcoded
*/
-public class LengthPrefixCoder<T> extends StandardCoder<T> {
+public class LengthPrefixCoder<T> extends StructuredCoder<T> {
public static <T> LengthPrefixCoder<T> of(
Coder<T> valueCoder) {
@@ -112,7 +112,7 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
}
/**
- * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
+ * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and
* counting the bytes. The size is known to be the size of the value plus the number of bytes
* required to prefix the length.
*
@@ -120,15 +120,15 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
*/
@Override
protected long getEncodedElementByteSize(T value, Context context) throws Exception {
- if (valueCoder instanceof StandardCoder) {
- // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
+ if (valueCoder instanceof StructuredCoder) {
+ // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of
// the value, adding the number of bytes to represent the length.
- long valueSize = ((StandardCoder<T>) valueCoder).getEncodedElementByteSize(
+ long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize(
value, Context.OUTER);
return VarInt.getLength(valueSize) + valueSize;
}
- // If value is not a StandardCoder then fall back to the default StandardCoder behavior
+ // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior
// of encoding and counting the bytes. The encoding will include the length prefix.
return super.getEncodedElementByteSize(value, context);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 1fd9a99..747d91c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -119,7 +119,7 @@ public class NullableCoder<T> extends CustomCoder<T> {
}
/**
- * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
+ * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and
* counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
* the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
*
@@ -135,7 +135,7 @@ public class NullableCoder<T> extends CustomCoder<T> {
}
/**
- * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
+ * Overridden to short-circuit the default {@code StructuredCoder} behavior of encoding and
* counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
* the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
*
@@ -147,14 +147,14 @@ public class NullableCoder<T> extends CustomCoder<T> {
return 1;
}
- if (valueCoder instanceof StandardCoder) {
- // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
+ if (valueCoder instanceof StructuredCoder) {
+ // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of
// the value, adding 1 byte to count the null indicator.
- return 1 + ((StandardCoder<T>) valueCoder)
+ return 1 + ((StructuredCoder<T>) valueCoder)
.getEncodedElementByteSize(value, context);
}
- // If value is not a StandardCoder then fall back to the default StandardCoder behavior
+ // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior
// of encoding and counting the bytes. The encoding will include the null indicator byte.
return super.getEncodedElementByteSize(value, context);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index 1a737ab..b52b9db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -152,6 +152,6 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
// This coder inherits isRegisterByteSizeObserverCheap,
// getEncodedElementByteSize and registerByteSizeObserver
- // from StandardCoder. Looks like we cannot do much better
+ // from StructuredCoder. Looks like we cannot do much better
// in this case.
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
deleted file mode 100644
index f8d82a5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static org.apache.beam.sdk.util.Structs.addList;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing
- * via the class name and recursively using {@link #getComponents}.
- *
- * <p>To extend {@link StandardCoder}, override the following methods as appropriate:
- *
- * <ul>
- * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li>
- * <li>{@link #getEncodedElementByteSize} and
- * {@link #isRegisterByteSizeObserverCheap}: the
- * default implementation encodes values to bytes and counts the bytes, which is considered
- * expensive.</li>
- * </ul>
- */
-public abstract class StandardCoder<T> implements Coder<T> {
- protected StandardCoder() {}
-
- /**
- * Returns the list of {@link Coder Coders} that are components of this {@link Coder}.
- */
- public List<? extends Coder<?>> getComponents() {
- List<? extends Coder<?>> coderArguments = getCoderArguments();
- if (coderArguments == null) {
- return Collections.emptyList();
- } else {
- return coderArguments;
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code true} if the two {@link StandardCoder} instances have the
- * same class and equal components.
- */
- @Override
- public boolean equals(Object o) {
- if (o == null || this.getClass() != o.getClass()) {
- return false;
- }
- StandardCoder<?> that = (StandardCoder<?>) o;
- return this.getComponents().equals(that.getComponents());
- }
-
- @Override
- public int hashCode() {
- return getClass().hashCode() * 31 + getComponents().hashCode();
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- String s = getClass().getName();
- builder.append(s.substring(s.lastIndexOf('.') + 1));
-
- List<? extends Coder<?>> componentCoders = getComponents();
- if (!componentCoders.isEmpty()) {
- builder.append('(');
- boolean first = true;
- for (Coder<?> componentCoder : componentCoders) {
- if (first) {
- first = false;
- } else {
- builder.append(',');
- }
- builder.append(componentCoder.toString());
- }
- builder.append(')');
- }
- return builder.toString();
- }
-
- /**
- * Adds the following properties to the {@link CloudObject} representation:
- * <ul>
- * <li>component_encodings: A list of coders represented as {@link CloudObject}s
- * equivalent to the {@link #getCoderArguments}.</li>
- * </ul>
- *
- * <p>{@link StandardCoder} implementations should override {@link #initializeCloudObject}
- * to customize the {@link CloudObject} representation.
- */
- @Override
- public final CloudObject asCloudObject() {
- CloudObject result = initializeCloudObject();
-
- List<? extends Coder<?>> components = getComponents();
- if (!components.isEmpty()) {
- List<CloudObject> cloudComponents = new ArrayList<>(components.size());
- for (Coder<?> coder : components) {
- cloudComponents.add(coder.asCloudObject());
- }
- addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents);
- }
-
- return result;
- }
-
- /**
- * Subclasses should override this method to customize the {@link CloudObject}
- * representation. {@link StandardCoder#asCloudObject} delegates to this method
- * to provide an initial {@link CloudObject}.
- *
- * <p>The default implementation returns a {@link CloudObject} using
- * {@link Object#getClass} for the type.
- */
- protected CloudObject initializeCloudObject() {
- return CloudObject.forClass(getClass());
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code false} unless it is overridden. {@link StandardCoder#registerByteSizeObserver}
- * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
- * unless it is overridden. This is considered expensive.
- */
- @Override
- public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
- return false;
- }
-
- /**
- * Returns the size in bytes of the encoded value using this coder.
- */
- protected long getEncodedElementByteSize(T value, Context context)
- throws Exception {
- try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
- encode(value, os, context);
- return os.getCount();
- } catch (Exception exn) {
- throw new IllegalArgumentException(
- "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>For {@link StandardCoder} subclasses, this notifies {@code observer} about the byte size
- * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
- */
- @Override
- public void registerByteSizeObserver(
- T value, ElementByteSizeObserver observer, Context context)
- throws Exception {
- observer.update(getEncodedElementByteSize(value, context));
- }
-
- protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
- throws NonDeterministicException {
- for (Coder<?> coder : coders) {
- try {
- coder.verifyDeterministic();
- } catch (NonDeterministicException e) {
- throw new NonDeterministicException(this, message, e);
- }
- }
- }
-
- protected void verifyDeterministic(String message, Coder<?>... coders)
- throws NonDeterministicException {
- verifyDeterministic(message, Arrays.asList(coders));
- }
-
- /**
- * {@inheritDoc}
- *
- * @return {@code false} for {@link StandardCoder} unless overridden.
- */
- @Override
- public boolean consistentWithEquals() {
- return false;
- }
-
- @Override
- public Object structuralValue(T value) {
- if (value != null && consistentWithEquals()) {
- return value;
- } else {
- try {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- encode(value, os, Context.OUTER);
- return new StructuralByteArray(os.toByteArray());
- } catch (Exception exn) {
- throw new IllegalArgumentException(
- "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public TypeDescriptor<T> getEncodedTypeDescriptor() {
- return (TypeDescriptor<T>)
- TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
new file mode 100644
index 0000000..bce382c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StructuredCoder.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.apache.beam.sdk.util.Structs.addList;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing
+ * via the class name and recursively using {@link #getComponents}.
+ *
+ * <p>To extend {@link StructuredCoder}, override the following methods as appropriate:
+ *
+ * <ul>
+ * <li>{@link #getComponents}: the default implementation returns {@link #getCoderArguments}.</li>
+ * <li>{@link #getEncodedElementByteSize} and
+ * {@link #isRegisterByteSizeObserverCheap}: the
+ * default implementation encodes values to bytes and counts the bytes, which is considered
+ * expensive.</li>
+ * </ul>
+ */
+public abstract class StructuredCoder<T> implements Coder<T> {
+ protected StructuredCoder() {}
+
+ /**
+ * Returns the list of {@link Coder Coders} that are components of this {@link Coder}.
+ */
+ public List<? extends Coder<?>> getComponents() {
+ List<? extends Coder<?>> coderArguments = getCoderArguments();
+ if (coderArguments == null) {
+ return Collections.emptyList();
+ } else {
+ return coderArguments;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code true} if the two {@link StructuredCoder} instances have the
+ * same class and equal components.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || this.getClass() != o.getClass()) {
+ return false;
+ }
+ StructuredCoder<?> that = (StructuredCoder<?>) o;
+ return this.getComponents().equals(that.getComponents());
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode() * 31 + getComponents().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ String s = getClass().getName();
+ builder.append(s.substring(s.lastIndexOf('.') + 1));
+
+ List<? extends Coder<?>> componentCoders = getComponents();
+ if (!componentCoders.isEmpty()) {
+ builder.append('(');
+ boolean first = true;
+ for (Coder<?> componentCoder : componentCoders) {
+ if (first) {
+ first = false;
+ } else {
+ builder.append(',');
+ }
+ builder.append(componentCoder.toString());
+ }
+ builder.append(')');
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Adds the following properties to the {@link CloudObject} representation:
+ * <ul>
+ * <li>component_encodings: A list of coders represented as {@link CloudObject}s
+ * equivalent to the {@link #getCoderArguments}.</li>
+ * </ul>
+ *
+ * <p>{@link StructuredCoder} implementations should override {@link #initializeCloudObject}
+ * to customize the {@link CloudObject} representation.
+ */
+ @Override
+ public final CloudObject asCloudObject() {
+ CloudObject result = initializeCloudObject();
+
+ List<? extends Coder<?>> components = getComponents();
+ if (!components.isEmpty()) {
+ List<CloudObject> cloudComponents = new ArrayList<>(components.size());
+ for (Coder<?> coder : components) {
+ cloudComponents.add(coder.asCloudObject());
+ }
+ addList(result, PropertyNames.COMPONENT_ENCODINGS, cloudComponents);
+ }
+
+ return result;
+ }
+
+ /**
+ * Subclasses should override this method to customize the {@link CloudObject}
+ * representation. {@link StructuredCoder#asCloudObject} delegates to this method
+ * to provide an initial {@link CloudObject}.
+ *
+ * <p>The default implementation returns a {@link CloudObject} using
+ * {@link Object#getClass} for the type.
+ */
+ protected CloudObject initializeCloudObject() {
+ return CloudObject.forClass(getClass());
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code false} unless it is overridden. {@link StructuredCoder#registerByteSizeObserver}
+ * invokes {@link #getEncodedElementByteSize} which requires re-encoding an element
+ * unless it is overridden. This is considered expensive.
+ */
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
+ return false;
+ }
+
+ /**
+ * Returns the size in bytes of the encoded value using this coder.
+ */
+ protected long getEncodedElementByteSize(T value, Context context)
+ throws Exception {
+ try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
+ encode(value, os, context);
+ return os.getCount();
+ } catch (Exception exn) {
+ throw new IllegalArgumentException(
+ "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>For {@link StructuredCoder} subclasses, this notifies {@code observer} about the byte size
+ * of the encoded value using this coder as returned by {@link #getEncodedElementByteSize}.
+ */
+ @Override
+ public void registerByteSizeObserver(
+ T value, ElementByteSizeObserver observer, Context context)
+ throws Exception {
+ observer.update(getEncodedElementByteSize(value, context));
+ }
+
+ protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
+ throws NonDeterministicException {
+ for (Coder<?> coder : coders) {
+ try {
+ coder.verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ throw new NonDeterministicException(this, message, e);
+ }
+ }
+ }
+
+ protected void verifyDeterministic(String message, Coder<?>... coders)
+ throws NonDeterministicException {
+ verifyDeterministic(message, Arrays.asList(coders));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@code false} for {@link StructuredCoder} unless overridden.
+ */
+ @Override
+ public boolean consistentWithEquals() {
+ return false;
+ }
+
+ @Override
+ public Object structuralValue(T value) {
+ if (value != null && consistentWithEquals()) {
+ return value;
+ } else {
+ try {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ encode(value, os, Context.OUTER);
+ return new StructuralByteArray(os.toByteArray());
+ } catch (Exception exn) {
+ throw new IllegalArgumentException(
+ "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TypeDescriptor<T> getEncodedTypeDescriptor() {
+ return (TypeDescriptor<T>)
+ TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 16474ba..7fc094f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for
* longs that are known to often be large or negative.
*/
-public class VarLongCoder extends StandardCoder<Long> {
+public class VarLongCoder extends StructuredCoder<Long> {
public static VarLongCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index ffc8011..79c9352 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -21,7 +21,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.util.CloudObject;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -64,7 +64,7 @@ public class GlobalWindow extends BoundedWindow {
/**
* {@link Coder} for encoding and decoding {@code GlobalWindow}s.
*/
- public static class Coder extends StandardCoder<GlobalWindow> {
+ public static class Coder extends StructuredCoder<GlobalWindow> {
public static final Coder INSTANCE = new Coder();
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index aaa2e83..55bf585 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.util.CloudObject;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -168,7 +168,7 @@ public class IntervalWindow extends BoundedWindow
/**
* Encodes an {@link IntervalWindow} as a pair of its upper bound and duration.
*/
- public static class IntervalWindowCoder extends StandardCoder<IntervalWindow> {
+ public static class IntervalWindowCoder extends StructuredCoder<IntervalWindow> {
private static final IntervalWindowCoder INSTANCE = new IntervalWindowCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 6b75951..fc9a404 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -576,7 +576,7 @@ public abstract class WindowedValue<T> {
* Abstract class for {@code WindowedValue} coder.
*/
public abstract static class WindowedValueCoder<T>
- extends StandardCoder<WindowedValue<T>> {
+ extends StructuredCoder<WindowedValue<T>> {
final Coder<T> valueCoder;
WindowedValueCoder(Coder<T> valueCoder) {
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index 27ac48a..fa81a7c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -33,7 +33,8 @@ import org.junit.runners.JUnit4;
/** Tests for {@link LengthPrefixCoder}. */
@RunWith(JUnit4.class)
public class LengthPrefixCoderTest {
- private static final StandardCoder<byte[]> TEST_CODER = LengthPrefixCoder.of(ByteArrayCoder.of());
+ private static final StructuredCoder<byte[]> TEST_CODER =
+ LengthPrefixCoder.of(ByteArrayCoder.of());
private static final List<byte[]> TEST_VALUES = Arrays.asList(
new byte[]{ 0xa, 0xb, 0xc },
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java
deleted file mode 100644
index a948f78..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StandardCoderTest.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test case for {@link StandardCoder}.
- */
-@RunWith(JUnit4.class)
-public class StandardCoderTest {
-
- /**
- * A coder for nullable {@code Boolean} values that is consistent with equals.
- */
- private static class NullBooleanCoder extends StandardCoder<Boolean> {
-
- private static final long serialVersionUID = 0L;
-
- @Override
- public void encode(@Nullable Boolean value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- if (value == null) {
- outStream.write(2);
- } else if (value) {
- outStream.write(1);
- } else {
- outStream.write(0);
- }
- }
-
- @Override
- @Nullable
- public Boolean decode(
- InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- int value = inStream.read();
- if (value == 0) {
- return false;
- } else if (value == 1) {
- return true;
- } else if (value == 2) {
- return null;
- }
- throw new CoderException("Invalid value for nullable Boolean: " + value);
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Collections.emptyList();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException { }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
- }
-
- /**
- * A boxed {@code int} with {@code equals()} that compares object identity.
- */
- private static class ObjectIdentityBoolean {
- private final boolean value;
- public ObjectIdentityBoolean(boolean value) {
- this.value = value;
- }
- public boolean getValue() {
- return value;
- }
- }
-
- /**
- * A coder for nullable boxed {@code Boolean} values that is not consistent with equals.
- */
- private static class ObjectIdentityBooleanCoder extends StandardCoder<ObjectIdentityBoolean> {
-
- private static final long serialVersionUID = 0L;
-
- @Override
- public void encode(
- @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- if (value == null) {
- outStream.write(2);
- } else if (value.getValue()){
- outStream.write(1);
- } else {
- outStream.write(0);
- }
- }
-
- @Override
- @Nullable
- public ObjectIdentityBoolean decode(
- InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- int value = inStream.read();
- if (value == 0) {
- return new ObjectIdentityBoolean(false);
- } else if (value == 1) {
- return new ObjectIdentityBoolean(true);
- } else if (value == 2) {
- return null;
- }
- throw new CoderException("Invalid value for nullable Boolean: " + value);
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Collections.emptyList();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException { }
-
- @Override
- public boolean consistentWithEquals() {
- return false;
- }
- }
-
- /**
- * Tests that {@link StandardCoder#structuralValue()} is correct whenever a subclass has a correct
- * {@link Coder#consistentWithEquals()}.
- */
- @Test
- public void testStructuralValue() throws Exception {
- List<Boolean> testBooleans = Arrays.asList(null, true, false);
- List<ObjectIdentityBoolean> testInconsistentBooleans =
- Arrays.asList(null, new ObjectIdentityBoolean(true), new ObjectIdentityBoolean(false));
-
- Coder<Boolean> consistentCoder = new NullBooleanCoder();
- for (Boolean value1 : testBooleans) {
- for (Boolean value2 : testBooleans) {
- CoderProperties.structuralValueConsistentWithEquals(consistentCoder, value1, value2);
- }
- }
-
- Coder<ObjectIdentityBoolean> inconsistentCoder = new ObjectIdentityBooleanCoder();
- for (ObjectIdentityBoolean value1 : testInconsistentBooleans) {
- for (ObjectIdentityBoolean value2 : testInconsistentBooleans) {
- CoderProperties.structuralValueConsistentWithEquals(inconsistentCoder, value1, value2);
- }
- }
- }
-
- /**
- * Test for verifying {@link StandardCoder#toString()}.
- */
- @Test
- public void testToString() {
- Assert.assertThat(new ObjectIdentityBooleanCoder().toString(),
- CoreMatchers.equalTo("StandardCoderTest$ObjectIdentityBooleanCoder"));
-
- ObjectIdentityBooleanCoder coderWithArgs = new ObjectIdentityBooleanCoder() {
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return ImmutableList.<Coder<?>>builder()
- .add(BigDecimalCoder.of(), BigIntegerCoder.of())
- .build();
- }
- };
-
- Assert.assertThat(coderWithArgs.toString(),
- CoreMatchers.equalTo("StandardCoderTest$1(BigDecimalCoder,BigIntegerCoder)"));
- }
-
- @Test
- public void testGenericStandardCoderFallsBackToT() throws Exception {
- Assert.assertThat(
- new Foo<String>().getEncodedTypeDescriptor().getType(),
- CoreMatchers.not(TypeDescriptor.of(String.class).getType()));
- }
-
- @Test
- public void testGenericStandardCoder() throws Exception {
- Assert.assertThat(new FooTwo().getEncodedTypeDescriptor(),
- CoreMatchers.equalTo(TypeDescriptor.of(String.class)));
- }
-
- private static class Foo<T> extends StandardCoder<T> {
-
- @Override
- public void encode(T value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void verifyDeterministic() throws Coder.NonDeterministicException {}
- }
-
- private static class FooTwo extends Foo<String> {
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/2d13bacf/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
new file mode 100644
index 0000000..af2c94e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test case for {@link StructuredCoder}.
+ */
+@RunWith(JUnit4.class)
+public class StructuredCoderTest {
+
+ /**
+ * A coder for nullable {@code Boolean} values that is consistent with equals.
+ */
+ private static class NullBooleanCoder extends StructuredCoder<Boolean> {
+
+ private static final long serialVersionUID = 0L;
+
+ @Override
+ public void encode(@Nullable Boolean value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ if (value == null) {
+ outStream.write(2);
+ } else if (value) {
+ outStream.write(1);
+ } else {
+ outStream.write(0);
+ }
+ }
+
+ @Override
+ @Nullable
+ public Boolean decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ int value = inStream.read();
+ if (value == 0) {
+ return false;
+ } else if (value == 1) {
+ return true;
+ } else if (value == 2) {
+ return null;
+ }
+ throw new CoderException("Invalid value for nullable Boolean: " + value);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException { }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+ }
+
+ /**
+ * A boxed {@code int} with {@code equals()} that compares object identity.
+ */
+ private static class ObjectIdentityBoolean {
+ private final boolean value;
+ public ObjectIdentityBoolean(boolean value) {
+ this.value = value;
+ }
+ public boolean getValue() {
+ return value;
+ }
+ }
+
+ /**
+ * A coder for nullable boxed {@code Boolean} values that is not consistent with equals.
+ */
+ private static class ObjectIdentityBooleanCoder extends StructuredCoder<ObjectIdentityBoolean> {
+
+ private static final long serialVersionUID = 0L;
+
+ @Override
+ public void encode(
+ @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ if (value == null) {
+ outStream.write(2);
+ } else if (value.getValue()){
+ outStream.write(1);
+ } else {
+ outStream.write(0);
+ }
+ }
+
+ @Override
+ @Nullable
+ public ObjectIdentityBoolean decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ int value = inStream.read();
+ if (value == 0) {
+ return new ObjectIdentityBoolean(false);
+ } else if (value == 1) {
+ return new ObjectIdentityBoolean(true);
+ } else if (value == 2) {
+ return null;
+ }
+ throw new CoderException("Invalid value for nullable Boolean: " + value);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException { }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return false;
+ }
+ }
+
+ /**
+ * Tests that {@link StructuredCoder#structuralValue()} is correct whenever a subclass has a
+ * correct {@link Coder#consistentWithEquals()}.
+ */
+ @Test
+ public void testStructuralValue() throws Exception {
+ List<Boolean> testBooleans = Arrays.asList(null, true, false);
+ List<ObjectIdentityBoolean> testInconsistentBooleans =
+ Arrays.asList(null, new ObjectIdentityBoolean(true), new ObjectIdentityBoolean(false));
+
+ Coder<Boolean> consistentCoder = new NullBooleanCoder();
+ for (Boolean value1 : testBooleans) {
+ for (Boolean value2 : testBooleans) {
+ CoderProperties.structuralValueConsistentWithEquals(consistentCoder, value1, value2);
+ }
+ }
+
+ Coder<ObjectIdentityBoolean> inconsistentCoder = new ObjectIdentityBooleanCoder();
+ for (ObjectIdentityBoolean value1 : testInconsistentBooleans) {
+ for (ObjectIdentityBoolean value2 : testInconsistentBooleans) {
+ CoderProperties.structuralValueConsistentWithEquals(inconsistentCoder, value1, value2);
+ }
+ }
+ }
+
+ /**
+ * Test for verifying {@link StructuredCoder#toString()}.
+ */
+ @Test
+ public void testToString() {
+ Assert.assertThat(new ObjectIdentityBooleanCoder().toString(),
+ CoreMatchers.equalTo("StructuredCoderTest$ObjectIdentityBooleanCoder"));
+
+ ObjectIdentityBooleanCoder coderWithArgs = new ObjectIdentityBooleanCoder() {
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return ImmutableList.<Coder<?>>builder()
+ .add(BigDecimalCoder.of(), BigIntegerCoder.of())
+ .build();
+ }
+ };
+
+ Assert.assertThat(coderWithArgs.toString(),
+ CoreMatchers.equalTo("StructuredCoderTest$1(BigDecimalCoder,BigIntegerCoder)"));
+ }
+
+ @Test
+ public void testGenericStandardCoderFallsBackToT() throws Exception {
+ Assert.assertThat(
+ new Foo<String>().getEncodedTypeDescriptor().getType(),
+ CoreMatchers.not(TypeDescriptor.of(String.class).getType()));
+ }
+
+ @Test
+ public void testGenericStandardCoder() throws Exception {
+ Assert.assertThat(new FooTwo().getEncodedTypeDescriptor(),
+ CoreMatchers.equalTo(TypeDescriptor.of(String.class)));
+ }
+
+ private static class Foo<T> extends StructuredCoder<T> {
+
+ @Override
+ public void encode(T value, OutputStream outStream, Coder.Context context)
+ throws CoderException, IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T decode(InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void verifyDeterministic() throws Coder.NonDeterministicException {}
+ }
+
+ private static class FooTwo extends Foo<String> {
+ }
+}
[2/2] beam git commit: This closes #2720
Posted by tg...@apache.org.
This closes #2720
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8cc4d59c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8cc4d59c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8cc4d59c
Branch: refs/heads/master
Commit: 8cc4d59c2f617746228ae71f7c9b3606ab41d4aa
Parents: d233a24 2d13bac
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 19:07:56 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 19:07:56 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/Coders.java | 12 +-
.../runners/core/construction/CodersTest.java | 14 +-
...aultCoderCloudObjectTranslatorRegistrar.java | 1 -
.../apache/beam/sdk/coders/ByteArrayCoder.java | 2 +-
.../org/apache/beam/sdk/coders/CustomCoder.java | 4 +-
.../apache/beam/sdk/coders/DelegateCoder.java | 2 +-
.../beam/sdk/coders/IterableLikeCoder.java | 2 +-
.../org/apache/beam/sdk/coders/KvCoder.java | 2 +-
.../beam/sdk/coders/LengthPrefixCoder.java | 12 +-
.../apache/beam/sdk/coders/NullableCoder.java | 12 +-
.../beam/sdk/coders/SerializableCoder.java | 2 +-
.../apache/beam/sdk/coders/StandardCoder.java | 231 ------------------
.../apache/beam/sdk/coders/StructuredCoder.java | 231 ++++++++++++++++++
.../apache/beam/sdk/coders/VarLongCoder.java | 2 +-
.../sdk/transforms/windowing/GlobalWindow.java | 4 +-
.../transforms/windowing/IntervalWindow.java | 4 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 4 +-
.../beam/sdk/coders/LengthPrefixCoderTest.java | 3 +-
.../beam/sdk/coders/StandardCoderTest.java | 238 -------------------
.../beam/sdk/coders/StructuredCoderTest.java | 238 +++++++++++++++++++
20 files changed, 510 insertions(+), 510 deletions(-)
----------------------------------------------------------------------