You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/28 23:31:19 UTC

[1/2] beam git commit: [BEAM-1211] Make KvCoder more efficient by removing unnecessary nesting

Repository: beam
Updated Branches:
  refs/heads/master b6944cd62 -> 84134bcab


[BEAM-1211] Make KvCoder more efficient by removing unnecessary nesting

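Composite coders now encode and decode their final component in the
caller's context instead of context.nested(); every earlier component
still uses the nested context. See [BEAM-469] for more information
about why this is correct.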


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/496ebbea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/496ebbea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/496ebbea

Branch: refs/heads/master
Commit: 496ebbea1a6a4231077f01dcc392c06239d20841
Parents: b6944cd
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 21 15:37:49 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 15:30:53 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/KeyedWorkItemCoder.java   |  4 +-
 .../core/UnboundedReadFromBoundedSource.java    | 10 ++--
 .../streaming/SingletonKeyedWorkItemCoder.java  | 10 ++--
 .../beam/runners/dataflow/DataflowRunner.java   |  4 +-
 .../DataflowUnboundedReadFromBoundedSource.java | 10 ++--
 .../runners/dataflow/internal/IsmFormat.java    |  6 +--
 .../apache/beam/sdk/coders/BigDecimalCoder.java |  6 +--
 .../org/apache/beam/sdk/coders/KvCoder.java     | 22 +++-----
 .../org/apache/beam/sdk/coders/MapCoder.java    | 53 +++++++++++++++-----
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 12 ++---
 .../beam/sdk/testing/ValueInSingleWindow.java   |  6 +--
 .../apache/beam/sdk/transforms/CombineFns.java  | 14 +++++-
 .../org/apache/beam/sdk/transforms/Mean.java    | 10 ++--
 .../beam/sdk/transforms/join/CoGbkResult.java   | 22 ++++++--
 .../transforms/windowing/IntervalWindow.java    |  4 +-
 .../apache/beam/sdk/util/TimerInternals.java    |  4 +-
 .../org/apache/beam/sdk/util/WindowedValue.java | 11 ++--
 .../beam/sdk/coders/BigDecimalCoderTest.java    | 46 ++++++++---------
 .../apache/beam/sdk/coders/JAXBCoderTest.java   | 16 +++---
 .../apache/beam/sdk/coders/MapCoderTest.java    |  2 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  6 +--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  4 +-
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  4 +-
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  4 +-
 24 files changed, 164 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index 95be047..dfd6a8d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -92,7 +92,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K,
     Coder.Context nestedContext = context.nested();
     keyCoder.encode(value.key(), outStream, nestedContext);
     timersCoder.encode(value.timersIterable(), outStream, nestedContext);
-    elemsCoder.encode(value.elementsIterable(), outStream, nestedContext);
+    elemsCoder.encode(value.elementsIterable(), outStream, context);
   }
 
   @Override
@@ -101,7 +101,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K,
     Coder.Context nestedContext = context.nested();
     K key = keyCoder.decode(inStream, nestedContext);
     Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
-    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext);
+    Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context);
     return KeyedWorkItems.workItem(key, timers, elems);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index f3f93e1..be1793c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -235,19 +235,17 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
       @Override
       public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
           throws CoderException, IOException {
-        Context nested = context.nested();
-        elemsCoder.encode(value.residualElements, outStream, nested);
-        sourceCoder.encode(value.residualSource, outStream, nested);
+        elemsCoder.encode(value.residualElements, outStream, context.nested());
+        sourceCoder.encode(value.residualSource, outStream, context);
       }
 
       @SuppressWarnings("unchecked")
       @Override
       public Checkpoint<T> decode(InputStream inStream, Context context)
           throws CoderException, IOException {
-        Context nested = context.nested();
         return new Checkpoint<>(
-            elemsCoder.decode(inStream, nested),
-            sourceCoder.decode(inStream, nested));
+            elemsCoder.decode(inStream, context.nested()),
+            sourceCoder.decode(inStream, context));
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index ad30688..d95ed7c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -90,17 +90,15 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
                      OutputStream outStream,
                      Context context)
       throws CoderException, IOException {
-    Context nestedContext = context.nested();
-    keyCoder.encode(value.key(), outStream, nestedContext);
-    valueCoder.encode(value.value, outStream, nestedContext);
+    keyCoder.encode(value.key(), outStream, context.nested());
+    valueCoder.encode(value.value, outStream, context);
   }
 
   @Override
   public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
       throws CoderException, IOException {
-    Context nestedContext = context.nested();
-    K key = keyCoder.decode(inStream, nestedContext);
-    WindowedValue<ElemT> value = valueCoder.decode(inStream, nestedContext);
+    K key = keyCoder.decode(inStream, context.nested());
+    WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
     return new SingletonKeyedWorkItem<>(key, value);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 1a15eaf..29c0058 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1984,7 +1984,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
         Coder.Context context) throws CoderException, IOException {
       transformCoder.encode(value.transform, outStream, context.nested());
-      originalMapCoder.encode(value.originalMap, outStream, context.nested());
+      originalMapCoder.encode(value.originalMap, outStream, context);
     }
 
     @Override
@@ -1992,7 +1992,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         InputStream inStream, Coder.Context context) throws CoderException, IOException {
       return new TransformedMap<>(
           transformCoder.decode(inStream, context.nested()),
-          originalMapCoder.decode(inStream, context.nested()));
+          originalMapCoder.decode(inStream, context));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index e1eedd8..65db817 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -254,19 +254,17 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
       @Override
       public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
           throws CoderException, IOException {
-        Context nested = context.nested();
-        elemsCoder.encode(value.residualElements, outStream, nested);
-        sourceCoder.encode(value.residualSource, outStream, nested);
+        elemsCoder.encode(value.residualElements, outStream, context.nested());
+        sourceCoder.encode(value.residualSource, outStream, context);
       }
 
       @SuppressWarnings("unchecked")
       @Override
       public Checkpoint<T> decode(InputStream inStream, Context context)
           throws CoderException, IOException {
-        Context nested = context.nested();
         return new Checkpoint<>(
-            elemsCoder.decode(inStream, nested),
-            sourceCoder.decode(inStream, nested));
+            elemsCoder.decode(inStream, context.nested()),
+            sourceCoder.decode(inStream, context));
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 2f83ffd..6a244b0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -647,15 +647,15 @@ public class IsmFormat {
           value);
       VarIntCoder.of().encode(value.getId(), outStream, context.nested());
       VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
-      VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
+      VarLongCoder.of().encode(value.getIndexOffset(), outStream, context);
     }
 
     @Override
     public IsmShard decode(
         InputStream inStream, Coder.Context context) throws CoderException, IOException {
       return IsmShard.of(
-          VarIntCoder.of().decode(inStream, context),
-          VarLongCoder.of().decode(inStream, context),
+          VarIntCoder.of().decode(inStream, context.nested()),
+          VarLongCoder.of().decode(inStream, context.nested()),
           VarLongCoder.of().decode(inStream, context));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index e262882..36c8265 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -54,14 +54,14 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
     integerCoder.encode(value.scale(), outStream, context.nested());
-    bigIntegerCoder.encode(value.unscaledValue(), outStream, context.nested());
+    bigIntegerCoder.encode(value.unscaledValue(), outStream, context);
   }
 
   @Override
   public BigDecimal decode(InputStream inStream, Context context)
       throws IOException, CoderException {
     int scale = integerCoder.decode(inStream, context.nested());
-    BigInteger bigInteger = bigIntegerCoder.decode(inStream, context.nested());
+    BigInteger bigInteger = bigIntegerCoder.decode(inStream, context);
     return new BigDecimal(bigInteger, scale);
   }
 
@@ -96,6 +96,6 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
   protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception {
     checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
     return integerCoder.getEncodedElementByteSize(value.scale(), context.nested())
-        + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context.nested());
+        + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index ad13226..c0d3aa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -83,17 +83,15 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
     if (kv == null) {
       throw new CoderException("cannot encode a null KV");
     }
-    Context nestedContext = context.nested();
-    keyCoder.encode(kv.getKey(), outStream, nestedContext);
-    valueCoder.encode(kv.getValue(), outStream, nestedContext);
+    keyCoder.encode(kv.getKey(), outStream, context.nested());
+    valueCoder.encode(kv.getValue(), outStream, context);
   }
 
   @Override
   public KV<K, V> decode(InputStream inStream, Context context)
       throws IOException, CoderException {
-    Context nestedContext = context.nested();
-    K key = keyCoder.decode(inStream, nestedContext);
-    V value = valueCoder.decode(inStream, nestedContext);
+    K key = keyCoder.decode(inStream, context.nested());
+    V value = valueCoder.decode(inStream, context);
     return KV.of(key, value);
   }
 
@@ -135,10 +133,8 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
    */
   @Override
   public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) {
-    return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(),
-                                                    context.nested())
-        && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(),
-                                                      context.nested());
+    return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), context.nested())
+        && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), context);
   }
 
   /**
@@ -152,9 +148,7 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
     if (kv == null) {
       throw new CoderException("cannot encode a null KV");
     }
-    keyCoder.registerByteSizeObserver(
-        kv.getKey(), observer, context.nested());
-    valueCoder.registerByteSizeObserver(
-        kv.getValue(), observer, context.nested());
+    keyCoder.registerByteSizeObserver(kv.getKey(), observer, context.nested());
+    valueCoder.registerByteSizeObserver(kv.getValue(), observer, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index ebe7051..94099be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -28,6 +28,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -81,10 +83,10 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  Coder<K> keyCoder;
-  Coder<V> valueCoder;
+  private Coder<K> keyCoder;
+  private Coder<V> valueCoder;
 
-  MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+  private MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
     this.keyCoder = keyCoder;
     this.valueCoder = valueCoder;
   }
@@ -99,12 +101,25 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
       throw new CoderException("cannot encode a null Map");
     }
     DataOutputStream dataOutStream = new DataOutputStream(outStream);
-    dataOutStream.writeInt(map.size());
-    for (Entry<K, V> entry : map.entrySet()) {
+
+    int size = map.size();
+    dataOutStream.writeInt(size);
+    if (size == 0) {
+      return;
+    }
+
+    // Since we handled size == 0 above, entry is guaranteed to exist before and after loop
+    Iterator<Entry<K, V>> iterator = map.entrySet().iterator();
+    Entry<K, V> entry = iterator.next();
+    while (iterator.hasNext()) {
       keyCoder.encode(entry.getKey(), outStream, context.nested());
       valueCoder.encode(entry.getValue(), outStream, context.nested());
+      entry = iterator.next();
     }
-    dataOutStream.flush();
+
+    keyCoder.encode(entry.getKey(), outStream, context.nested());
+    valueCoder.encode(entry.getValue(), outStream, context);
+    // no flush needed as DataOutputStream does not buffer
   }
 
   @Override
@@ -112,12 +127,20 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
       throws IOException, CoderException {
     DataInputStream dataInStream = new DataInputStream(inStream);
     int size = dataInStream.readInt();
+    if (size == 0) {
+      return Collections.emptyMap();
+    }
+
     Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
-    for (int i = 0; i < size; ++i) {
+    for (int i = 0; i < size - 1; ++i) {
       K key = keyCoder.decode(inStream, context.nested());
       V value = valueCoder.decode(inStream, context.nested());
       retval.put(key, value);
     }
+
+    K key = keyCoder.decode(inStream, context.nested());
+    V value = valueCoder.decode(inStream, context);
+    retval.put(key, value);
     return retval;
   }
 
@@ -149,11 +172,17 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
       Map<K, V> map, ElementByteSizeObserver observer, Context context)
       throws Exception {
     observer.update(4L);
-    for (Entry<K, V> entry : map.entrySet()) {
-      keyCoder.registerByteSizeObserver(
-          entry.getKey(), observer, context.nested());
-      valueCoder.registerByteSizeObserver(
-          entry.getValue(), observer, context.nested());
+    if (map.isEmpty()) {
+      return;
+    }
+    Iterator<Entry<K, V>> entries = map.entrySet().iterator();
+    Entry<K, V> entry = entries.next();
+    while (entries.hasNext()) {
+      keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
+      valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested());
+      entry = entries.next();
     }
+    keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
+    valueCoder.registerByteSizeObserver(entry.getValue(), observer, context);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 1992cb8..58414c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -110,17 +110,17 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     public void encode(
         OutgoingMessage value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED);
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED);
-      RECORD_ID_CODER.encode(value.recordId, outStream, Context.NESTED);
+      ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
+      RECORD_ID_CODER.encode(value.recordId, outStream, context);
     }
 
     @Override
     public OutgoingMessage decode(
         InputStream inStream, Context context) throws CoderException, IOException {
-      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED);
-      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED);
-      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, Context.NESTED);
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
+      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
+      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context);
       return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
index 9ec030f..b746f6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
@@ -97,19 +97,19 @@ public abstract class ValueInSingleWindow<T> {
     public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
         throws IOException {
       Context nestedContext = context.nested();
-      valueCoder.encode(windowedElem.getValue(), outStream, nestedContext);
       InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext);
       windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
-      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context);
+      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+      valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
     @Override
     public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
       Context nestedContext = context.nested();
-      T value = valueCoder.decode(inStream, nestedContext);
       Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
       BoundedWindow window = windowCoder.decode(inStream, nestedContext);
       PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+      T value = valueCoder.decode(inStream, context);
       return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index e4e1c50..79b2ab8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -994,20 +994,30 @@ public class CombineFns {
     public void encode(Object[] value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       checkArgument(value.length == codersCount);
+      if (value.length == 0) {
+        return;
+      }
+      int lastIndex = codersCount - 1;
       Context nestedContext = context.nested();
-      for (int i = 0; i < codersCount; ++i) {
+      for (int i = 0; i < lastIndex; ++i) {
         coders.get(i).encode(value[i], outStream, nestedContext);
       }
+      coders.get(lastIndex).encode(value[lastIndex], outStream, context);
     }
 
     @Override
     public Object[] decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Object[] ret = new Object[codersCount];
+      if (codersCount == 0) {
+        return ret;
+      }
+      int lastIndex = codersCount - 1;
       Context nestedContext = context.nested();
-      for (int i = 0; i < codersCount; ++i) {
+      for (int i = 0; i < lastIndex; ++i) {
         ret[i] = coders.get(i).decode(inStream, nestedContext);
       }
+      ret[lastIndex] = coders.get(lastIndex).decode(inStream, context);
       return ret;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 1a0791f..9eea3a0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -185,18 +185,16 @@ public class Mean {
      @Override
      public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
          throws CoderException, IOException {
-       Coder.Context nestedContext = context.nested();
-       LONG_CODER.encode(value.count, outStream, nestedContext);
-       DOUBLE_CODER.encode(value.sum, outStream, nestedContext);
+       LONG_CODER.encode(value.count, outStream, context.nested());
+       DOUBLE_CODER.encode(value.sum, outStream, context);
      }
 
      @Override
      public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
          throws CoderException, IOException {
-       Coder.Context nestedContext = context.nested();
        return new CountSum<>(
-           LONG_CODER.decode(inStream, nestedContext),
-           DOUBLE_CODER.decode(inStream, nestedContext));
+           LONG_CODER.decode(inStream, context.nested()),
+           DOUBLE_CODER.decode(inStream, context));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 10ba3c9..7b849e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -22,7 +22,9 @@ import static org.apache.beam.sdk.util.Structs.addObject;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
 import java.io.IOException;
 import java.io.InputStream;
@@ -268,9 +270,14 @@ public class CoGbkResult {
       if (!schema.equals(value.getSchema())) {
         throw new CoderException("input schema does not match coder schema");
       }
-      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
-        tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, Context.NESTED);
+      if (schema.size() == 0) {
+        return;
+      }
+      int lastIndex = schema.size() - 1;
+      for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
+        tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested());
       }
+      tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context);
     }
 
     @Override
@@ -278,10 +285,15 @@ public class CoGbkResult {
         InputStream inStream,
         Context context)
         throws CoderException, IOException {
-      List<Iterable<?>> valueMap = new ArrayList<>();
-      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
-        valueMap.add(tagListCoder(unionTag).decode(inStream, Context.NESTED));
+      if (schema.size() == 0) {
+        return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of());
+      }
+      int lastIndex = schema.size() - 1;
+      List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size());
+      for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
+        valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested()));
       }
+      valueMap.add(tagListCoder(lastIndex).decode(inStream, context));
       return new CoGbkResult(schema, valueMap);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index af98796..fb0fc11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -185,14 +185,14 @@ public class IntervalWindow extends BoundedWindow
                        Context context)
         throws IOException, CoderException {
       instantCoder.encode(window.end, outStream, context.nested());
-      durationCoder.encode(new Duration(window.start, window.end), outStream, context.nested());
+      durationCoder.encode(new Duration(window.start, window.end), outStream, context);
     }
 
     @Override
     public IntervalWindow decode(InputStream inStream, Context context)
         throws IOException, CoderException {
       Instant end = instantCoder.decode(inStream, context.nested());
-      ReadableDuration duration = durationCoder.decode(inStream, context.nested());
+      ReadableDuration duration = durationCoder.decode(inStream, context);
       return new IntervalWindow(end.minus(duration), end);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 0bfcddc..7195486 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -258,7 +258,7 @@ public interface TimerInternals {
       STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
       STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext);
       INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
-      STRING_CODER.encode(timer.getDomain().name(), outStream, nestedContext);
+      STRING_CODER.encode(timer.getDomain().name(), outStream, context);
     }
 
     @Override
@@ -269,7 +269,7 @@ public interface TimerInternals {
       StateNamespace namespace =
           StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
       Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
-      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext));
+      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context));
       return TimerData.of(timerId, namespace, timestamp, domain);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index a0b4cf5..1b3e648 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -656,22 +656,22 @@ public abstract class WindowedValue<T> {
                        Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      valueCoder.encode(windowedElem.getValue(), outStream, nestedContext);
       InstantCoder.of().encode(
           windowedElem.getTimestamp(), outStream, nestedContext);
       windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext);
-      PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context);
+      PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+      valueCoder.encode(windowedElem.getValue(), outStream, context);
     }
 
     @Override
     public WindowedValue<T> decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      T value = valueCoder.decode(inStream, nestedContext);
       Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
       Collection<? extends BoundedWindow> windows =
           windowsCoder.decode(inStream, nestedContext);
       PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+      T value = valueCoder.decode(inStream, context);
       return WindowedValue.of(value, timestamp, windows, pane);
     }
 
@@ -689,9 +689,10 @@ public abstract class WindowedValue<T> {
     public void registerByteSizeObserver(WindowedValue<T> value,
                                          ElementByteSizeObserver observer,
                                          Context context) throws Exception {
+      InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context.nested());
+      windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context.nested());
+      PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer, context.nested());
       valueCoder.registerByteSizeObserver(value.getValue(), observer, context);
-      InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context);
-      windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
index f5d56cb..9db50c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
@@ -75,29 +75,29 @@ public class BigDecimalCoderTest {
    */
   private static final List<String> TEST_ENCODINGS =
       ImmutableList.of(
-          "swi4AjXYo7DnVscE2eM2Q-f7A-JpZD8CyiFN76ZfI4ZZzJtQa31MqLYUepjY4f6BXT8ZapFuM"
-              + "0AaxKyQjsrfF_0mZON_XJJle68X1L0-yBXeRP6yDKlpHD-LUwr-ivNwlWD-zdwQZONNjX9"
-              + "eivckz1QFUqJDLdWm6V_to30sySJAR87byZdCJvJS1cvjKgsjC6OijwCsL1JrJ9sfkeZTF"
-              + "f0bR7nTr9x5-hggYHJ-rczwYunX8wROP3tWJTfyN7GxeSCCrFeswbw87qkJg0Hw_-Nx_2L"
-              + "k5qCk9p96bXyCJnz6_d9Yk83re-Ru461odv273IZB-s3blouk1b2ihT15hsv7V7BIHAQ2m"
-              + "1whPlIyGS6VxVjp70vICpqYZZ2-dt6-nBbzitNCmR4l486Qi3obX-yV-RCfpUHDsnPdWQ",
-          "sgi4AjXYo7DnVscE2eM2Q-f7A-JpZD8CyiFN76ZfI4ZZzJtQa31MqLYUepjY4f6BXT8ZapFu"
-              + "M0AaxKyQjsrfF_0mZON_XJJle68X1L0-yBXeRP6yDKlpHD-LUwr-ivNwlWD-zdwQZONNj"
-              + "X9eivckz1QFUqJDLdWm6V_to30sySJAR87byZdCJvJS1cvjKgsjC6OijwCsL1JrJ9sfke"
-              + "ZTFf0bR7nTr9x5-hggYHJ-rczwYunX8wROP3tWJTfyN7GxeSCCrFeswbw87qkJg0Hw_-Nx"
-              + "_2Lk5qCk9p96bXyCJnz6_d9Yk83re-Ru461odv273IZB-s3blouk1b2ihT15hsv7V7BIHA"
-              + "Q2m1whPlIyGS6VxVjp70vICpqYZZ2-dt6-nBbzitNCmR4l486Qi3obX-yV-RCfpUHDsnPdWQ",
-          "AQGX",
-          "AAH_",
-          "AAEA",
-          "AAEB",
-          "MBUJEk1IAgE1H9Gsru39PDZgUqT1NnU",
-          "AIEBAP________gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
-              + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
-              + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
-          "AIEBCf_______7AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
-              + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
-              + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
+          "swg12KOw51bHBNnjNkPn-wPiaWQ_AsohTe-mXyOGWcybUGt9TKi2FHqY2OH-gV0_GWqRbjNAGsSskI7K3xf9JmT"
+              + "jf1ySZXuvF9S9PsgV3kT-sgypaRw_i1MK_orzcJVg_s3cEGTjTY1_Xor3JM9UBVKiQy3Vpulf7aN9LMki"
+              + "QEfO28mXQibyUtXL4yoLIwujoo8ArC9SayfbH5HmUxX9G0e506_cefoYIGByfq3M8GLp1_METj97ViU38"
+              + "jexsXkggqxXrMG8PO6pCYNB8P_jcf9i5OagpPafem18giZ8-v3fWJPN63vkbuOtaHb9u9yGQfrN25aLpN"
+              + "W9ooU9eYbL-1ewSBwENptcIT5SMhkulcVY6e9LyAqamGWdvnbevpwW84rTQpkeJePOkIt6G1_slfkQn6V"
+              + "Bw7Jz3Vk",
+          "sgg12KOw51bHBNnjNkPn-wPiaWQ_AsohTe-mXyOGWcybUGt9TKi2FHqY2OH-gV0_GWqRbjNAGsSskI7K3xf9JmT"
+              + "jf1ySZXuvF9S9PsgV3kT-sgypaRw_i1MK_orzcJVg_s3cEGTjTY1_Xor3JM9UBVKiQy3Vpulf7aN9LMki"
+              + "QEfO28mXQibyUtXL4yoLIwujoo8ArC9SayfbH5HmUxX9G0e506_cefoYIGByfq3M8GLp1_METj97ViU38"
+              + "jexsXkggqxXrMG8PO6pCYNB8P_jcf9i5OagpPafem18giZ8-v3fWJPN63vkbuOtaHb9u9yGQfrN25aLpN"
+              + "W9ooU9eYbL-1ewSBwENptcIT5SMhkulcVY6e9LyAqamGWdvnbevpwW84rTQpkeJePOkIt6G1_slfkQn6V"
+              + "Bw7Jz3Vk",
+          "AZc",
+          "AP8",
+          "AAA",
+          "AAE",
+          "MAkSTUgCATUf0ayu7f08NmBSpPU2dQ",
+          "AAD________4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+              + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+              + "AAAAAA",
+          "AAn_______-wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+              + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+              + "AAAAAA");
 
   @Test
   public void testWireFormatEncode() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
index 36190f9..c023278 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
@@ -172,19 +172,19 @@ public class JAXBCoderTest {
     @Override
     public void encode(TestType value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      Context subContext = context.nested();
-      VarIntCoder.of().encode(3, outStream, subContext);
-      jaxbCoder.encode(value, outStream, subContext);
-      VarLongCoder.of().encode(22L, outStream, subContext);
+      Context nestedContext = context.nested();
+      VarIntCoder.of().encode(3, outStream, nestedContext);
+      jaxbCoder.encode(value, outStream, nestedContext);
+      VarLongCoder.of().encode(22L, outStream, context);
     }
 
     @Override
     public TestType decode(InputStream inStream, Context context)
         throws CoderException, IOException {
-      Context subContext = context.nested();
-      VarIntCoder.of().decode(inStream, subContext);
-      TestType result = jaxbCoder.decode(inStream, subContext);
-      VarLongCoder.of().decode(inStream, subContext);
+      Context nestedContext = context.nested();
+      VarIntCoder.of().decode(inStream, nestedContext);
+      TestType result = jaxbCoder.decode(inStream, nestedContext);
+      VarLongCoder.of().decode(inStream, context);
       return result;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index dc4a8b5..1053c79 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -85,7 +85,7 @@ public class MapCoderTest {
    */
   private static final List<String> TEST_ENCODINGS = Arrays.asList(
       "AAAAAA",
-      "AAAAAv____8PA2ZvbwEFaGVsbG8");
+      "AAAAAv____8PA2ZvbwFoZWxsbw");
 
   @Test
   public void testWireFormatEncode() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 0ac9502..f783928 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -905,14 +905,14 @@ public class CombineTest implements Serializable {
       @Override
       public void encode(CountSum value, OutputStream outStream,
           Context context) throws CoderException, IOException {
-        LONG_CODER.encode(value.count, outStream, context);
+        LONG_CODER.encode(value.count, outStream, context.nested());
         DOUBLE_CODER.encode(value.sum, outStream, context);
       }
 
       @Override
       public CountSum decode(InputStream inStream, Coder.Context context)
           throws CoderException, IOException {
-        long count = LONG_CODER.decode(inStream, context);
+        long count = LONG_CODER.decode(inStream, context.nested());
         double sum = DOUBLE_CODER.decode(inStream, context);
         return new CountSum(count, sum);
       }
@@ -930,7 +930,7 @@ public class CombineTest implements Serializable {
       public void registerByteSizeObserver(
           CountSum value, ElementByteSizeObserver observer, Context context)
           throws Exception {
-        LONG_CODER.registerByteSizeObserver(value.count, observer, context);
+        LONG_CODER.registerByteSizeObserver(value.count, observer, context.nested());
         DOUBLE_CODER.registerByteSizeObserver(value.sum, observer, context);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index de9393a..3a36add 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2768,7 +2768,7 @@ public class BigQueryIO {
         throw new CoderException("cannot encode a null value");
       }
       tableRowCoder.encode(value.tableRow, outStream, context.nested());
-      idCoder.encode(value.uniqueId, outStream, context.nested());
+      idCoder.encode(value.uniqueId, outStream, context);
     }
 
     @Override
@@ -2776,7 +2776,7 @@ public class BigQueryIO {
       throws IOException {
       return new TableRowInfo(
           tableRowCoder.decode(inStream, context.nested()),
-          idCoder.decode(inStream, context.nested()));
+          idCoder.decode(inStream, context));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 736a752..ea78f09 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -66,7 +66,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
     stringCoder.encode(value.getTopic(), outStream, nested);
     intCoder.encode(value.getPartition(), outStream, nested);
     longCoder.encode(value.getOffset(), outStream, nested);
-    kvCoder.encode(value.getKV(), outStream, nested);
+    kvCoder.encode(value.getKV(), outStream, context);
   }
 
   @Override
@@ -77,7 +77,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
         stringCoder.decode(inStream, nested),
         intCoder.decode(inStream, nested),
         longCoder.decode(inStream, nested),
-        kvCoder.decode(inStream, nested));
+        kvCoder.decode(inStream, context));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index fc087b5..77fe127 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -53,7 +53,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
         VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
         INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
         STRING_CODER.encode(value.getStreamName(), outStream, nested);
-        STRING_CODER.encode(value.getShardId(), outStream, nested);
+        STRING_CODER.encode(value.getShardId(), outStream, context);
     }
 
     @Override
@@ -66,7 +66,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
         long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
         Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
         String streamName = STRING_CODER.decode(inStream, nested);
-        String shardId = STRING_CODER.decode(inStream, nested);
+        String shardId = STRING_CODER.decode(inStream, context);
         return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
                 approximateArrivalTimestamp, readTimestamp, streamName, shardId
         );


[2/2] beam git commit: This closes #1680

Posted by dh...@apache.org.
This closes #1680


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84134bca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84134bca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84134bca

Branch: refs/heads/master
Commit: 84134bcabaada697a22728f56f030a0351394e45
Parents: b6944cd 496ebbe
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 28 15:31:06 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 15:31:06 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/KeyedWorkItemCoder.java   |  4 +-
 .../core/UnboundedReadFromBoundedSource.java    | 10 ++--
 .../streaming/SingletonKeyedWorkItemCoder.java  | 10 ++--
 .../beam/runners/dataflow/DataflowRunner.java   |  4 +-
 .../DataflowUnboundedReadFromBoundedSource.java | 10 ++--
 .../runners/dataflow/internal/IsmFormat.java    |  6 +--
 .../apache/beam/sdk/coders/BigDecimalCoder.java |  6 +--
 .../org/apache/beam/sdk/coders/KvCoder.java     | 22 +++-----
 .../org/apache/beam/sdk/coders/MapCoder.java    | 53 +++++++++++++++-----
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 12 ++---
 .../beam/sdk/testing/ValueInSingleWindow.java   |  6 +--
 .../apache/beam/sdk/transforms/CombineFns.java  | 14 +++++-
 .../org/apache/beam/sdk/transforms/Mean.java    | 10 ++--
 .../beam/sdk/transforms/join/CoGbkResult.java   | 22 ++++++--
 .../transforms/windowing/IntervalWindow.java    |  4 +-
 .../apache/beam/sdk/util/TimerInternals.java    |  4 +-
 .../org/apache/beam/sdk/util/WindowedValue.java | 11 ++--
 .../beam/sdk/coders/BigDecimalCoderTest.java    | 46 ++++++++---------
 .../apache/beam/sdk/coders/JAXBCoderTest.java   | 16 +++---
 .../apache/beam/sdk/coders/MapCoderTest.java    |  2 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  6 +--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  4 +-
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  4 +-
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  4 +-
 24 files changed, 164 insertions(+), 126 deletions(-)
----------------------------------------------------------------------