You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/04 00:36:18 UTC

[2/6] beam git commit: Fix coders to have type information and test coders with components to ensure that they serialize and deserialize correctly with well known coder types.

Fix coders to have type information and test coders with components to ensure that they serialize and deserialize correctly with well known coder types.

Jackson needs declared type information before it will invoke the coder Jackson module, which is what resolves the @type information.
The coder Jackson module is configured to apply only to Coder types and subtypes.
When a parameter is declared as List<Object> or List<?>, Jackson ignores the coder Jackson module and attempts to deserialize the value
using the default context, which expects a fully qualified class name and therefore fails on well-known coder types.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bfde34dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bfde34dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bfde34dd

Branch: refs/heads/master
Commit: bfde34ddf92567b70cb1887237fbc4d6baaa1846
Parents: 6947d21
Author: Luke Cwik <lc...@google.com>
Authored: Tue Jan 3 14:11:18 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../runners/core/KeyedWorkItemCoderTest.java    |  6 ++
 .../UnboundedReadFromBoundedSourceTest.java     |  7 ++
 .../apache/beam/sdk/coders/CollectionCoder.java |  4 +-
 .../org/apache/beam/sdk/coders/SetCoder.java    |  4 +-
 .../beam/sdk/values/TimestampedValue.java       |  4 +-
 .../beam/sdk/coders/CollectionCoderTest.java    |  6 ++
 .../beam/sdk/coders/IterableCoderTest.java      |  6 ++
 .../org/apache/beam/sdk/coders/KvCoderTest.java |  7 ++
 .../beam/sdk/coders/LengthPrefixCoderTest.java  |  6 ++
 .../apache/beam/sdk/coders/ListCoderTest.java   |  8 ++-
 .../apache/beam/sdk/coders/MapCoderTest.java    |  9 ++-
 .../beam/sdk/coders/NullableCoderTest.java      |  6 ++
 .../apache/beam/sdk/coders/SetCoderTest.java    |  6 ++
 .../apache/beam/sdk/testing/TestStreamTest.java |  5 ++
 .../testing/ValueInSingleWindowCoderTest.java   |  7 ++
 .../apache/beam/sdk/transforms/CombineTest.java | 70 --------------------
 .../transforms/join/CoGbkResultCoderTest.java   | 10 ++-
 .../sdk/transforms/join/UnionCoderTest.java     | 24 +++----
 .../beam/sdk/util/TimerInternalsTest.java       |  5 ++
 .../beam/sdk/util/ValueWithRecordIdTest.java    | 34 ++++++++++
 .../apache/beam/sdk/util/WindowedValueTest.java | 12 ++++
 .../beam/sdk/values/TimestampedValueTest.java   | 19 +++++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  7 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  6 ++
 .../beam/sdk/io/hdfs/AvroWrapperCoderTest.java  |  1 -
 .../beam/sdk/io/kafka/KafkaRecordCoderTest.java | 34 ++++++++++
 26 files changed, 216 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
index 37fabdd..56a6f6b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
@@ -61,4 +61,10 @@ public class KeyedWorkItemCoderTest {
     CoderProperties.coderDecodeEncodeEqual(
         coder, KeyedWorkItems.<String, Integer>timersWorkItem("foo", timers));
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(KeyedWorkItemCoder.of(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index e03793b..e1968cb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -53,6 +54,7 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -92,6 +94,11 @@ public class UnboundedReadFromBoundedSourceTest {
   }
 
   @Test
+  public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testBoundedToUnboundedSourceAdapter() throws Exception {
     long numElements = 100;

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index 7c61e88..bf05253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -52,9 +52,9 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
   @JsonCreator
   public static CollectionCoder<?> of(
       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
+      List<Coder<?>> components) {
     checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of((Coder<?>) components.get(0));
+    return of(components.get(0));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index 0d1b017..a8fd1cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -47,9 +47,9 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
   @JsonCreator
   public static SetCoder<?> of(
       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
+      List<Coder<?>> components) {
     checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of((Coder<?>) components.get(0));
+    return of(components.get(0));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index dd80fb2..7f3bbd3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -103,10 +103,10 @@ public class TimestampedValue<V> {
     @JsonCreator
     public static TimestampedValueCoder<?> of(
         @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Object> components) {
+        List<Coder<?>> components) {
       checkArgument(components.size() == 1,
                     "Expecting 1 component, got " + components.size());
-      return of((Coder<?>) components.get(0));
+      return of(components.get(0));
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
index 6a5d94b..faa7e1a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.TreeSet;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -89,4 +90,9 @@ public class CollectionCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(CollectionCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 2df768e..e200efe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Structs;
@@ -55,6 +56,11 @@ public class IterableCoderTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(ListCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testDecodeEncodeContentsInSameOrder() throws Exception {
     for (Iterable<Integer> value : TEST_VALUES) {
       CoderProperties.<Integer, Iterable<Integer>>coderDecodeEncodeContentsInSameOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 1d1f905..1422897 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Structs;
@@ -87,6 +88,12 @@ public class KvCoderTest {
     }
   }
 
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(
+        KvCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
   // If this changes, it implies the binary format has changed!
   private static final String EXPECTED_ENCODING_ID = "";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index 95d405d..f7942d3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,6 +64,11 @@ public class LengthPrefixCoderTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(LengthPrefixCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testEncodedSize() throws Exception {
     assertEquals(4L,
         TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED));

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
index ba9cc9d..8d91343 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -42,7 +43,12 @@ public class ListCoderTest {
       Collections.<Integer>emptyList(),
       Collections.singletonList(43),
       Arrays.asList(1, 2, 3, 4),
-      new LinkedList<Integer>(Arrays.asList(7, 6, 5)));
+      new LinkedList<>(Arrays.asList(7, 6, 5)));
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(ListCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 
   @Test
   public void testDecodeEncodeContentsInSameOrder() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index 1053c79..67366c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +45,7 @@ public class MapCoderTest {
 
   private static final List<Map<Integer, String>> TEST_VALUES = Arrays.<Map<Integer, String>>asList(
       Collections.<Integer, String>emptyMap(),
-      new TreeMap<Integer, String>(new ImmutableMap.Builder<Integer, String>()
+      new TreeMap<>(new ImmutableMap.Builder<Integer, String>()
           .put(1, "hello").put(-1, "foo").build()));
 
   @Test
@@ -55,6 +56,12 @@ public class MapCoderTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(
+        MapCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testGetInstanceComponentsNonempty() {
     Map<Integer, String> map = new HashMap<>();
     map.put(17, "foozle");

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 21ecb45..63066a4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -66,6 +67,11 @@ public class NullableCoderTest {
     CoderProperties.coderSerializable(TEST_CODER);
   }
 
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(NullableCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
   // If this changes, it implies the binary format has changed.
   private static final String EXPECTED_ENCODING_ID = "";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
index 58b0b8e..e7f8d1d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,6 +45,11 @@ public class SetCoderTest {
       new TreeSet<>(Arrays.asList(31, -5, 83)));
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
+    CoderProperties.coderSerializable(SetCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testDecodeEncodeContentsEqual() throws Exception {
     for (Set<Integer> value : TEST_VALUES) {
       CoderProperties.coderDecodeEncodeContentsEqual(TEST_CODER, value);

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index c12e9f3..a6a5f0e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -347,4 +347,9 @@ public class TestStreamTest implements Serializable {
     CoderProperties.coderDecodeEncodeEqual(coder, wm);
     CoderProperties.coderDecodeEncodeEqual(coder, procTime);
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(TestStream.EventCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
index daf73b6..3cc016e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.testing;
 
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.joda.time.Duration;
@@ -48,4 +49,10 @@ public class ValueInSingleWindowCoderTest {
     CoderProperties.coderSerializable(
         ValueInSingleWindow.Coder.of(StringUtf8Coder.of(), IntervalWindow.getCoder()));
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(ValueInSingleWindow.Coder.of(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index fef47fb..890a36e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -29,11 +29,8 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -52,10 +49,8 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -75,7 +70,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -200,7 +194,6 @@ public class CombineTest implements Serializable {
   private void runTestBasicCombine(List<KV<String, Integer>> table,
                                    Set<Integer> globalUnique,
                                    List<KV<String, Set<Integer>>> perKeyUnique) {
-    pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
     PCollection<Set<Integer>> unique = input
@@ -759,69 +752,6 @@ public class CombineTest implements Serializable {
     }
   }
 
-  // Note: not a deterministic encoding
-  private static class SetCoder<T> extends StandardCoder<Set<T>> {
-
-    public static <T> SetCoder<T> of(Coder<T> elementCoder) {
-      return new SetCoder<>(elementCoder);
-    }
-
-    @JsonCreator
-    public static SetCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-      return of((Coder<?>) components.get(0));
-    }
-
-    @SuppressWarnings("unused") // required for coder instantiation
-    public static <T> List<Object> getInstanceComponents(Set<T> exampleValue) {
-      return IterableCoder.getInstanceComponents(exampleValue);
-    }
-
-    private final Coder<Iterable<T>> iterableCoder;
-
-    private SetCoder(Coder<T> elementCoder) {
-      iterableCoder = IterableCoder.of(elementCoder);
-    }
-
-    @Override
-    public void encode(Set<T> value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      iterableCoder.encode(value, outStream, context);
-    }
-
-    @Override
-    public Set<T> decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      // TODO: Eliminate extra copy if used in production.
-      return Sets.newHashSet(iterableCoder.decode(inStream, context));
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return iterableCoder.getCoderArguments();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      throw new NonDeterministicException(this,
-          "CombineTest.SetCoder does not encode in a deterministic order.");
-    }
-
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(Set<T> value, Context context) {
-      return iterableCoder.isRegisterByteSizeObserverCheap(value, context);
-    }
-
-    @Override
-    public void registerByteSizeObserver(
-        Set<T> value, ElementByteSizeObserver observer, Context context)
-        throws Exception {
-      iterableCoder.registerByteSizeObserver(value, observer, context);
-    }
-  }
-
   /** Example AccumulatingCombineFn. */
   private static class MeanInts extends
       Combine.AccumulatingCombineFn<Integer, MeanInts.CountSum, Double> {

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
index 18ecd9b..97eb47a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGbkResultCoderTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.junit.Test;
@@ -79,7 +80,14 @@ public class CoGbkResultCoderTest {
   }
 
   @Test
-  public void testSerializationDeserialization() {
+  public void testCoderIsSerialiable() {
     CoderProperties.coderSerializable(TEST_CODER);
   }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(CoGbkResultCoder.of(
+        CoGbkResultSchema.of(ImmutableList.<TupleTag<?>>of(new TupleTag<GlobalWindow>())),
+        UnionCoder.of(ImmutableList.<Coder<?>>of(GlobalWindow.Coder.INSTANCE))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
index 41ba952..4ba78c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/UnionCoderTest.java
@@ -17,14 +17,13 @@
  */
 package org.apache.beam.sdk.transforms.join;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
+import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -36,12 +35,13 @@ import org.junit.runners.JUnit4;
 public class UnionCoderTest {
 
   @Test
-  public void testSerializationDeserialization() {
-    UnionCoder newCoder =
-        UnionCoder.of(Arrays.<Coder<?>>asList(StringUtf8Coder.of(),
-            DoubleCoder.of()));
-    CloudObject encoding = newCoder.asCloudObject();
-    Coder<?> decodedCoder = Serializer.deserialize(encoding, Coder.class);
-    assertEquals(newCoder, decodedCoder);
+  public void testCoderIsSerializable() {
+    CoderProperties.coderSerializable(UnionCoder.of(ImmutableList.<Coder<?>>of(
+        StringUtf8Coder.of(), DoubleCoder.of())));
+  }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(TimerDataCoder.of(GlobalWindow.Coder.INSTANCE));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
index 7b56f1c..b024f89 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
@@ -59,6 +59,11 @@ public class TimerInternalsTest {
   }
 
   @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(TimerDataCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testCompareTo() {
     Instant firstTimestamp = new Instant(100);
     Instant secondTimestamp = new Instant(200);

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
new file mode 100644
index 0000000..e3a2dc6
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ValueWithRecordIdTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.ValueWithRecordId.ValueWithRecordIdCoder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ValueWithRecordId} and its {@link ValueWithRecordIdCoder}. */
+@RunWith(JUnit4.class)
+public class ValueWithRecordIdTest {
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(ValueWithRecordIdCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 1c67ef0..8bfdcef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -75,6 +76,17 @@ public class WindowedValueTest {
   }
 
   @Test
+  public void testFullWindowedValueCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(WindowedValue.getFullCoder(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
+  public void testValueOnlyWindowedValueCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(WindowedValue.getValueOnlyCoder(GlobalWindow.Coder.INSTANCE));
+  }
+
+  @Test
   public void testExplodeWindowsInNoWindowsEmptyIterable() {
     WindowedValue<String> value =
         WindowedValue.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
index a982f31..e0f01f8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
@@ -21,9 +21,10 @@ package org.apache.beam.sdk.values;
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.testing.EqualsTester;
-
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -80,4 +81,18 @@ public class TimestampedValueTest {
             TimestampedValue.atMinimumTimestamp("foo"))
         .testEquals();
   }
+
+  private static final Coder<TimestampedValue<GlobalWindow>> CODER =
+      TimestampedValue.TimestampedValueCoder.of(GlobalWindow.Coder.INSTANCE);
+
+  @Test
+  public void testCoderEncodeDecodeEquals() throws Exception {
+    CoderProperties.coderDecodeEncodeEqual(CODER,
+        TimestampedValue.of(GlobalWindow.INSTANCE, Instant.now()));
+  }
+
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(CODER);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7efa115..4b19973 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2685,7 +2685,7 @@ public class BigQueryIO {
     private final int shardNumber;
 
     public static <K> ShardedKey<K> of(K key, int shardNumber) {
-      return new ShardedKey<K>(key, shardNumber);
+      return new ShardedKey<>(key, shardNumber);
     }
 
     private ShardedKey(K key, int shardNumber) {
@@ -2705,7 +2705,8 @@ public class BigQueryIO {
   /**
    * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
    */
-  private static class ShardedKeyCoder<KeyT>
+  @VisibleForTesting
+  static class ShardedKeyCoder<KeyT>
       extends StandardCoder<ShardedKey<KeyT>> {
     public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
       return new ShardedKeyCoder<>(keyCoder);
@@ -2739,7 +2740,7 @@ public class BigQueryIO {
     @Override
     public ShardedKey<KeyT> decode(InputStream inStream, Context context)
         throws IOException {
-      return new ShardedKey<KeyT>(
+      return new ShardedKey<>(
           keyCoder.decode(inStream, context.nested()),
           shardNumberCoder.decode(inStream, context));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 4ddfdea..471b5e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -122,6 +122,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -2352,4 +2353,9 @@ public class BigQueryIOTest implements Serializable {
       return null;
     }
   }
+
+  @Test
+  public void testShardedKeyCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(BigQueryIO.ShardedKeyCoder.of(GlobalWindow.Coder.INSTANCE));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
index 6ebea3a..4d9c819 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
@@ -47,5 +47,4 @@ public class AvroWrapperCoderTest {
 
     CoderProperties.coderDecodeEncodeEqual(coder, value);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/bfde34dd/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
new file mode 100644
index 0000000..426103d
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests that {@link KafkaRecordCoder} is serializable with well known component coders. */
+@RunWith(JUnit4.class)
+public class KafkaRecordCoderTest {
+  @Test
+  public void testCoderIsSerializableWithWellKnownCoderType() {
+    CoderProperties.coderSerializable(
+        KafkaRecordCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE));
+  }
+}