You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/04 00:36:17 UTC

[1/6] beam git commit: Update Dataflow worker image to be able to deserialize well known coder types within TimerOrElementCoder.

Repository: beam
Updated Branches:
  refs/heads/master d86db15ba -> 5c612272d


Update Dataflow worker image to be able to deserialize well known coder types within TimerOrElementCoder.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c692140
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c692140
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c692140

Branch: refs/heads/master
Commit: 8c692140c73d0723347aaaa4cf863a53d5ac78ce
Parents: bfde34d
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jan 3 14:54:10 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/dataflow.properties         | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8c692140/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 2912f61..399146d 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
 
 environment.major.version=6
 
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161221
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170103
 
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161221
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170103


[6/6] beam git commit: [BEAM-1231] Use well known coder types in Java

Posted by lc...@apache.org.
[BEAM-1231] Use well known coder types in Java

This closes #1717


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c612272
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c612272
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c612272

Branch: refs/heads/master
Commit: 5c612272d582421fbd540381c150e68205470f3b
Parents: d86db15 8c69214
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jan 3 16:36:05 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:36:05 2017 -0800

----------------------------------------------------------------------
 .../runners/core/KeyedWorkItemCoderTest.java    |   6 +
 .../UnboundedReadFromBoundedSourceTest.java     |   7 +
 .../runners/dataflow/internal/IsmFormat.java    |  14 +-
 .../beam/runners/dataflow/dataflow.properties   |   4 +-
 .../runners/spark/coders/WritableCoder.java     |   4 +-
 .../org/apache/beam/sdk/coders/AtomicCoder.java |   2 +-
 .../org/apache/beam/sdk/coders/AvroCoder.java   |   4 +-
 .../apache/beam/sdk/coders/CollectionCoder.java |   4 +-
 .../org/apache/beam/sdk/coders/CustomCoder.java |  18 +--
 .../apache/beam/sdk/coders/IterableCoder.java   |   4 +-
 .../org/apache/beam/sdk/coders/JAXBCoder.java   |   4 +-
 .../org/apache/beam/sdk/coders/KvCoder.java     |   4 +-
 .../beam/sdk/coders/LengthPrefixCoder.java      | 145 +++++++++++++++++++
 .../beam/sdk/coders/SerializableCoder.java      |   4 +-
 .../org/apache/beam/sdk/coders/SetCoder.java    |   4 +-
 .../apache/beam/sdk/coders/StandardCoder.java   |  31 +++-
 .../beam/sdk/coders/protobuf/ProtoCoder.java    |   8 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  11 +-
 .../beam/sdk/transforms/join/CoGbkResult.java   |  13 +-
 .../sdk/transforms/windowing/GlobalWindow.java  |   6 +
 .../org/apache/beam/sdk/util/CoderUtils.java    |  28 ++--
 .../org/apache/beam/sdk/util/WindowedValue.java |  12 +-
 .../beam/sdk/values/TimestampedValue.java       |   4 +-
 .../beam/sdk/coders/CollectionCoderTest.java    |   6 +
 .../beam/sdk/coders/IterableCoderTest.java      |  18 ++-
 .../org/apache/beam/sdk/coders/KvCoderTest.java |  19 +++
 .../beam/sdk/coders/LengthPrefixCoderTest.java  | 129 +++++++++++++++++
 .../apache/beam/sdk/coders/ListCoderTest.java   |   8 +-
 .../apache/beam/sdk/coders/MapCoderTest.java    |   9 +-
 .../beam/sdk/coders/NullableCoderTest.java      |   6 +
 .../apache/beam/sdk/coders/SetCoderTest.java    |   6 +
 .../apache/beam/sdk/testing/TestStreamTest.java |   5 +
 .../testing/ValueInSingleWindowCoderTest.java   |   7 +
 .../apache/beam/sdk/transforms/CombineTest.java |  70 ---------
 .../transforms/join/CoGbkResultCoderTest.java   |  10 +-
 .../sdk/transforms/join/UnionCoderTest.java     |  24 +--
 .../transforms/windowing/GlobalWindowTest.java  |  64 ++++++++
 .../beam/sdk/util/SerializableUtilsTest.java    |   4 +-
 .../beam/sdk/util/TimerInternalsTest.java       |   5 +
 .../beam/sdk/util/ValueWithRecordIdTest.java    |  34 +++++
 .../apache/beam/sdk/util/WindowedValueTest.java |  23 +++
 .../beam/sdk/values/TimestampedValueTest.java   |  19 ++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   7 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   6 +
 .../beam/sdk/io/hdfs/AvroWrapperCoder.java      |   4 +-
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  |   4 +-
 .../beam/sdk/io/hdfs/AvroWrapperCoderTest.java  |   1 -
 .../beam/sdk/io/kafka/KafkaRecordCoderTest.java |  34 +++++
 48 files changed, 677 insertions(+), 186 deletions(-)
----------------------------------------------------------------------



[2/6] beam git commit: Fix coders to have type information and test coders with components to ensure that they serialize and deserialize correctly with well known coder types.

Posted by lc...@apache.org.
Fix coders to have type information and test coders with components to ensure that they serialize and deserialize correctly with well known coder types.

Jackson needs type information in order to invoke the coder Jackson module, which is what resolves @type information.
The coder Jackson module is configured to apply only to Coder types and their subtypes.
In the case of List<Object> or List<?>, Jackson ignores the coder Jackson module and attempts to deserialize the type
using the default context, which expects a fully qualified class name and therefore fails on well known types.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bfde34dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bfde34dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bfde34dd

Branch: refs/heads/master
Commit: bfde34ddf92567b70cb1887237fbc4d6baaa1846
Parents: 6947d21
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jan 3 14:11:18 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../runners/core/KeyedWorkItemCoderTest.java    |  6 ++
 .../UnboundedReadFromBoundedSourceTest.java     |  7 ++
 .../apache/beam/sdk/coders/CollectionCoder.java |  4 +-
 .../org/apache/beam/sdk/coders/SetCoder.java    |  4 +-
 .../beam/sdk/values/TimestampedValue.java       |  4 +-
 .../beam/sdk/coders/CollectionCoderTest.java    |  6 ++
 .../beam/sdk/coders/IterableCoderTest.java      |  6 ++
 .../org/apache/beam/sdk/coders/KvCoderTest.java |  7 ++
 .../beam/sdk/coders/LengthPrefixCoderTest.java  |  6 ++
 .../apache/beam/sdk/coders/ListCoderTest.java   |  8 ++-
 .../apache/beam/sdk/coders/MapCoderTest.java    |  9 ++-
 .../beam/sdk/coders/NullableCoderTest.java      |  6 ++
 .../apache/beam/sdk/coders/SetCoderTest.java    |  6 ++
 .../apache/beam/sdk/testing/TestStreamTest.java |  5 ++
 .../testing/ValueInSingleWindowCoderTest.java   |  7 ++
 .../apache/beam/sdk/transforms/CombineTest.java | 70 --------------------
 .../transforms/join/CoGbkResultCoderTest.java   | 10 ++-
 .../sdk/transforms/join/UnionCoderTest.java     | 24 +++----
 .../beam/sdk/util/TimerInternalsTest.java       |  5 ++
 .../beam/sdk/util/ValueWithRecordIdTest.java    | 34 ++++++++++
 .../apache/beam/sdk/util/WindowedValueTest.java | 12 ++++
 .../beam/sdk/values/TimestampedValueTest.java   | 19 +++++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  7 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  6 ++
 .../beam/sdk/io/hdfs/AvroWrapperCoderTest.java  |  1 -
 .../beam/sdk/io/kafka/KafkaRecordCoderTest.java | 34 ++++++++++
 26 files changed, 216 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
index 37fabdd..56a6f6b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
@@ -61,4 +61,10 @@ public class KeyedWorkItemCoderTest {
     CoderProperties.coderDecodeEncodeEqual(
         coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers));
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(KeyedWorkItemCoder.of(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index e03793b..e1968cb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -53,6 +54,7 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -92,6 +94,11 @@ public class UnboundedReadFromBoundedSourceTest {
   }
 
   @Test
+  public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testBoundedToUnboundedSourceAdapter() throws Exception {
     long numElements = 100;

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index 7c61e88..bf05253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -52,9 +52,9 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
   @JsonCreator
   public static CollectionCoder<?> of(
       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
+      List<Coder<?>> components) {
     checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of((Coder<?>) components.get(0));
+    return of(components.get(0));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index 0d1b017..a8fd1cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -47,9 +47,9 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
   @JsonCreator
   public static SetCoder<?> of(
       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
+      List<Coder<?>> components) {
     checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of((Coder<?>) components.get(0));
+    return of(components.get(0));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index dd80fb2..7f3bbd3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -103,10 +103,10 @@ public class TimestampedValue<V> {
     @JsonCreator
     public static TimestampedValueCoder<?> of(
         @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Object> components) {
+        List<Coder<?>> components) {
       checkArgument(components.size() == 1,
                     "Expecting 1 component, got " + components.size());
-      return of((Coder<?>) components.get(0));
+      return of(components.get(0));
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
index 6a5d94b..faa7e1a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.TreeSet;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -89,4 +90,9 @@ public class CollectionCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(CollectionCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 2df768e..e200efe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Structs;
@@ -55,6 +56,11 @@ public class IterableCoderTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(ListCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testDecodeEncodeContentsInSameOrder() throws Exception {
     for (Iterable<Integer> value : TEST_VALUES) {
       CoderProperties.<Integer, Iterable<Integer>>coderDecodeEncodeContentsInSameOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 1d1f905..1422897 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Structs;
@@ -87,6 +88,12 @@ public class KvCoderTest {
     }
   }
 
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(
+        KvCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
   // If this changes, it implies the binary format has changed!
   private static final String EXPECTED_ENCODING_ID = "";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index 95d405d..f7942d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,6 +64,11 @@ public class LengthPrefixCoderTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(LengthPrefixCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testEncodedSize() throws Exception {
     assertEquals(4L,
         TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED));

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
index ba9cc9d..8d91343 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -42,7 +43,12 @@ public class ListCoderTest {
       Collections.<Integer>emptyList(),
       Collections.singletonList(43),
       Arrays.asList(1, 2, 3, 4),
-      new LinkedList<Integer>(Arrays.asList(7, 6, 5)));
+      new LinkedList<>(Arrays.asList(7, 6, 5)));
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(ListCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 
   @Test
   public void testDecodeEncodeContentsInSameOrder() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index 1053c79..67366c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +45,7 @@ public class MapCoderTest {
 
   private static final List<Map<Integer, String>> TEST_VALUES = Arrays.<Map<Integer, String>>asList(
       Collections.<Integer, String>emptyMap(),
-      new TreeMap<Integer, String>(new ImmutableMap.Builder<Integer, String>()
+      new TreeMap<>(new ImmutableMap.Builder<Integer, String>()
           .put(1, "hello").put(-1, "foo").build()));
 
   @Test
@@ -55,6 +56,12 @@ public class MapCoderTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(
+        MapCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testGetInstanceComponentsNonempty() {
     Map<Integer, String> map = new HashMap<>();
     map.put(17, "foozle");

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 21ecb45..63066a4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -66,6 +67,11 @@ public class NullableCoderTest {
     CoderProperties.coderSerializable(TEST_CODER);
   }
 
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(NullableCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
   // If this changes, it implies the binary format has changed.
   private static final String EXPECTED_ENCODING_ID = "";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
index 58b0b8e..e7f8d1d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,6 +45,11 @@ public class SetCoderTest {
       new TreeSet<>(Arrays.asList(31, -5, 83)));
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(SetCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testDecodeEncodeContentsEqual() throws Exception {
     for (Set<Integer> value : TEST_VALUES) {
       CoderProperties.coderDecodeEncodeContentsEqual(TEST_CODER, value);

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index c12e9f3..a6a5f0e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -347,4 +347,9 @@ public class TestStreamTest implements Serializable {
     CoderProperties.coderDecodeEncodeEqual(coder, wm);
     CoderProperties.coderDecodeEncodeEqual(coder, procTime);
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(TestStream.EventCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
index daf73b6..3cc016e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.testing;
 
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.joda.time.Duration;
@@ -48,4 +49,10 @@ public class ValueInSingleWindowCoderTest {
     CoderProperties.coderSerializable(
         ValueInSingleWindow.Coder.of(StringUtf8Coder.of(), IntervalWindow.getCoder()));
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(ValueInSingleWindow.Coder.of(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index fef47fb..890a36e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -29,11 +29,8 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -52,10 +49,8 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -75,7 +70,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -200,7 +194,6 @@ public class CombineTest implements Serializable {
   private void runTestBasicCombine(List<KV<String, Integer>> table,
                                    Set<Integer> globalUnique,
                                    List<KV<String, Set<Integer>>> perKeyUnique) {
-    pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
     PCollection<Set<Integer>> unique = input
@@ -759,69 +752,6 @@ public class CombineTest implements Serializable {
     }
   }
 
-  // Note: not a deterministic encoding
-  private static class SetCoder<T> extends StandardCoder<Set<T>> {
-
-    public static <T> SetCoder<T> of(Coder<T> elementCoder) {
-      return new SetCoder<>(elementCoder);
-    }
-
-    @JsonCreator
-    public static SetCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-      return of((Coder<?>) components.get(0));
-    }
-
-    @SuppressWarnings("unused") // required for coder instantiation
-    public static <T> List<Object> getInstanceComponents(Set<T> exampleValue) {
-      return IterableCoder.getInstanceComponents(exampleValue);
-    }
-
-    private final Coder<Iterable<T>> iterableCoder;
-
-    private SetCoder(Coder<T> elementCoder) {
-      iterableCoder = IterableCoder.of(elementCoder);
-    }
-
-    @Override
-    public void encode(Set<T> value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      iterableCoder.encode(value, outStream, context);
-    }
-
-    @Override
-    public Set<T> decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      // TODO: Eliminate extra copy if used in production.
-      return Sets.newHashSet(iterableCoder.decode(inStream, context));
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return iterableCoder.getCoderArguments();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      throw new NonDeterministicException(this,
-          "CombineTest.SetCoder does not encode in a deterministic order.");
-    }
-
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(Set<T> value, Context context) {
-      return iterableCoder.isRegisterByteSizeObserverCheap(value, context);
-    }
-
-    @Override
-    public void registerByteSizeObserver(
-        Set<T> value, ElementByteSizeObserver observer, Context context)
-        throws Exception {
-      iterableCoder.registerByteSizeObserver(value, observer, context);
-    }
-  }
-
   /** Example AccumulatingCombineFn. */
   private static class MeanInts extends
       Combine.AccumulatingCombineFn<Integer, MeanInts.CountSum, Double> {

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
index 18ecd9b..97eb47a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.junit.Test;
@@ -79,7 +80,14 @@ public class CoGbkResultCoderTest {
   }
 
   @Test
-  public void testSerializationDeserialization() {
+  public void testCoderIsSerializable() {
     CoderProperties.coderSerializable(TEST_CODER);
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(CoGbkResultCoder.of(
+        CoGbkResultSchema.of(ImmutableList.<TupleTag<?>>of(new TupleTag<GlobalWindow>())),
+        UnionCoder.of(ImmutableList.<Coder<?>>of(GlobalWindow.Coder.INSTANCE))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
index 41ba952..4ba78c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
@@ -17,14 +17,13 @@
  */
 package org.apache.beam.sdk.transforms.join;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
+import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -36,12 +35,13 @@ import org.junit.runners.JUnit4;
 public class UnionCoderTest {
 
   @Test
-  public void testSerializationDeserialization() {
-    UnionCoder newCoder =
-        UnionCoder.of(Arrays.<Coder<?>>asList(StringUtf8Coder.of(),
-            DoubleCoder.of()));
-    CloudObject encoding = newCoder.asCloudObject();
-    Coder<?> decodedCoder = Serializer.deserialize(encoding, Coder.class);
-    assertEquals(newCoder, decodedCoder);
+  public void testCoderIsSerializable() {
+    CoderProperties.coderSerializable(UnionCoder.of(ImmutableList.<Coder<?>>of(
+        StringUtf8Coder.of(), DoubleCoder.of())));
+  }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(TimerDataCoder.of(GlobalWindow.Coder.INSTANCE));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
index 7b56f1c..b024f89 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
@@ -59,6 +59,11 @@ public class TimerInternalsTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(TimerDataCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testCompareTo() {
     Instant firstTimestamp = new Instant(100);
     Instant secondTimestamp = new Instant(200);

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
new file mode 100644
index 0000000..e3a2dc6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ValueWithRecordId.ValueWithRecordIdCoder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ValueWithRecordId}. */
+@RunWith(JUnit4.class)
+public class ValueWithRecordIdTest {
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 1c67ef0..8bfdcef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -75,6 +76,17 @@ public class WindowedValueTest {
   }
 
   @Test
+  public void testFullWindowedValueCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(WindowedValue.getFullCoder(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
+  public void testValueOnlyWindowedValueCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(WindowedValue.getValueOnlyCoder(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testExplodeWindowsInNoWindowsEmptyIterable() {
     WindowedValue<String> value =
         WindowedValue.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
index a982f31..e0f01f8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
@@ -21,9 +21,10 @@ package org.apache.beam.sdk.values;
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.testing.EqualsTester;
-
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -80,4 +81,18 @@ public class TimestampedValueTest {
             TimestampedValue.atMinimumTimestamp("foo"))
         .testEquals();
   }
+
+  private static final Coder<TimestampedValue<GlobalWindow>> CODER =
+      TimestampedValue.TimestampedValueCoder.of(GlobalWindow.Coder.INSTANCE);
+
+  @Test
+  public void testCoderEncodeDecodeEquals() throws Exception {
+    CoderProperties.coderDecodeEncodeEqual(CODER,
+        TimestampedValue.of(GlobalWindow.INSTANCE, Instant.now()));
+  }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(CODER);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7efa115..4b19973 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2685,7 +2685,7 @@ public class BigQueryIO {
     private final int shardNumber;
 
     public static <K> ShardedKey<K> of(K key, int shardNumber) {
-      return new ShardedKey<K>(key, shardNumber);
+      return new ShardedKey<>(key, shardNumber);
     }
 
     private ShardedKey(K key, int shardNumber) {
@@ -2705,7 +2705,8 @@ public class BigQueryIO {
   /**
    * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
    */
-  private static class ShardedKeyCoder<KeyT>
+  @VisibleForTesting
+  static class ShardedKeyCoder<KeyT>
       extends StandardCoder<ShardedKey<KeyT>> {
     public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
       return new ShardedKeyCoder<>(keyCoder);
@@ -2739,7 +2740,7 @@ public class BigQueryIO {
     @Override
     public ShardedKey<KeyT> decode(InputStream inStream, Context context)
         throws IOException {
-      return new ShardedKey<KeyT>(
+      return new ShardedKey<>(
           keyCoder.decode(inStream, context.nested()),
           shardNumberCoder.decode(inStream, context));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 4ddfdea..471b5e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -122,6 +122,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -2352,4 +2353,9 @@ public class BigQueryIOTest implements Serializable {
       return null;
     }
   }
+
+  @Test
+  public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(BigQueryIO.ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
index 6ebea3a..4d9c819 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
@@ -47,5 +47,4 @@ public class AvroWrapperCoderTest {
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
new file mode 100644
index 0000000..426103d
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link KafkaRecordCoder}. */
+@RunWith(JUnit4.class)
+public class KafkaRecordCoderTest {
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(
+        KafkaRecordCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+}


[5/6] beam git commit: Use kind:* for certain well known coder types.

Posted by lc...@apache.org.
Use kind:* for certain well known coder types.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6947d21b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6947d21b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6947d21b

Branch: refs/heads/master
Commit: 6947d21bc1876c408a23b5e98ef813bb59d5b8d1
Parents: 1b76d3d
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 14:20:04 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/IterableCoder.java   |  2 +-
 .../org/apache/beam/sdk/coders/KvCoder.java     |  2 +-
 .../beam/sdk/coders/LengthPrefixCoder.java      |  8 ++-
 .../apache/beam/sdk/coders/StandardCoder.java   | 23 ++++++-
 .../sdk/transforms/windowing/GlobalWindow.java  |  6 ++
 .../org/apache/beam/sdk/util/WindowedValue.java |  2 +-
 .../beam/sdk/coders/IterableCoderTest.java      | 12 +++-
 .../org/apache/beam/sdk/coders/KvCoderTest.java | 11 +++-
 .../beam/sdk/coders/LengthPrefixCoderTest.java  | 31 ++++++----
 .../transforms/windowing/GlobalWindowTest.java  | 64 ++++++++++++++++++++
 .../apache/beam/sdk/util/WindowedValueTest.java | 11 ++++
 11 files changed, 150 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index cc6b970..a1f6fa3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -69,7 +69,7 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
 
   @Override
   protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
+    CloudObject result = CloudObject.forClassName("kind:stream");
     addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 1e70a30..c9d05fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -123,7 +123,7 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
 
   @Override
   protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
+    CloudObject result = CloudObject.forClassName("kind:pair");
     addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 7319200..d123a38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -30,6 +30,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.VarInt;
 
@@ -65,7 +66,10 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
     this.valueCoder = valueCoder;
   }
 
-
+  @Override
+  protected CloudObject initializeCloudObject() {
+    return CloudObject.forClassName("kind:length_prefix");
+  }
 
   @Override
   public void encode(T value, OutputStream outStream, Context context)
@@ -125,7 +129,7 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
     }
 
     // If value is not a StandardCoder then fall back to the default StandardCoder behavior
-    // of encoding and counting the bytes. The encoding will include the null indicator byte.
+    // of encoding and counting the bytes. The encoding will include the length prefix.
     return super.getEncodedElementByteSize(value, context);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index c17c376..e9a1bd3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -121,8 +121,19 @@ public abstract class StandardCoder<T> implements Coder<T> {
   }
 
   /**
-   * {@link StandardCoder} implementations should override {@link #initializeCloudObject}
-   * if the default {@link CloudObject} representation needs to change.
+   * Adds the following properties to the {@link CloudObject} representation:
+   * <ul>
+   *   <li>component_encodings: A list of coders represented as {@link CloudObject}s
+   *       equivalent to the {@link #getCoderArguments}.</li>
+   *   <li>encoding_id: An identifier for the binary format written by {@link #encode}. See
+   *       {@link #getEncodingId} for further details.</li>
+   *   <li>allowed_encodings: A collection of encodings supported by {@link #decode} in
+   *       addition to the encoding from {@link #getEncodingId()} (which is assumed supported).
+   *       See {@link #getAllowedEncodings} for further details.</li>
+   * </ul>
+   *
+   * <p>{@link StandardCoder} implementations should override {@link #initializeCloudObject}
+   * to customize the {@link CloudObject} representation.
    */
   @Override
   public final CloudObject asCloudObject() {
@@ -151,6 +162,14 @@ public abstract class StandardCoder<T> implements Coder<T> {
     return result;
   }
 
+  /**
+   * Subclasses should override this method to customize the {@link CloudObject}
+   * representation. {@link StandardCoder#asCloudObject} delegates to this method
+   * to provide an initial {@link CloudObject}.
+   *
+   * <p>The default implementation returns a {@link CloudObject} using
+   * {@link Object#getClass} for the type.
+   */
   protected CloudObject initializeCloudObject() {
     return CloudObject.forClass(getClass());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 58b059a..c27749d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.util.CloudObject;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -62,6 +63,11 @@ public class GlobalWindow extends BoundedWindow {
       return GlobalWindow.INSTANCE;
     }
 
+    @Override
+    protected CloudObject initializeCloudObject() {
+      return CloudObject.forClassName("kind:global_window");
+    }
+
     private Coder() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index ce13317..6b75951 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -697,7 +697,7 @@ public abstract class WindowedValue<T> {
 
     @Override
     public CloudObject initializeCloudObject() {
-      CloudObject result = CloudObject.forClass(getClass());
+      CloudObject result = CloudObject.forClassName("kind:windowed_value");
       addBoolean(result, PropertyNames.IS_WRAPPER, true);
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 15ec44b..2df768e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -19,13 +19,16 @@ package org.apache.beam.sdk.coders;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Structs;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -42,7 +45,14 @@ public class IterableCoderTest {
       Collections.<Integer>emptyList(),
       Collections.<Integer>singletonList(13),
       Arrays.<Integer>asList(1, 2, 3, 4),
-      new LinkedList<Integer>(Arrays.asList(7, 6, 5)));
+      new LinkedList<>(Arrays.asList(7, 6, 5)));
+
+  @Test
+  public void testCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = TEST_CODER.asCloudObject();
+    assertEquals("kind:stream", cloudObject.getClassName());
+    assertTrue(Structs.getBoolean(cloudObject, "is_stream_like"));
+  }
 
   @Test
   public void testDecodeEncodeContentsInSameOrder() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 4c07c83..1d1f905 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.values.KV;
 import org.junit.Rule;
 import org.junit.Test;
@@ -96,8 +101,10 @@ public class KvCoderTest {
       KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
   @Test
-  public void testEnc() {
-    System.out.println(TEST_CODER.asCloudObject());
+  public void testCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = TEST_CODER.asCloudObject();
+    assertEquals("kind:pair", cloudObject.getClassName());
+    assertTrue(Structs.getBoolean(cloudObject, "is_pair_like"));
   }
 
   private static final List<KV<String, Integer>> TEST_VALUES =

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index e31c561..95d405d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,6 +41,22 @@ public class LengthPrefixCoderTest {
     new byte[]{ 0xd, 0xe },
     new byte[]{ });
 
+  /**
+   * Generated data to check that the wire format has not changed. To regenerate, see
+   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   */
+  private static final List<String> TEST_ENCODINGS = ImmutableList.of(
+      "AwoLDA",
+      "Ag0D",
+      "Ag0O",
+      "AA");
+
+  @Test
+  public void testCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = TEST_CODER.asCloudObject();
+    assertEquals("kind:length_prefix", cloudObject.getClassName());
+  }
+
   @Test
   public void testCoderSerializable() throws Exception {
     CoderProperties.coderSerializable(TEST_CODER);
@@ -76,10 +93,10 @@ public class LengthPrefixCoderTest {
   @Test
   public void testRegisterByteSizeObserver() throws Exception {
     CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
-                                   new byte[][]{{ 0xa, 0xb, 0xc }});
+        new byte[][]{{ 0xa, 0xb, 0xc }});
 
     CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
-                                   new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+        new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
   }
 
   @Test
@@ -99,16 +116,6 @@ public class LengthPrefixCoderTest {
     CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
   }
 
-  /**
-   * Generated data to check that the wire format has not changed. To regenerate, see
-   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
-   */
-  private static final List<String> TEST_ENCODINGS = Arrays.asList(
-      "AwoLDA",
-      "Ag0D",
-      "Ag0O",
-      "AA");
-
   @Test
   public void testWireFormatEncode() throws Exception {
     CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
new file mode 100644
index 0000000..1857332
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GlobalWindow}. */
+@RunWith(JUnit4.class)
+public class GlobalWindowTest {
+  @Test
+  public void testCoderCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = GlobalWindow.Coder.INSTANCE.asCloudObject();
+    assertEquals("kind:global_window", cloudObject.getClassName());
+  }
+
+  @Test
+  public void testCoderBinaryRepresentation() throws Exception {
+    CountingOutputStream out = new CountingOutputStream(ByteStreams.nullOutputStream());
+    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.OUTER);
+    assertEquals(0, out.getCount());
+    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.NESTED);
+    assertEquals(0, out.getCount());
+  }
+
+  @Test
+  public void testCoderEncodeDecodeEquals() throws Exception {
+    CoderProperties.coderDecodeEncodeEqual(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+  }
+
+  @Test
+  public void testCoderIsSerializable() {
+    CoderProperties.coderSerializable(GlobalWindow.Coder.INSTANCE);
+  }
+
+  @Test
+  public void testCoderIsDeterministic() throws Exception {
+    CoderProperties.coderDeterministic(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 0c69a59..1c67ef0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -20,7 +20,9 @@ package org.apache.beam.sdk.util;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -64,6 +67,14 @@ public class WindowedValueTest {
   }
 
   @Test
+  public void testWindowedValueCoderCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = WindowedValue.getFullCoder(
+        StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject();
+    assertEquals("kind:windowed_value", cloudObject.getClassName());
+    assertTrue(Structs.getBoolean(cloudObject, "is_wrapper"));
+  }
+
+  @Test
   public void testExplodeWindowsInNoWindowsEmptyIterable() {
     WindowedValue<String> value =
         WindowedValue.of(


[4/6] beam git commit: Swap to use initializeCloudObject as customization point for CloudObjects. Hide StandardCoder#getComponents() and have coders only rely on Coder#getCoderArguments()

Posted by lc...@apache.org.
Swap to use initializeCloudObject as customization point for CloudObjects.
Hide StandardCoder#getComponents() and have coders only rely on Coder#getCoderArguments()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b76d3dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b76d3dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b76d3dc

Branch: refs/heads/master
Commit: 1b76d3dc18a1367d2530fc870e8cb3046cdc714f
Parents: 3de4108
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 13:39:45 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/internal/IsmFormat.java | 14 +++++++-------
 .../beam/runners/spark/coders/WritableCoder.java  |  4 ++--
 .../org/apache/beam/sdk/coders/AtomicCoder.java   |  2 +-
 .../org/apache/beam/sdk/coders/AvroCoder.java     |  4 ++--
 .../org/apache/beam/sdk/coders/CustomCoder.java   | 18 +-----------------
 .../org/apache/beam/sdk/coders/IterableCoder.java |  4 ++--
 .../org/apache/beam/sdk/coders/JAXBCoder.java     |  4 ++--
 .../java/org/apache/beam/sdk/coders/KvCoder.java  |  4 ++--
 .../apache/beam/sdk/coders/LengthPrefixCoder.java |  2 ++
 .../apache/beam/sdk/coders/SerializableCoder.java |  4 ++--
 .../org/apache/beam/sdk/coders/StandardCoder.java | 12 ++++++++++--
 .../beam/sdk/coders/protobuf/ProtoCoder.java      |  8 ++++----
 .../org/apache/beam/sdk/transforms/Combine.java   | 11 +++--------
 .../beam/sdk/transforms/join/CoGbkResult.java     | 13 +++----------
 .../org/apache/beam/sdk/util/WindowedValue.java   | 12 ++++++------
 .../org/apache/beam/sdk/coders/KvCoderTest.java   |  5 +++++
 .../beam/sdk/util/SerializableUtilsTest.java      |  4 ++--
 .../apache/beam/sdk/io/hdfs/AvroWrapperCoder.java |  4 ++--
 .../apache/beam/sdk/io/hdfs/WritableCoder.java    |  4 ++--
 19 files changed, 60 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 6a244b0..5b733c8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -125,7 +125,7 @@ public class IsmFormat {
       checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
       checkArgument(!isMetadataKey(keyComponents),
           "Expected key components to not contain metadata key.");
-      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, value, null);
+      return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, value, null);
     }
 
     public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
@@ -133,7 +133,7 @@ public class IsmFormat {
       checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
       checkArgument(isMetadataKey(keyComponents),
           "Expected key components to contain metadata key.");
-      return new AutoValue_IsmFormat_IsmRecord<V>(keyComponents, null, metadata);
+      return new AutoValue_IsmFormat_IsmRecord<>(keyComponents, null, metadata);
     }
 
     /** Returns the list of key components. */
@@ -379,11 +379,11 @@ public class IsmFormat {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject cloudObject = super.asCloudObject();
-      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
-      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
-      return cloudObject;
+    protected CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
+      addLong(result, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
+      addLong(result, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
+      return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
index e63c660..40c2627 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java
@@ -107,8 +107,8 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("type", type.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
index 60908fa..c024f89 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AtomicCoder.java
@@ -32,7 +32,7 @@ public abstract class AtomicCoder<T> extends DeterministicStandardCoder<T> {
   protected AtomicCoder() { }
 
   @Override
-  public List<Coder<?>> getCoderArguments() {
+  public final List<Coder<?>> getCoderArguments() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 41afdc6..eee0906 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -327,8 +327,8 @@ public class AvroCoder<T> extends StandardCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     addString(result, "type", type.getName());
     addString(result, "schema", schemaSupplier.get().toString());
     return result;

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 2614cc1..59d29de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -17,17 +17,12 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.sdk.util.Structs.addString;
-import static org.apache.beam.sdk.util.Structs.addStringList;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
 import java.io.Serializable;
-import java.util.Collection;
 import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.StringUtils;
 
@@ -72,7 +67,7 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
    * @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}.
    */
   @Override
-  public CloudObject asCloudObject() {
+  public CloudObject initializeCloudObject() {
     // N.B. We use the CustomCoder class, not the derived class, since during
     // deserialization we will be using the CustomCoder's static factory method
     // to construct an instance of the derived class.
@@ -82,17 +77,6 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
         StringUtils.byteArrayToJsonString(
             SerializableUtils.serializeToByteArray(this)));
 
-    String encodingId = getEncodingId();
-    checkNotNull(encodingId, "Coder.getEncodingId() must not return null.");
-    if (!encodingId.isEmpty()) {
-      addString(result, PropertyNames.ENCODING_ID, encodingId);
-    }
-
-    Collection<String> allowedEncodings = getAllowedEncodings();
-    if (!allowedEncodings.isEmpty()) {
-      addStringList(result, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowedEncodings));
-    }
-
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index 11fb172..cc6b970 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -68,8 +68,8 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 748b07d..7afd225 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -167,8 +167,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index c0d3aa4..1e70a30 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -122,8 +122,8 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  protected CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index dd9af32..7319200 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -65,6 +65,8 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
     this.valueCoder = valueCoder;
   }
 
+
+
   @Override
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index 46777b9..de7cea8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -144,8 +144,8 @@ public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("type", type.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index 0e57ed2..c17c376 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -120,9 +120,13 @@ public abstract class StandardCoder<T> implements Coder<T> {
     return builder.toString();
   }
 
+  /**
+   * {@link StandardCoder} implementations should override {@link #initializeCloudObject}
+   * if the default {@link CloudObject} representation needs to change.
+   */
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
+  public final CloudObject asCloudObject() {
+    CloudObject result = initializeCloudObject();
 
     List<? extends Coder<?>> components = getComponents();
     if (!components.isEmpty()) {
@@ -147,6 +151,10 @@ public abstract class StandardCoder<T> implements Coder<T> {
     return result;
   }
 
+  protected CloudObject initializeCloudObject() {
+    return CloudObject.forClass(getClass());
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
index 9bba42b..a5f53ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/protobuf/ProtoCoder.java
@@ -124,7 +124,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
    * Returns a {@link ProtoCoder} for the given Protocol Buffers {@link Message}.
    */
   public static <T extends Message> ProtoCoder<T> of(Class<T> protoMessageClass) {
-    return new ProtoCoder<T>(protoMessageClass, ImmutableSet.<Class<?>>of());
+    return new ProtoCoder<>(protoMessageClass, ImmutableSet.<Class<?>>of());
   }
 
   /**
@@ -162,7 +162,7 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
       }
     }
 
-    return new ProtoCoder<T>(
+    return new ProtoCoder<>(
         protoMessageClass,
         new ImmutableSet.Builder<Class<?>>()
             .addAll(extensionHostClasses)
@@ -337,8 +337,8 @@ public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
     List<CloudObject> extensionHostClassNames = Lists.newArrayList();
     for (String className : getSortedExtensionClasses()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 92c04ca..98a7bec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -43,6 +42,7 @@ import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -654,11 +654,6 @@ public class Combine {
     }
 
     @Override
-    public List<Coder<?>> getCoderArguments() {
-      return Arrays.<Coder<?>>asList(valueCoder);
-    }
-
-    @Override
     public void encode(Holder<V> accumulator, OutputStream outStream, Context context)
         throws CoderException, IOException {
       if (accumulator.present) {
@@ -2225,11 +2220,11 @@ public class Combine {
       }
 
       public static <InputT, AccumT> InputOrAccum<InputT, AccumT> input(InputT input) {
-        return new InputOrAccum<InputT, AccumT>(input, null);
+        return new InputOrAccum<>(input, null);
       }
 
       public static <InputT, AccumT> InputOrAccum<InputT, AccumT> accum(AccumT aggr) {
-        return new InputOrAccum<InputT, AccumT>(null, aggr);
+        return new InputOrAccum<>(null, aggr);
       }
 
       private static class InputOrAccumCoder<InputT, AccumT>

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 7b849e7..9e0a011 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -242,20 +241,14 @@ public class CoGbkResult {
       this.unionCoder = unionCoder;
     }
 
-
     @Override
     public List<? extends Coder<?>> getCoderArguments() {
-      return null;
-    }
-
-    @Override
-    public List<? extends Coder<?>> getComponents() {
-      return Arrays.<Coder<?>>asList(unionCoder);
+      return ImmutableList.of(unionCoder);
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       addObject(result, PropertyNames.CO_GBK_RESULT_SCHEMA, schema.asCloudObject());
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1b3e648..ce13317 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -131,7 +131,7 @@ public abstract class WindowedValue<T> {
    */
   @Deprecated
   public static <T> WindowedValue<T> valueInEmptyWindows(T value) {
-    return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING);
+    return new ValueInEmptyWindows<>(value, PaneInfo.NO_FIRING);
   }
 
   /**
@@ -143,7 +143,7 @@ public abstract class WindowedValue<T> {
    */
   @Deprecated
   public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo pane) {
-    return new ValueInEmptyWindows<T>(value, pane);
+    return new ValueInEmptyWindows<>(value, pane);
   }
 
   /**
@@ -696,8 +696,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       addBoolean(result, PropertyNames.IS_WRAPPER, true);
       return result;
     }
@@ -770,8 +770,8 @@ public abstract class WindowedValue<T> {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       addBoolean(result, PropertyNames.IS_WRAPPER, true);
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 436e227..4c07c83 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -95,6 +95,11 @@ public class KvCoderTest {
   private static final Coder<KV<String, Integer>> TEST_CODER =
       KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
+  @Test
+  public void testEnc() {
+    System.out.println(TEST_CODER.asCloudObject());
+  }
+
   private static final List<KV<String, Integer>> TEST_VALUES =
       Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 5435a45..9f86ed2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -129,8 +129,8 @@ public class SerializableUtilsTest {
     }
 
     @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
+    public CloudObject initializeCloudObject() {
+      CloudObject result = CloudObject.forClass(getClass());
       result.put("unserializableField", unserializableField);
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
index 45a8037..7e01846 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
@@ -100,8 +100,8 @@ public class AvroWrapperCoder<WrapperT extends AvroWrapper<DatumT>, DatumT>
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("wrapperType", wrapperType.getName());
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1b76d3dc/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
index 96ba87a..637e686 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -101,8 +101,8 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
   }
 
   @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
+  public CloudObject initializeCloudObject() {
+    CloudObject result = CloudObject.forClass(getClass());
     result.put("type", type.getName());
     return result;
   }


[3/6] beam git commit: [BEAM-1231] Use well known coder types in Java

Posted by lc...@apache.org.
[BEAM-1231] Use well known coder types in Java


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de4108e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de4108e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de4108e

Branch: refs/heads/master
Commit: 3de4108e3ab96ec0f860d3d40160c7b7e4f5d0f7
Parents: d86db15
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 12:03:31 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/coders/LengthPrefixCoder.java      | 139 +++++++++++++++++++
 .../org/apache/beam/sdk/util/CoderUtils.java    |  28 ++--
 .../beam/sdk/coders/LengthPrefixCoderTest.java  | 116 ++++++++++++++++
 3 files changed, 270 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
new file mode 100644
index 0000000..dd9af32
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.VarInt;
+
+/**
+ * A {@link Coder} which is able to take any existing coder and wrap it such that it is only
+ * invoked in the {@link org.apache.beam.sdk.coders.Coder.Context#OUTER outer context}. The data
+ * representing the element is prefixed with a length using a variable integer encoding.
+ *
+ * @param <T> the type of the values being transcoded
+ */
+public class LengthPrefixCoder<T> extends StandardCoder<T> {
+
+  public static <T> LengthPrefixCoder<T> of(
+      Coder<T> valueCoder) {
+    checkNotNull(valueCoder, "Coder not expected to be null");
+    return new LengthPrefixCoder<>(valueCoder);
+  }
+
+  /** Creates a coder wrapping the single component coder; used for JSON deserialization. */
+  @JsonCreator
+  public static LengthPrefixCoder<?> of(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+    checkArgument(
+        components.size() == 1, "Expecting 1 component, got %s", components.size());
+    return of(components.get(0));
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final Coder<T> valueCoder;
+
+  private LengthPrefixCoder(Coder<T> valueCoder) {
+    this.valueCoder = valueCoder;
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    valueCoder.encode(value, bos, Context.OUTER);
+    VarInt.encode(bos.size(), outStream);
+    bos.writeTo(outStream);
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
+    long size = VarInt.decodeLong(inStream);
+    return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(valueCoder);
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is deterministic if the nested {@code Coder} is.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    valueCoder.verifyDeterministic();
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is consistent with equals if the nested {@code Coder} is.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return valueCoder.consistentWithEquals();
+  }
+
+  /**
+   * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
+   * counting the bytes. The size is known to be the size of the value plus the number of bytes
+   * required to prefix the length.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  protected long getEncodedElementByteSize(T value, Context context) throws Exception {
+    if (valueCoder instanceof StandardCoder) {
+      // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
+      // the value, adding the number of bytes to represent the length.
+      long valueSize = ((StandardCoder<T>) valueCoder).getEncodedElementByteSize(
+          value, Context.OUTER);
+      return VarInt.getLength(valueSize) + valueSize;
+    }
+
+    // If valueCoder is not a StandardCoder then fall back to the default StandardCoder behavior
+    // of encoding and counting the bytes. The encoding will include the length prefix.
+    return super.getEncodedElementByteSize(value, context);
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is cheap if {@code valueCoder} is cheap.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
+    return valueCoder.isRegisterByteSizeObserverCheap(value, Context.OUTER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 36bf789..7b93b59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.google.api.client.util.Base64;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -39,10 +40,13 @@ import java.io.OutputStream;
 import java.lang.ref.SoftReference;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.TypeVariable;
+import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -51,15 +55,15 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 public final class CoderUtils {
   private CoderUtils() {}  // Non-instantiable
 
-  /**
-   * Coder class-name alias for a key-value type.
-   */
-  public static final String KIND_PAIR = "kind:pair";
-
-  /**
-   * Coder class-name alias for a stream type.
-   */
-  public static final String KIND_STREAM = "kind:stream";
+  /** A mapping from well known coder types to their implementing classes. */
+  private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
+      ImmutableMap.<String, Class<?>>builder()
+      .put("kind:pair", KvCoder.class)
+      .put("kind:stream", IterableCoder.class)
+      .put("kind:global_window", GlobalWindow.Coder.class)
+      .put("kind:length_prefix", LengthPrefixCoder.class)
+      .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
+      .build();
 
   private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
       threadLocalOutputStream = new ThreadLocal<>();
@@ -266,10 +270,8 @@ public final class CoderUtils {
             return Class.forName(id);
           }
 
-          if (id.equals(KIND_STREAM)) {
-            return IterableCoder.class;
-          } else if (id.equals(KIND_PAIR)) {
-            return KvCoder.class;
+          if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
+            return WELL_KNOWN_CODER_TYPES.get(id);
           }
 
           // Otherwise, see if the ID is the name of a class in

http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
new file mode 100644
index 0000000..e31c561
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link LengthPrefixCoder}. */
+@RunWith(JUnit4.class)
+public class LengthPrefixCoderTest {
+  private static final StandardCoder<byte[]> TEST_CODER = LengthPrefixCoder.of(ByteArrayCoder.of());
+
+  private static final List<byte[]> TEST_VALUES = Arrays.asList(
+    new byte[]{ 0xa, 0xb, 0xc },
+    new byte[]{ 0xd, 0x3 },
+    new byte[]{ 0xd, 0xe },
+    new byte[]{ });
+
+  @Test
+  public void testCoderSerializable() throws Exception {
+    CoderProperties.coderSerializable(TEST_CODER);
+  }
+
+  @Test
+  public void testEncodedSize() throws Exception {
+    assertEquals(4L,
+        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED));
+    assertEquals(4L,
+        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testObserverIsCheap() throws Exception {
+    Coder<Double> coder = LengthPrefixCoder.of(DoubleCoder.of());
+    assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testObserverIsNotCheap() throws Exception {
+    Coder<List<String>> coder = LengthPrefixCoder.of(ListCoder.of(StringUtf8Coder.of()));
+    assertFalse(coder.isRegisterByteSizeObserverCheap(
+        ImmutableList.of("hi", "test"), Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testDecodeEncodeEquals() throws Exception {
+    for (byte[] value : TEST_VALUES) {
+      CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+    }
+  }
+
+  @Test
+  public void testRegisterByteSizeObserver() throws Exception {
+    CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
+                                   new byte[][]{{ 0xa, 0xb, 0xc }});
+
+    CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
+                                   new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+  }
+
+  @Test
+  public void testStructuralValueConsistentWithEquals() throws Exception {
+    for (byte[] value1 : TEST_VALUES) {
+      for (byte[] value2 : TEST_VALUES) {
+        CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2);
+      }
+    }
+  }
+
+  // If this changes, it implies the binary format has changed.
+  private static final String EXPECTED_ENCODING_ID = "";
+
+  @Test
+  public void testEncodingId() throws Exception {
+    CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
+  }
+
+  /**
+   * Generated data to check that the wire format has not changed. To regenerate, see
+   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   */
+  private static final List<String> TEST_ENCODINGS = Arrays.asList(
+      "AwoLDA",
+      "Ag0D",
+      "Ag0O",
+      "AA");
+
+  @Test
+  public void testWireFormatEncode() throws Exception {
+    CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
+  }
+}