You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/07 03:34:11 UTC

[2/3] beam git commit: Remove uses of context in coder size estimation calls.

Remove uses of context in coder size estimation calls.

    find . -type f -name '*.java' | xargs sed -i '' 's/\([a-zA-Z]*[bB]yteSize[a-zA-Z]*[(].*\), [^,]*[Cc]ontext[^,()]*\([(][)]\)*[)]/\1)/'

plus a one-off Python script and some manual fixups.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96de8d73
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96de8d73
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96de8d73

Branch: refs/heads/master
Commit: 96de8d735c40ee5f823ff17966992c433d68f296
Parents: 78a99be
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri May 5 15:23:03 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Sat May 6 20:33:20 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  2 +-
 .../runners/dataflow/internal/IsmFormat.java    |  8 ++--
 .../runners/dataflow/util/RandomAccessData.java | 11 ++----
 .../dataflow/util/RandomAccessDataTest.java     |  6 +--
 .../apache/beam/sdk/coders/BigDecimalCoder.java |  8 ++--
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |  4 +-
 .../beam/sdk/coders/BigEndianLongCoder.java     |  4 +-
 .../apache/beam/sdk/coders/BigIntegerCoder.java |  6 +--
 .../apache/beam/sdk/coders/ByteArrayCoder.java  | 10 ++---
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  4 +-
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  4 +-
 .../apache/beam/sdk/coders/DurationCoder.java   |  8 ++--
 .../apache/beam/sdk/coders/InstantCoder.java    |  8 ++--
 .../beam/sdk/coders/IterableLikeCoder.java      |  9 ++---
 .../org/apache/beam/sdk/coders/KvCoder.java     | 12 +++---
 .../beam/sdk/coders/LengthPrefixCoder.java      | 11 +++---
 .../org/apache/beam/sdk/coders/MapCoder.java    | 10 ++---
 .../apache/beam/sdk/coders/NullableCoder.java   | 14 +++----
 .../beam/sdk/coders/StringDelegateCoder.java    |  1 -
 .../apache/beam/sdk/coders/StringUtf8Coder.java | 16 ++------
 .../beam/sdk/coders/TextualIntegerCoder.java    |  4 +-
 .../org/apache/beam/sdk/coders/VarIntCoder.java |  4 +-
 .../apache/beam/sdk/coders/VarLongCoder.java    |  4 +-
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  4 +-
 .../beam/sdk/testing/CoderProperties.java       |  6 ++-
 .../sdk/transforms/ApproximateQuantiles.java    | 22 ++++-------
 .../org/apache/beam/sdk/transforms/Count.java   |  4 +-
 .../org/apache/beam/sdk/transforms/Top.java     |  9 ++---
 .../beam/sdk/transforms/join/UnionCoder.java    |  8 ++--
 .../org/apache/beam/sdk/util/BitSetCoder.java   |  1 -
 .../org/apache/beam/sdk/util/WindowedValue.java | 15 ++++----
 .../beam/sdk/coders/BigDecimalCoderTest.java    |  5 ++-
 .../beam/sdk/coders/BigIntegerCoderTest.java    |  5 ++-
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +-
 .../beam/sdk/coders/LengthPrefixCoderTest.java  | 24 ++++++------
 .../beam/sdk/coders/NullableCoderTest.java      | 18 ++++-----
 .../apache/beam/sdk/testing/PAssertTest.java    |  4 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  8 ++--
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../extensions/protobuf/ByteStringCoder.java    |  8 +---
 .../protobuf/ByteStringCoderTest.java           | 10 ++---
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  |  4 +-
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  4 +-
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 40 +++++++++++---------
 44 files changed, 168 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 09901d5..b579041 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170505-wd-2914</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170506</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index aed514a..00e0c54 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -704,12 +704,12 @@ public class IsmFormat {
     }
 
     @Override
-    public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
+    public boolean isRegisterByteSizeObserverCheap(KeyPrefix value) {
       return true;
     }
 
     @Override
-    public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
+    public long getEncodedElementByteSize(KeyPrefix value)
         throws Exception {
       checkNotNull(value);
       return VarInt.getLength(value.getSharedKeySize())
@@ -786,12 +786,12 @@ public class IsmFormat {
     }
 
     @Override
-    public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
+    public boolean isRegisterByteSizeObserverCheap(Footer value) {
       return true;
     }
 
     @Override
-    public long getEncodedElementByteSize(Footer value, Coder.Context context)
+    public long getEncodedElementByteSize(Footer value)
         throws Exception {
       return Footer.FIXED_LENGTH;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 4e94515..f36bd78 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -96,22 +96,17 @@ public class RandomAccessData {
     }
 
     @Override
-    public boolean isRegisterByteSizeObserverCheap(
-        RandomAccessData value, Coder.Context context) {
+    public boolean isRegisterByteSizeObserverCheap(RandomAccessData value) {
       return true;
     }
 
     @Override
-    protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
+    protected long getEncodedElementByteSize(RandomAccessData value)
         throws Exception {
       if (value == null) {
         throw new CoderException("cannot encode a null in memory stream");
       }
-      long size = 0;
-      if (!context.isWholeStream) {
-        size += VarInt.getLength(value.size);
-      }
-      return size + value.size;
+      return VarInt.getLength(value.size) + value.size;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
index 042e145..5a7908c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
@@ -61,10 +61,8 @@ public class RandomAccessDataTest {
     CoderProperties.coderSerializable(RandomAccessDataCoder.of());
     CoderProperties.structuralValueConsistentWithEquals(
         RandomAccessDataCoder.of(), streamA, streamB);
-    assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.NESTED));
-    assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.OUTER));
-    assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.NESTED));
-    assertEquals(3, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.OUTER));
+    assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA));
+    assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index aadf085..97559a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -85,7 +85,7 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
    * @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(BigDecimal value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(BigDecimal value) {
     return true;
   }
 
@@ -97,9 +97,9 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
    * representation of the {@link BigInteger} that, when scaled, equals the given value.
    */
   @Override
-  protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception {
+  protected long getEncodedElementByteSize(BigDecimal value) throws Exception {
     checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
-    return VAR_INT_CODER.getEncodedElementByteSize(value.scale(), context.nested())
-        + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue(), context);
+    return VAR_INT_CODER.getEncodedElementByteSize(value.scale())
+        + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index c3c7a96..a61f099 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -82,7 +82,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
    * @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Integer value) {
     return true;
   }
 
@@ -97,7 +97,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
    * @return {@code 4}, the size in bytes of an integer's big endian encoding.
    */
   @Override
-  protected long getEncodedElementByteSize(Integer value, Context context)
+  protected long getEncodedElementByteSize(Integer value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 5ef4878..868160a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -82,7 +82,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
    * @return {@code true}, since {@link #getEncodedElementByteSize} returns a constant.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Long value) {
     return true;
   }
 
@@ -97,7 +97,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
    * @return {@code 8}, the byte size of a big-endian encoded {@code Long}.
    */
   @Override
-  protected long getEncodedElementByteSize(Long value, Context context)
+  protected long getEncodedElementByteSize(Long value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null Long");

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index 6d14d17..3b038af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -75,7 +75,7 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
    * @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(BigInteger value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(BigInteger value) {
     return true;
   }
 
@@ -85,8 +85,8 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
    * @return the size of the encoding as a byte array according to {@link ByteArrayCoder}
    */
   @Override
-  protected long getEncodedElementByteSize(BigInteger value, Context context) throws Exception {
+  protected long getEncodedElementByteSize(BigInteger value) throws Exception {
     checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
-    return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray(), context);
+    return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index d83d832..c9393a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -126,7 +126,7 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
    * constant time using the {@code length} of the provided array.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(byte[] value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(byte[] value) {
     return true;
   }
 
@@ -136,15 +136,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
   }
 
   @Override
-  protected long getEncodedElementByteSize(byte[] value, Context context)
+  protected long getEncodedElementByteSize(byte[] value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null byte[]");
     }
-    long size = 0;
-    if (!context.isWholeStream) {
-      size += VarInt.getLength(value.length);
-    }
-    return size + value.length;
+    return VarInt.getLength(value.length) + value.length;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 6e4318e..7f449d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -91,7 +91,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
    * @return {@code true}. {@link ByteCoder#getEncodedElementByteSize} returns a constant.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Byte value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Byte value) {
     return true;
   }
 
@@ -106,7 +106,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
    * @return {@code 1}, the byte size of a {@link Byte} encoded using Java serialization.
    */
   @Override
-  protected long getEncodedElementByteSize(Byte value, Context context)
+  protected long getEncodedElementByteSize(Byte value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot estimate size for unsupported null value");

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 12bc5e8..8eff6ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -93,7 +93,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
    * @return {@code true}. {@link DoubleCoder#getEncodedElementByteSize} returns a constant.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Double value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Double value) {
     return true;
   }
 
@@ -108,7 +108,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
    * @return {@code 8}, the byte size of a {@link Double} encoded using Java serialization.
    */
   @Override
-  protected long getEncodedElementByteSize(Double value, Context context)
+  protected long getEncodedElementByteSize(Double value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null Double");

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 7b49d1f..8b4ae1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -89,14 +89,14 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
    * @return {@code true}, because it is cheap to ascertain the byte size of a long.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(ReadableDuration value, Context context) {
-    return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value), context);
+  public boolean isRegisterByteSizeObserverCheap(ReadableDuration value) {
+    return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value));
   }
 
   @Override
   public void registerByteSizeObserver(
-      ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception {
-    LONG_CODER.registerByteSizeObserver(toLong(value), observer, context);
+      ReadableDuration value, ElementByteSizeObserver observer) throws Exception {
+    LONG_CODER.registerByteSizeObserver(toLong(value), observer);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 56ed12b..000f406 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -103,16 +103,16 @@ public class InstantCoder extends AtomicCoder<Instant> {
    * @return {@code true}. The byte size for a big endian long is a constant.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Instant value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Instant value) {
     return LONG_CODER.isRegisterByteSizeObserverCheap(
-        ORDER_PRESERVING_CONVERTER.convert(value), context);
+        ORDER_PRESERVING_CONVERTER.convert(value));
   }
 
   @Override
   public void registerByteSizeObserver(
-      Instant value, ElementByteSizeObserver observer, Context context) throws Exception {
+      Instant value, ElementByteSizeObserver observer) throws Exception {
     LONG_CODER.registerByteSizeObserver(
-        ORDER_PRESERVING_CONVERTER.convert(value), observer, context);
+        ORDER_PRESERVING_CONVERTER.convert(value), observer);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 52b9c34..9994b3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -170,18 +170,17 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
    */
   @Override
   public boolean isRegisterByteSizeObserverCheap(
-      IterableT iterable, Context context) {
+      IterableT iterable) {
     return iterable instanceof ElementByteSizeObservableIterable;
   }
 
   @Override
   public void registerByteSizeObserver(
-      IterableT iterable, ElementByteSizeObserver observer, Context context)
+      IterableT iterable, ElementByteSizeObserver observer)
       throws Exception {
     if (iterable == null) {
       throw new CoderException("cannot encode a null Iterable");
     }
-    Context nestedContext = context.nested();
 
     if (iterable instanceof ElementByteSizeObservableIterable) {
       observer.setLazy();
@@ -196,7 +195,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
         Collection<T> collection = (Collection<T>) iterable;
         observer.update(4L);
         for (T elem : collection) {
-          elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
+          elementCoder.registerByteSizeObserver(elem, observer);
         }
       } else {
         // TODO: (BEAM-1537) Update to use an accurate count depending on size and count,
@@ -208,7 +207,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
         long count = 0;
         for (T elem : iterable) {
           count += 1;
-          elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
+          elementCoder.registerByteSizeObserver(elem, observer);
         }
         if (count > 0) {
           // Update the length based upon the number of counted elements, this helps

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index da7f03c..1df4460 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -105,9 +105,9 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
    * Returns whether both keyCoder and valueCoder are considered not expensive.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) {
-    return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), context.nested())
-        && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), context);
+  public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv) {
+    return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey())
+        && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue());
   }
 
   /**
@@ -116,13 +116,13 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
    */
   @Override
   public void registerByteSizeObserver(
-      KV<K, V> kv, ElementByteSizeObserver observer, Context context)
+      KV<K, V> kv, ElementByteSizeObserver observer)
       throws Exception {
     if (kv == null) {
       throw new CoderException("cannot encode a null KV");
     }
-    keyCoder.registerByteSizeObserver(kv.getKey(), observer, context.nested());
-    valueCoder.registerByteSizeObserver(kv.getValue(), observer, context);
+    keyCoder.registerByteSizeObserver(kv.getKey(), observer);
+    valueCoder.registerByteSizeObserver(kv.getValue(), observer);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 685e766..7dd2a32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -107,18 +107,17 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
    * {@inheritDoc}
    */
   @Override
-  protected long getEncodedElementByteSize(T value, Context context) throws Exception {
+  protected long getEncodedElementByteSize(T value) throws Exception {
     if (valueCoder instanceof StructuredCoder) {
       // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of
       // the value, adding the number of bytes to represent the length.
-      long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize(
-          value, Context.OUTER);
+      long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize(value);
       return VarInt.getLength(valueSize) + valueSize;
     }
 
     // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior
     // of encoding and counting the bytes. The encoding will include the length prefix.
-    return super.getEncodedElementByteSize(value, context);
+    return super.getEncodedElementByteSize(value);
   }
 
   /**
@@ -127,7 +126,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
    * {@inheritDoc}
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
-    return valueCoder.isRegisterByteSizeObserverCheap(value, Context.OUTER);
+  public boolean isRegisterByteSizeObserverCheap(@Nullable T value) {
+    return valueCoder.isRegisterByteSizeObserverCheap(value);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index 9e3c768..7df9ca9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -146,7 +146,7 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
 
   @Override
   public void registerByteSizeObserver(
-      Map<K, V> map, ElementByteSizeObserver observer, Context context)
+      Map<K, V> map, ElementByteSizeObserver observer)
       throws Exception {
     observer.update(4L);
     if (map.isEmpty()) {
@@ -155,12 +155,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
     Iterator<Entry<K, V>> entries = map.entrySet().iterator();
     Entry<K, V> entry = entries.next();
     while (entries.hasNext()) {
-      keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
-      valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested());
+      keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+      valueCoder.registerByteSizeObserver(entry.getValue(), observer);
       entry = entries.next();
     }
-    keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
-    valueCoder.registerByteSizeObserver(entry.getValue(), observer, context);
+    keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+    valueCoder.registerByteSizeObserver(entry.getValue(), observer);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index d1eea9a..e46591e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -127,10 +127,10 @@ public class NullableCoder<T> extends StructuredCoder<T> {
    */
   @Override
   public void registerByteSizeObserver(
-      @Nullable T value, ElementByteSizeObserver observer, Context context) throws Exception {
+      @Nullable T value, ElementByteSizeObserver observer) throws Exception {
     observer.update(1);
     if (value != null) {
-      valueCoder.registerByteSizeObserver(value, observer, context);
+      valueCoder.registerByteSizeObserver(value, observer);
     }
   }
 
@@ -142,7 +142,7 @@ public class NullableCoder<T> extends StructuredCoder<T> {
    * {@inheritDoc}
    */
   @Override
-  protected long getEncodedElementByteSize(@Nullable T value, Context context) throws Exception {
+  protected long getEncodedElementByteSize(@Nullable T value) throws Exception {
     if (value == null) {
       return 1;
     }
@@ -151,12 +151,12 @@ public class NullableCoder<T> extends StructuredCoder<T> {
       // If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of
       // the value, adding 1 byte to count the null indicator.
       return 1  + ((StructuredCoder<T>) valueCoder)
-          .getEncodedElementByteSize(value, context);
+          .getEncodedElementByteSize(value);
     }
 
     // If value is not a StructuredCoder then fall back to the default StructuredCoder behavior
     // of encoding and counting the bytes. The encoding will include the null indicator byte.
-    return super.getEncodedElementByteSize(value, context);
+    return super.getEncodedElementByteSize(value);
   }
 
   /**
@@ -165,11 +165,11 @@ public class NullableCoder<T> extends StructuredCoder<T> {
    * {@inheritDoc}
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(@Nullable T value) {
     if (value == null) {
       return true;
     }
-    return valueCoder.isRegisterByteSizeObserverCheap(value, context);
+    return valueCoder.isRegisterByteSizeObserverCheap(value);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index 39a1658..1f4538f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -125,4 +125,3 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
     return delegateCoder.getEncodedTypeDescriptor();
   }
 }
-

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index 42931ca..44856e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.coders;
 
 import com.google.common.base.Utf8;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -128,20 +126,12 @@ public class StringUtf8Coder extends AtomicCoder<String> {
    * the byte size of the encoding plus the encoded length prefix.
    */
   @Override
-  public long getEncodedElementByteSize(String value, Context context)
+  public long getEncodedElementByteSize(String value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null String");
     }
-    if (context.isWholeStream) {
-      return Utf8.encodedLength(value);
-    } else {
-      try (CountingOutputStream countingStream =
-          new CountingOutputStream(ByteStreams.nullOutputStream())) {
-        DataOutputStream stream = new DataOutputStream(countingStream);
-        writeString(value, stream);
-        return countingStream.getCount();
-      }
-    }
+    int size = Utf8.encodedLength(value);
+    return VarInt.getLength(size) + size;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 9743c4c..718811c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -70,11 +70,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
   }
 
   @Override
-  protected long getEncodedElementByteSize(Integer value, Context context) throws Exception {
+  protected long getEncodedElementByteSize(Integer value) throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");
     }
     String textualValue = value.toString();
-    return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context);
+    return StringUtf8Coder.of().getEncodedElementByteSize(textualValue);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index 30f9c09..bda66bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -83,7 +83,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
    * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Integer value) {
     return true;
   }
 
@@ -93,7 +93,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
   }
 
   @Override
-  protected long getEncodedElementByteSize(Integer value, Context context)
+  protected long getEncodedElementByteSize(Integer value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 9a7b125..bf651c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -89,7 +89,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
    * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Long value) {
     return true;
   }
 
@@ -99,7 +99,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
   }
 
   @Override
-  protected long getEncodedElementByteSize(Long value, Context context)
+  protected long getEncodedElementByteSize(Long value)
       throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null Long");

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index 829bd20..4467faa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -67,7 +67,7 @@ public class VoidCoder extends AtomicCoder<Void> {
    * @return {@code true}. {@link VoidCoder#getEncodedElementByteSize} runs in constant time.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(Void value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(Void value) {
     return true;
   }
 
@@ -77,7 +77,7 @@ public class VoidCoder extends AtomicCoder<Void> {
   }
 
   @Override
-  protected long getEncodedElementByteSize(Void value, Context context)
+  protected long getEncodedElementByteSize(Void value)
       throws Exception {
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
index 6e0e264..f660f7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
@@ -397,13 +397,15 @@ public class CoderProperties {
 
     try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
       for (T elem : elements) {
-        coder.registerByteSizeObserver(elem, observer, context);
+        coder.registerByteSizeObserver(elem, observer);
         coder.encode(elem, os, context);
         observer.advance();
       }
       long expectedLength = os.getCount();
 
-      assertEquals(expectedLength, observer.getSum());
+      if (!context.isWholeStream) {
+        assertEquals(expectedLength, observer.getSum());
+      }
       assertEquals(elements.length, observer.getCount());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index b05f223..37d5a55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -741,24 +741,16 @@ public class ApproximateQuantiles {
     @Override
     public void registerByteSizeObserver(
         QuantileState<T, ComparatorT> state,
-        ElementByteSizeObserver observer,
-        Coder.Context context)
+        ElementByteSizeObserver observer)
         throws Exception {
-      Coder.Context nestedContext = context.nested();
-      elementCoder.registerByteSizeObserver(
-          state.min, observer, nestedContext);
-      elementCoder.registerByteSizeObserver(
-          state.max, observer, nestedContext);
-      elementListCoder.registerByteSizeObserver(
-          state.unbufferedElements, observer, nestedContext);
-
-      BigEndianIntegerCoder.of().registerByteSizeObserver(
-          state.buffers.size(), observer, nestedContext);
+      elementCoder.registerByteSizeObserver(state.min, observer);
+      elementCoder.registerByteSizeObserver(state.max, observer);
+      elementListCoder.registerByteSizeObserver(state.unbufferedElements, observer);
+
+      BigEndianIntegerCoder.of().registerByteSizeObserver(state.buffers.size(), observer);
       for (QuantileBuffer<T> buffer : state.buffers) {
         observer.update(4L + 8);
-
-        elementListCoder.registerByteSizeObserver(
-            buffer.elements, observer, nestedContext);
+        elementListCoder.registerByteSizeObserver(buffer.elements, observer);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 753e14c..497d62b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -185,12 +185,12 @@ public class Count {
         }
 
         @Override
-        public boolean isRegisterByteSizeObserverCheap(long[] value, Context context) {
+        public boolean isRegisterByteSizeObserverCheap(long[] value) {
           return true;
         }
 
         @Override
-        protected long getEncodedElementByteSize(long[] value, Context context) {
+        protected long getEncodedElementByteSize(long[] value) {
           return VarInt.getLength(value[0]);
         }
       };

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 9d5db74..c0381a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -557,16 +557,15 @@ public class Top {
 
     @Override
     public boolean isRegisterByteSizeObserverCheap(
-        BoundedHeap<T, ComparatorT> value, Context context) {
-      return listCoder.isRegisterByteSizeObserverCheap(
-          value.asList(), context);
+        BoundedHeap<T, ComparatorT> value) {
+      return listCoder.isRegisterByteSizeObserverCheap(value.asList());
     }
 
     @Override
     public void registerByteSizeObserver(
-        BoundedHeap<T, ComparatorT> value, ElementByteSizeObserver observer, Context context)
+        BoundedHeap<T, ComparatorT> value, ElementByteSizeObserver observer)
             throws Exception {
-      listCoder.registerByteSizeObserver(value.asList(), observer, context);
+      listCoder.registerByteSizeObserver(value.asList(), observer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index 4a2a286..3194a37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -100,11 +100,11 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
    * time, we defer the return value to that coder.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union) {
     int index = getIndexForEncoding(union);
     @SuppressWarnings("unchecked")
     Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
+    return coder.isRegisterByteSizeObserverCheap(union.getValue());
   }
 
   /**
@@ -112,7 +112,7 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
    */
   @Override
   public void registerByteSizeObserver(
-      RawUnionValue union, ElementByteSizeObserver observer, Context context)
+      RawUnionValue union, ElementByteSizeObserver observer)
       throws Exception {
     int index = getIndexForEncoding(union);
     // Write out the union tag.
@@ -120,7 +120,7 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
     // Write out the actual value.
     @SuppressWarnings("unchecked")
     Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.registerByteSizeObserver(union.getValue(), observer, context);
+    coder.registerByteSizeObserver(union.getValue(), observer);
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index b646bf6..a0896f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -59,4 +59,3 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
     BYTE_ARRAY_CODER.verifyDeterministic();
   }
 }
-

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1e72550..1b7e335 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -670,12 +670,11 @@ public abstract class WindowedValue<T> {
 
     @Override
     public void registerByteSizeObserver(WindowedValue<T> value,
-                                         ElementByteSizeObserver observer,
-                                         Context context) throws Exception {
-      InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context.nested());
-      windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context.nested());
-      PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer, context.nested());
-      valueCoder.registerByteSizeObserver(value.getValue(), observer, context);
+                                         ElementByteSizeObserver observer) throws Exception {
+      InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer);
+      windowsCoder.registerByteSizeObserver(value.getWindows(), observer);
+      PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer);
+      valueCoder.registerByteSizeObserver(value.getValue(), observer);
     }
 
     @Override
@@ -733,9 +732,9 @@ public abstract class WindowedValue<T> {
 
     @Override
     public void registerByteSizeObserver(
-        WindowedValue<T> value, ElementByteSizeObserver observer, Context context)
+        WindowedValue<T> value, ElementByteSizeObserver observer)
         throws Exception {
-      valueCoder.registerByteSizeObserver(value.getValue(), observer, context);
+      valueCoder.registerByteSizeObserver(value.getValue(), observer);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
index 8aa2604..a8a1bc8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
@@ -100,10 +100,11 @@ public class BigDecimalCoderTest {
   public void testGetEncodedElementByteSize() throws Exception {
     TestElementByteSizeObserver observer = new TestElementByteSizeObserver();
     for (BigDecimal value : TEST_VALUES) {
-      TEST_CODER.registerByteSizeObserver(value, observer, Coder.Context.OUTER);
+      TEST_CODER.registerByteSizeObserver(value, observer);
       observer.advance();
       assertThat(observer.getSumAndReset(),
-          equalTo((long) CoderUtils.encodeToByteArray(TEST_CODER, value).length));
+          equalTo((long) CoderUtils.encodeToByteArray(
+              TEST_CODER, value, Coder.Context.NESTED).length));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
index e2fd012..bfd6b4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
@@ -75,11 +75,12 @@ public class BigIntegerCoderTest {
   public void testGetEncodedElementByteSize() throws Exception {
     TestElementByteSizeObserver observer = new TestElementByteSizeObserver();
     for (BigInteger value : TEST_VALUES) {
-      TEST_CODER.registerByteSizeObserver(value, observer, Coder.Context.OUTER);
+      TEST_CODER.registerByteSizeObserver(value, observer);
       observer.advance();
       assertThat(
           observer.getSumAndReset(),
-          equalTo((long) CoderUtils.encodeToByteArray(TEST_CODER, value).length));
+          equalTo((long) CoderUtils.encodeToByteArray(
+              TEST_CODER, value, Coder.Context.NESTED).length));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 9568324..7ca7fb9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -399,13 +399,13 @@ public class CoderRegistryTest {
     }
 
     @Override
-    public boolean isRegisterByteSizeObserverCheap(MyValue value, Context context) {
+    public boolean isRegisterByteSizeObserverCheap(MyValue value) {
       return true;
     }
 
     @Override
     public void registerByteSizeObserver(
-        MyValue value, ElementByteSizeObserver observer, Context context)
+        MyValue value, ElementByteSizeObserver observer)
         throws Exception {
       observer.update(0L);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index fa81a7c..9a09b86 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -64,23 +64,22 @@ public class LengthPrefixCoderTest {
 
   @Test
   public void testEncodedSize() throws Exception {
-    assertEquals(4L,
-        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED));
-    assertEquals(4L,
-        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.OUTER));
+    assertEquals(5L,
+        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0)));
   }
 
   @Test
   public void testObserverIsCheap() throws Exception {
-    NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
-    assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER));
+    LengthPrefixCoder<Double> coder = LengthPrefixCoder.of(DoubleCoder.of());
+    assertTrue(coder.isRegisterByteSizeObserverCheap(5.0));
   }
 
   @Test
   public void testObserverIsNotCheap() throws Exception {
-    NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of()));
+    LengthPrefixCoder<List<String>> coder =
+        LengthPrefixCoder.of(ListCoder.of(StringUtf8Coder.of()));
     assertFalse(coder.isRegisterByteSizeObserverCheap(
-        ImmutableList.of("hi", "test"), Coder.Context.OUTER));
+        ImmutableList.of("hi", "test")));
   }
 
   @Test
@@ -92,11 +91,10 @@ public class LengthPrefixCoderTest {
 
   @Test
   public void testRegisterByteSizeObserver() throws Exception {
-    CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
-        new byte[][]{{ 0xa, 0xb, 0xc }});
-
-    CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
-        new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+    CoderProperties.testByteCount(
+        LengthPrefixCoder.of(VarIntCoder.of()),
+        Coder.Context.NESTED,
+        new Integer[]{0, 10, 1000});
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index c0a4bed..d6d7de8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -98,38 +98,34 @@ public class NullableCoderTest {
   @Test
   public void testEncodedSize() throws Exception {
     NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
-    assertEquals(1, coder.getEncodedElementByteSize(null, Coder.Context.OUTER));
-    assertEquals(9, coder.getEncodedElementByteSize(5.0, Coder.Context.OUTER));
+    assertEquals(1, coder.getEncodedElementByteSize(null));
+    assertEquals(9, coder.getEncodedElementByteSize(5.0));
   }
 
   @Test
   public void testEncodedSizeNested() throws Exception {
     NullableCoder<String> varLenCoder = NullableCoder.of(StringUtf8Coder.of());
-
-    assertEquals(1, varLenCoder.getEncodedElementByteSize(null, Context.OUTER));
-    assertEquals(1, varLenCoder.getEncodedElementByteSize(null, Context.NESTED));
-
-    assertEquals(5, varLenCoder.getEncodedElementByteSize("spam", Context.OUTER));
-    assertEquals(6, varLenCoder.getEncodedElementByteSize("spam", Context.NESTED));
+    assertEquals(1, varLenCoder.getEncodedElementByteSize(null));
+    assertEquals(6, varLenCoder.getEncodedElementByteSize("spam"));
   }
 
   @Test
   public void testObserverIsCheap() throws Exception {
     NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
-    assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER));
+    assertTrue(coder.isRegisterByteSizeObserverCheap(5.0));
   }
 
   @Test
   public void testObserverIsNotCheap() throws Exception {
     NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of()));
     assertFalse(coder.isRegisterByteSizeObserverCheap(
-        ImmutableList.of("hi", "test"), Coder.Context.OUTER));
+        ImmutableList.of("hi", "test")));
   }
 
   @Test
   public void testObserverIsAlwaysCheapForNullValues() throws Exception {
     NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of()));
-    assertTrue(coder.isRegisterByteSizeObserverCheap(null, Coder.Context.OUTER));
+    assertTrue(coder.isRegisterByteSizeObserverCheap(null));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 2ef892c..83f348c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -109,13 +109,13 @@ public class PAssertTest implements Serializable {
     }
 
     @Override
-    public boolean isRegisterByteSizeObserverCheap(NotSerializableObject value, Context context) {
+    public boolean isRegisterByteSizeObserverCheap(NotSerializableObject value) {
       return true;
     }
 
     @Override
     public void registerByteSizeObserver(
-        NotSerializableObject value, ElementByteSizeObserver observer, Context context)
+        NotSerializableObject value, ElementByteSizeObserver observer)
         throws Exception {
       observer.update(0L);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 12619e0..a70af94 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -895,16 +895,16 @@ public class CombineTest implements Serializable {
 
       @Override
       public boolean isRegisterByteSizeObserverCheap(
-          CountSum value, Context context) {
+          CountSum value) {
         return true;
       }
 
       @Override
       public void registerByteSizeObserver(
-          CountSum value, ElementByteSizeObserver observer, Context context)
+          CountSum value, ElementByteSizeObserver observer)
           throws Exception {
-        LONG_CODER.registerByteSizeObserver(value.count, observer, context.nested());
-        DOUBLE_CODER.registerByteSizeObserver(value.sum, observer, context);
+        LONG_CODER.registerByteSizeObserver(value.count, observer);
+        DOUBLE_CODER.registerByteSizeObserver(value.sum, observer);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index cbbe7f1..d2cb980 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -997,13 +997,13 @@ public class ParDoTest implements Serializable {
     }
 
     @Override
-    public boolean isRegisterByteSizeObserverCheap(TestDummy value, Context context) {
+    public boolean isRegisterByteSizeObserverCheap(TestDummy value) {
       return true;
     }
 
     @Override
     public void registerByteSizeObserver(
-        TestDummy value, ElementByteSizeObserver observer, Context context)
+        TestDummy value, ElementByteSizeObserver observer)
         throws Exception {
       observer.update(0L);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index 0781cf1..325c69d 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -76,12 +76,8 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
   }
 
   @Override
-  protected long getEncodedElementByteSize(ByteString value, Context context) throws Exception {
+  protected long getEncodedElementByteSize(ByteString value) throws Exception {
     int size = value.size();
-
-    if (context.isWholeStream) {
-      return size;
-    }
     return VarInt.getLength(size) + size;
   }
 
@@ -106,7 +102,7 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
    * <p>Returns true. {@link ByteString#size} returns the size of an array and a {@link VarInt}.
    */
   @Override
-  public boolean isRegisterByteSizeObserverCheap(ByteString value, Context context) {
+  public boolean isRegisterByteSizeObserverCheap(ByteString value) {
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
index 8fdb851..d1800a6 100644
--- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
+++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
@@ -115,12 +115,10 @@ public class ByteStringCoderTest {
   }
 
   @Test
-  public void testEncodedElementByteSizeInAllContexts() throws Throwable {
-    for (Context context : CoderProperties.ALL_CONTEXTS) {
-      for (ByteString value : TEST_VALUES) {
-        byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, context);
-        assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value, context));
-      }
+  public void testEncodedElementByteSize() throws Throwable {
+    for (ByteString value : TEST_VALUES) {
+      byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, Context.NESTED);
+      assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index 7ca8958..cfec991 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -52,10 +52,10 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
   }
 
   @Override
-  protected long getEncodedElementByteSize(TableRow value, Context context)
+  protected long getEncodedElementByteSize(TableRow value)
       throws Exception {
     String strValue = MAPPER.writeValueAsString(value);
-    return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context);
+    return StringUtf8Coder.of().getEncodedElementByteSize(strValue);
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 0f5dc4c..d838a0d 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -83,8 +83,8 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
   }
 
   @Override
-  public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) {
-    return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context);
+  public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value) {
+    return kvCoder.isRegisterByteSizeObserverCheap(value.getKV());
     //TODO : do we have to implement getEncodedSize()?
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 75a4619..5b2ec02 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.xml;
 
 import com.google.common.io.ByteStreams;
+import java.io.ByteArrayOutputStream;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
@@ -87,37 +88,40 @@ public class JAXBCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
+  public void encode(T value, OutputStream outStream) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
-      if (!context.isWholeStream) {
-        try {
-          long size = getEncodedElementByteSize(value, Context.OUTER);
-          // record the number of bytes the XML consists of so when reading we only read the encoded
-          // value
-          VarInt.encode(size, outStream);
-        } catch (Exception e) {
-          throw new CoderException(
-              "An Exception occured while trying to get the size of an encoded representation", e);
-        }
-      }
-
-      jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
+      jaxbMarshaller.get().marshal(value, baos);
     } catch (JAXBException e) {
       throw new CoderException(e);
     }
+    VarInt.encode(baos.size(), outStream);
+    baos.writeTo(outStream);
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    if (context.isWholeStream) {
+      try {
+        jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
+      } catch (JAXBException e) {
+        throw new CoderException(e);
+      }
+    } else {
+      encode(value, outStream);
+    }
   }
 
   @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     try {
-      InputStream stream = inStream;
       if (!context.isWholeStream) {
         long limit = VarInt.decodeLong(inStream);
-        stream = ByteStreams.limit(inStream, limit);
+        inStream = ByteStreams.limit(inStream, limit);
       }
       @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream));
+      T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(inStream));
       return obj;
     } catch (JAXBException e) {
       throw new CoderException(e);