You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/07 03:34:11 UTC
[2/3] beam git commit: Remove uses of context in coder size estimation calls.
Remove uses of context in coder size estimation calls.
find . -type f -name '*.java' | xargs sed -i '' 's/\([a-zA-Z]*[bB]yteSize[a-zA-Z]*[(].*\), [^,]*[Cc]ontext[^,()]*\([(][)]\)*[)]/\1)/'
plus a one-off Python script and some manual fixups.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96de8d73
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96de8d73
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96de8d73
Branch: refs/heads/master
Commit: 96de8d735c40ee5f823ff17966992c433d68f296
Parents: 78a99be
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri May 5 15:23:03 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Sat May 6 20:33:20 2017 -0700
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 2 +-
.../runners/dataflow/internal/IsmFormat.java | 8 ++--
.../runners/dataflow/util/RandomAccessData.java | 11 ++----
.../dataflow/util/RandomAccessDataTest.java | 6 +--
.../apache/beam/sdk/coders/BigDecimalCoder.java | 8 ++--
.../beam/sdk/coders/BigEndianIntegerCoder.java | 4 +-
.../beam/sdk/coders/BigEndianLongCoder.java | 4 +-
.../apache/beam/sdk/coders/BigIntegerCoder.java | 6 +--
.../apache/beam/sdk/coders/ByteArrayCoder.java | 10 ++---
.../org/apache/beam/sdk/coders/ByteCoder.java | 4 +-
.../org/apache/beam/sdk/coders/DoubleCoder.java | 4 +-
.../apache/beam/sdk/coders/DurationCoder.java | 8 ++--
.../apache/beam/sdk/coders/InstantCoder.java | 8 ++--
.../beam/sdk/coders/IterableLikeCoder.java | 9 ++---
.../org/apache/beam/sdk/coders/KvCoder.java | 12 +++---
.../beam/sdk/coders/LengthPrefixCoder.java | 11 +++---
.../org/apache/beam/sdk/coders/MapCoder.java | 10 ++---
.../apache/beam/sdk/coders/NullableCoder.java | 14 +++----
.../beam/sdk/coders/StringDelegateCoder.java | 1 -
.../apache/beam/sdk/coders/StringUtf8Coder.java | 16 ++------
.../beam/sdk/coders/TextualIntegerCoder.java | 4 +-
.../org/apache/beam/sdk/coders/VarIntCoder.java | 4 +-
.../apache/beam/sdk/coders/VarLongCoder.java | 4 +-
.../org/apache/beam/sdk/coders/VoidCoder.java | 4 +-
.../beam/sdk/testing/CoderProperties.java | 6 ++-
.../sdk/transforms/ApproximateQuantiles.java | 22 ++++-------
.../org/apache/beam/sdk/transforms/Count.java | 4 +-
.../org/apache/beam/sdk/transforms/Top.java | 9 ++---
.../beam/sdk/transforms/join/UnionCoder.java | 8 ++--
.../org/apache/beam/sdk/util/BitSetCoder.java | 1 -
.../org/apache/beam/sdk/util/WindowedValue.java | 15 ++++----
.../beam/sdk/coders/BigDecimalCoderTest.java | 5 ++-
.../beam/sdk/coders/BigIntegerCoderTest.java | 5 ++-
.../beam/sdk/coders/CoderRegistryTest.java | 4 +-
.../beam/sdk/coders/LengthPrefixCoderTest.java | 24 ++++++------
.../beam/sdk/coders/NullableCoderTest.java | 18 ++++-----
.../apache/beam/sdk/testing/PAssertTest.java | 4 +-
.../apache/beam/sdk/transforms/CombineTest.java | 8 ++--
.../apache/beam/sdk/transforms/ParDoTest.java | 4 +-
.../extensions/protobuf/ByteStringCoder.java | 8 +---
.../protobuf/ByteStringCoderTest.java | 10 ++---
.../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 4 +-
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +-
.../org/apache/beam/sdk/io/xml/JAXBCoder.java | 40 +++++++++++---------
44 files changed, 168 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 09901d5..b579041 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <dataflow.container_version>beam-master-20170505-wd-2914</dataflow.container_version>
+ <dataflow.container_version>beam-master-20170506</dataflow.container_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index aed514a..00e0c54 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -704,12 +704,12 @@ public class IsmFormat {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
+ public boolean isRegisterByteSizeObserverCheap(KeyPrefix value) {
return true;
}
@Override
- public long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
+ public long getEncodedElementByteSize(KeyPrefix value)
throws Exception {
checkNotNull(value);
return VarInt.getLength(value.getSharedKeySize())
@@ -786,12 +786,12 @@ public class IsmFormat {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Footer value) {
return true;
}
@Override
- public long getEncodedElementByteSize(Footer value, Coder.Context context)
+ public long getEncodedElementByteSize(Footer value)
throws Exception {
return Footer.FIXED_LENGTH;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 4e94515..f36bd78 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -96,22 +96,17 @@ public class RandomAccessData {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(
- RandomAccessData value, Coder.Context context) {
+ public boolean isRegisterByteSizeObserverCheap(RandomAccessData value) {
return true;
}
@Override
- protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
+ protected long getEncodedElementByteSize(RandomAccessData value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null in memory stream");
}
- long size = 0;
- if (!context.isWholeStream) {
- size += VarInt.getLength(value.size);
- }
- return size + value.size;
+ return VarInt.getLength(value.size) + value.size;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
index 042e145..5a7908c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/RandomAccessDataTest.java
@@ -61,10 +61,8 @@ public class RandomAccessDataTest {
CoderProperties.coderSerializable(RandomAccessDataCoder.of());
CoderProperties.structuralValueConsistentWithEquals(
RandomAccessDataCoder.of(), streamA, streamB);
- assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.NESTED));
- assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA, Context.OUTER));
- assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.NESTED));
- assertEquals(3, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA, Context.OUTER));
+ assertTrue(RandomAccessDataCoder.of().isRegisterByteSizeObserverCheap(streamA));
+ assertEquals(4, RandomAccessDataCoder.of().getEncodedElementByteSize(streamA));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index aadf085..97559a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -85,7 +85,7 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
* @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(BigDecimal value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(BigDecimal value) {
return true;
}
@@ -97,9 +97,9 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
* representation of the {@link BigInteger} that, when scaled, equals the given value.
*/
@Override
- protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception {
+ protected long getEncodedElementByteSize(BigDecimal value) throws Exception {
checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
- return VAR_INT_CODER.getEncodedElementByteSize(value.scale(), context.nested())
- + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue(), context);
+ return VAR_INT_CODER.getEncodedElementByteSize(value.scale())
+ + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index c3c7a96..a61f099 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -82,7 +82,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
* @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Integer value) {
return true;
}
@@ -97,7 +97,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
* @return {@code 4}, the size in bytes of an integer's big endian encoding.
*/
@Override
- protected long getEncodedElementByteSize(Integer value, Context context)
+ protected long getEncodedElementByteSize(Integer value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null Integer");
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 5ef4878..868160a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -82,7 +82,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
* @return {@code true}, since {@link #getEncodedElementByteSize} returns a constant.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Long value) {
return true;
}
@@ -97,7 +97,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
* @return {@code 8}, the byte size of a big-endian encoded {@code Long}.
*/
@Override
- protected long getEncodedElementByteSize(Long value, Context context)
+ protected long getEncodedElementByteSize(Long value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null Long");
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index 6d14d17..3b038af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -75,7 +75,7 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
* @return {@code true}, because {@link #getEncodedElementByteSize} runs in constant time.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(BigInteger value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(BigInteger value) {
return true;
}
@@ -85,8 +85,8 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
* @return the size of the encoding as a byte array according to {@link ByteArrayCoder}
*/
@Override
- protected long getEncodedElementByteSize(BigInteger value, Context context) throws Exception {
+ protected long getEncodedElementByteSize(BigInteger value) throws Exception {
checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
- return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray(), context);
+ return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index d83d832..c9393a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -126,7 +126,7 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
* constant time using the {@code length} of the provided array.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(byte[] value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(byte[] value) {
return true;
}
@@ -136,15 +136,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
}
@Override
- protected long getEncodedElementByteSize(byte[] value, Context context)
+ protected long getEncodedElementByteSize(byte[] value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null byte[]");
}
- long size = 0;
- if (!context.isWholeStream) {
- size += VarInt.getLength(value.length);
- }
- return size + value.length;
+ return VarInt.getLength(value.length) + value.length;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 6e4318e..7f449d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -91,7 +91,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
* @return {@code true}. {@link ByteCoder#getEncodedElementByteSize} returns a constant.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Byte value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Byte value) {
return true;
}
@@ -106,7 +106,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
* @return {@code 1}, the byte size of a {@link Byte} encoded using Java serialization.
*/
@Override
- protected long getEncodedElementByteSize(Byte value, Context context)
+ protected long getEncodedElementByteSize(Byte value)
throws Exception {
if (value == null) {
throw new CoderException("cannot estimate size for unsupported null value");
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 12bc5e8..8eff6ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -93,7 +93,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
* @return {@code true}. {@link DoubleCoder#getEncodedElementByteSize} returns a constant.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Double value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Double value) {
return true;
}
@@ -108,7 +108,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
* @return {@code 8}, the byte size of a {@link Double} encoded using Java serialization.
*/
@Override
- protected long getEncodedElementByteSize(Double value, Context context)
+ protected long getEncodedElementByteSize(Double value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null Double");
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 7b49d1f..8b4ae1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -89,14 +89,14 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
* @return {@code true}, because it is cheap to ascertain the byte size of a long.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(ReadableDuration value, Context context) {
- return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value), context);
+ public boolean isRegisterByteSizeObserverCheap(ReadableDuration value) {
+ return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value));
}
@Override
public void registerByteSizeObserver(
- ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception {
- LONG_CODER.registerByteSizeObserver(toLong(value), observer, context);
+ ReadableDuration value, ElementByteSizeObserver observer) throws Exception {
+ LONG_CODER.registerByteSizeObserver(toLong(value), observer);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 56ed12b..000f406 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -103,16 +103,16 @@ public class InstantCoder extends AtomicCoder<Instant> {
* @return {@code true}. The byte size for a big endian long is a constant.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Instant value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Instant value) {
return LONG_CODER.isRegisterByteSizeObserverCheap(
- ORDER_PRESERVING_CONVERTER.convert(value), context);
+ ORDER_PRESERVING_CONVERTER.convert(value));
}
@Override
public void registerByteSizeObserver(
- Instant value, ElementByteSizeObserver observer, Context context) throws Exception {
+ Instant value, ElementByteSizeObserver observer) throws Exception {
LONG_CODER.registerByteSizeObserver(
- ORDER_PRESERVING_CONVERTER.convert(value), observer, context);
+ ORDER_PRESERVING_CONVERTER.convert(value), observer);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
index 52b9c34..9994b3f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java
@@ -170,18 +170,17 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
*/
@Override
public boolean isRegisterByteSizeObserverCheap(
- IterableT iterable, Context context) {
+ IterableT iterable) {
return iterable instanceof ElementByteSizeObservableIterable;
}
@Override
public void registerByteSizeObserver(
- IterableT iterable, ElementByteSizeObserver observer, Context context)
+ IterableT iterable, ElementByteSizeObserver observer)
throws Exception {
if (iterable == null) {
throw new CoderException("cannot encode a null Iterable");
}
- Context nestedContext = context.nested();
if (iterable instanceof ElementByteSizeObservableIterable) {
observer.setLazy();
@@ -196,7 +195,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
Collection<T> collection = (Collection<T>) iterable;
observer.update(4L);
for (T elem : collection) {
- elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
+ elementCoder.registerByteSizeObserver(elem, observer);
}
} else {
// TODO: (BEAM-1537) Update to use an accurate count depending on size and count,
@@ -208,7 +207,7 @@ public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
long count = 0;
for (T elem : iterable) {
count += 1;
- elementCoder.registerByteSizeObserver(elem, observer, nestedContext);
+ elementCoder.registerByteSizeObserver(elem, observer);
}
if (count > 0) {
// Update the length based upon the number of counted elements, this helps
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index da7f03c..1df4460 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -105,9 +105,9 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
* Returns whether both keyCoder and valueCoder are considered not expensive.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) {
- return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), context.nested())
- && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), context);
+ public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv) {
+ return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey())
+ && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue());
}
/**
@@ -116,13 +116,13 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
*/
@Override
public void registerByteSizeObserver(
- KV<K, V> kv, ElementByteSizeObserver observer, Context context)
+ KV<K, V> kv, ElementByteSizeObserver observer)
throws Exception {
if (kv == null) {
throw new CoderException("cannot encode a null KV");
}
- keyCoder.registerByteSizeObserver(kv.getKey(), observer, context.nested());
- valueCoder.registerByteSizeObserver(kv.getValue(), observer, context);
+ keyCoder.registerByteSizeObserver(kv.getKey(), observer);
+ valueCoder.registerByteSizeObserver(kv.getValue(), observer);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 685e766..7dd2a32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -107,18 +107,17 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
* {@inheritDoc}
*/
@Override
- protected long getEncodedElementByteSize(T value, Context context) throws Exception {
+ protected long getEncodedElementByteSize(T value) throws Exception {
if (valueCoder instanceof StructuredCoder) {
// If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of
// the value, adding the number of bytes to represent the length.
- long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize(
- value, Context.OUTER);
+ long valueSize = ((StructuredCoder<T>) valueCoder).getEncodedElementByteSize(value);
return VarInt.getLength(valueSize) + valueSize;
}
// If value is not a StructuredCoder then fall back to the default StructuredCoder behavior
// of encoding and counting the bytes. The encoding will include the length prefix.
- return super.getEncodedElementByteSize(value, context);
+ return super.getEncodedElementByteSize(value);
}
/**
@@ -127,7 +126,7 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
* {@inheritDoc}
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
- return valueCoder.isRegisterByteSizeObserverCheap(value, Context.OUTER);
+ public boolean isRegisterByteSizeObserverCheap(@Nullable T value) {
+ return valueCoder.isRegisterByteSizeObserverCheap(value);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index 9e3c768..7df9ca9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -146,7 +146,7 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
@Override
public void registerByteSizeObserver(
- Map<K, V> map, ElementByteSizeObserver observer, Context context)
+ Map<K, V> map, ElementByteSizeObserver observer)
throws Exception {
observer.update(4L);
if (map.isEmpty()) {
@@ -155,12 +155,12 @@ public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
Iterator<Entry<K, V>> entries = map.entrySet().iterator();
Entry<K, V> entry = entries.next();
while (entries.hasNext()) {
- keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
- valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested());
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer);
entry = entries.next();
}
- keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
- valueCoder.registerByteSizeObserver(entry.getValue(), observer, context);
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index d1eea9a..e46591e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -127,10 +127,10 @@ public class NullableCoder<T> extends StructuredCoder<T> {
*/
@Override
public void registerByteSizeObserver(
- @Nullable T value, ElementByteSizeObserver observer, Context context) throws Exception {
+ @Nullable T value, ElementByteSizeObserver observer) throws Exception {
observer.update(1);
if (value != null) {
- valueCoder.registerByteSizeObserver(value, observer, context);
+ valueCoder.registerByteSizeObserver(value, observer);
}
}
@@ -142,7 +142,7 @@ public class NullableCoder<T> extends StructuredCoder<T> {
* {@inheritDoc}
*/
@Override
- protected long getEncodedElementByteSize(@Nullable T value, Context context) throws Exception {
+ protected long getEncodedElementByteSize(@Nullable T value) throws Exception {
if (value == null) {
return 1;
}
@@ -151,12 +151,12 @@ public class NullableCoder<T> extends StructuredCoder<T> {
// If valueCoder is a StructuredCoder then we can ask it directly for the encoded size of
// the value, adding 1 byte to count the null indicator.
return 1 + ((StructuredCoder<T>) valueCoder)
- .getEncodedElementByteSize(value, context);
+ .getEncodedElementByteSize(value);
}
// If value is not a StructuredCoder then fall back to the default StructuredCoder behavior
// of encoding and counting the bytes. The encoding will include the null indicator byte.
- return super.getEncodedElementByteSize(value, context);
+ return super.getEncodedElementByteSize(value);
}
/**
@@ -165,11 +165,11 @@ public class NullableCoder<T> extends StructuredCoder<T> {
* {@inheritDoc}
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(@Nullable T value) {
if (value == null) {
return true;
}
- return valueCoder.isRegisterByteSizeObserverCheap(value, context);
+ return valueCoder.isRegisterByteSizeObserverCheap(value);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index 39a1658..1f4538f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -125,4 +125,3 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
return delegateCoder.getEncodedTypeDescriptor();
}
}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index 42931ca..44856e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.coders;
import com.google.common.base.Utf8;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -128,20 +126,12 @@ public class StringUtf8Coder extends AtomicCoder<String> {
* the byte size of the encoding plus the encoded length prefix.
*/
@Override
- public long getEncodedElementByteSize(String value, Context context)
+ public long getEncodedElementByteSize(String value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null String");
}
- if (context.isWholeStream) {
- return Utf8.encodedLength(value);
- } else {
- try (CountingOutputStream countingStream =
- new CountingOutputStream(ByteStreams.nullOutputStream())) {
- DataOutputStream stream = new DataOutputStream(countingStream);
- writeString(value, stream);
- return countingStream.getCount();
- }
- }
+ int size = Utf8.encodedLength(value);
+ return VarInt.getLength(size) + size;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 9743c4c..718811c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -70,11 +70,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
}
@Override
- protected long getEncodedElementByteSize(Integer value, Context context) throws Exception {
+ protected long getEncodedElementByteSize(Integer value) throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null Integer");
}
String textualValue = value.toString();
- return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context);
+ return StringUtf8Coder.of().getEncodedElementByteSize(textualValue);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index 30f9c09..bda66bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -83,7 +83,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
* @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Integer value) {
return true;
}
@@ -93,7 +93,7 @@ public class VarIntCoder extends AtomicCoder<Integer> {
}
@Override
- protected long getEncodedElementByteSize(Integer value, Context context)
+ protected long getEncodedElementByteSize(Integer value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null Integer");
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 9a7b125..bf651c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -89,7 +89,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
* @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Long value) {
return true;
}
@@ -99,7 +99,7 @@ public class VarLongCoder extends StructuredCoder<Long> {
}
@Override
- protected long getEncodedElementByteSize(Long value, Context context)
+ protected long getEncodedElementByteSize(Long value)
throws Exception {
if (value == null) {
throw new CoderException("cannot encode a null Long");
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index 829bd20..4467faa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -67,7 +67,7 @@ public class VoidCoder extends AtomicCoder<Void> {
* @return {@code true}. {@link VoidCoder#getEncodedElementByteSize} runs in constant time.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(Void value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(Void value) {
return true;
}
@@ -77,7 +77,7 @@ public class VoidCoder extends AtomicCoder<Void> {
}
@Override
- protected long getEncodedElementByteSize(Void value, Context context)
+ protected long getEncodedElementByteSize(Void value)
throws Exception {
return 0;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
index 6e0e264..f660f7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CoderProperties.java
@@ -397,13 +397,15 @@ public class CoderProperties {
try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
for (T elem : elements) {
- coder.registerByteSizeObserver(elem, observer, context);
+ coder.registerByteSizeObserver(elem, observer);
coder.encode(elem, os, context);
observer.advance();
}
long expectedLength = os.getCount();
- assertEquals(expectedLength, observer.getSum());
+ if (!context.isWholeStream) {
+ assertEquals(expectedLength, observer.getSum());
+ }
assertEquals(elements.length, observer.getCount());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index b05f223..37d5a55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -741,24 +741,16 @@ public class ApproximateQuantiles {
@Override
public void registerByteSizeObserver(
QuantileState<T, ComparatorT> state,
- ElementByteSizeObserver observer,
- Coder.Context context)
+ ElementByteSizeObserver observer)
throws Exception {
- Coder.Context nestedContext = context.nested();
- elementCoder.registerByteSizeObserver(
- state.min, observer, nestedContext);
- elementCoder.registerByteSizeObserver(
- state.max, observer, nestedContext);
- elementListCoder.registerByteSizeObserver(
- state.unbufferedElements, observer, nestedContext);
-
- BigEndianIntegerCoder.of().registerByteSizeObserver(
- state.buffers.size(), observer, nestedContext);
+ elementCoder.registerByteSizeObserver(state.min, observer);
+ elementCoder.registerByteSizeObserver(state.max, observer);
+ elementListCoder.registerByteSizeObserver(state.unbufferedElements, observer);
+
+ BigEndianIntegerCoder.of().registerByteSizeObserver(state.buffers.size(), observer);
for (QuantileBuffer<T> buffer : state.buffers) {
observer.update(4L + 8);
-
- elementListCoder.registerByteSizeObserver(
- buffer.elements, observer, nestedContext);
+ elementListCoder.registerByteSizeObserver(buffer.elements, observer);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 753e14c..497d62b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -185,12 +185,12 @@ public class Count {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(long[] value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(long[] value) {
return true;
}
@Override
- protected long getEncodedElementByteSize(long[] value, Context context) {
+ protected long getEncodedElementByteSize(long[] value) {
return VarInt.getLength(value[0]);
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 9d5db74..c0381a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -557,16 +557,15 @@ public class Top {
@Override
public boolean isRegisterByteSizeObserverCheap(
- BoundedHeap<T, ComparatorT> value, Context context) {
- return listCoder.isRegisterByteSizeObserverCheap(
- value.asList(), context);
+ BoundedHeap<T, ComparatorT> value) {
+ return listCoder.isRegisterByteSizeObserverCheap(value.asList());
}
@Override
public void registerByteSizeObserver(
- BoundedHeap<T, ComparatorT> value, ElementByteSizeObserver observer, Context context)
+ BoundedHeap<T, ComparatorT> value, ElementByteSizeObserver observer)
throws Exception {
- listCoder.registerByteSizeObserver(value.asList(), observer, context);
+ listCoder.registerByteSizeObserver(value.asList(), observer);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index 4a2a286..3194a37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -100,11 +100,11 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
* time, we defer the return value to that coder.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(RawUnionValue union) {
int index = getIndexForEncoding(union);
@SuppressWarnings("unchecked")
Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
- return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
+ return coder.isRegisterByteSizeObserverCheap(union.getValue());
}
/**
@@ -112,7 +112,7 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
*/
@Override
public void registerByteSizeObserver(
- RawUnionValue union, ElementByteSizeObserver observer, Context context)
+ RawUnionValue union, ElementByteSizeObserver observer)
throws Exception {
int index = getIndexForEncoding(union);
// Write out the union tag.
@@ -120,7 +120,7 @@ public class UnionCoder extends StructuredCoder<RawUnionValue> {
// Write out the actual value.
@SuppressWarnings("unchecked")
Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
- coder.registerByteSizeObserver(union.getValue(), observer, context);
+ coder.registerByteSizeObserver(union.getValue(), observer);
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index b646bf6..a0896f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -59,4 +59,3 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
BYTE_ARRAY_CODER.verifyDeterministic();
}
}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 1e72550..1b7e335 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -670,12 +670,11 @@ public abstract class WindowedValue<T> {
@Override
public void registerByteSizeObserver(WindowedValue<T> value,
- ElementByteSizeObserver observer,
- Context context) throws Exception {
- InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context.nested());
- windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context.nested());
- PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer, context.nested());
- valueCoder.registerByteSizeObserver(value.getValue(), observer, context);
+ ElementByteSizeObserver observer) throws Exception {
+ InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer);
+ windowsCoder.registerByteSizeObserver(value.getWindows(), observer);
+ PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer);
+ valueCoder.registerByteSizeObserver(value.getValue(), observer);
}
@Override
@@ -733,9 +732,9 @@ public abstract class WindowedValue<T> {
@Override
public void registerByteSizeObserver(
- WindowedValue<T> value, ElementByteSizeObserver observer, Context context)
+ WindowedValue<T> value, ElementByteSizeObserver observer)
throws Exception {
- valueCoder.registerByteSizeObserver(value.getValue(), observer, context);
+ valueCoder.registerByteSizeObserver(value.getValue(), observer);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
index 8aa2604..a8a1bc8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
@@ -100,10 +100,11 @@ public class BigDecimalCoderTest {
public void testGetEncodedElementByteSize() throws Exception {
TestElementByteSizeObserver observer = new TestElementByteSizeObserver();
for (BigDecimal value : TEST_VALUES) {
- TEST_CODER.registerByteSizeObserver(value, observer, Coder.Context.OUTER);
+ TEST_CODER.registerByteSizeObserver(value, observer);
observer.advance();
assertThat(observer.getSumAndReset(),
- equalTo((long) CoderUtils.encodeToByteArray(TEST_CODER, value).length));
+ equalTo((long) CoderUtils.encodeToByteArray(
+ TEST_CODER, value, Coder.Context.NESTED).length));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
index e2fd012..bfd6b4a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigIntegerCoderTest.java
@@ -75,11 +75,12 @@ public class BigIntegerCoderTest {
public void testGetEncodedElementByteSize() throws Exception {
TestElementByteSizeObserver observer = new TestElementByteSizeObserver();
for (BigInteger value : TEST_VALUES) {
- TEST_CODER.registerByteSizeObserver(value, observer, Coder.Context.OUTER);
+ TEST_CODER.registerByteSizeObserver(value, observer);
observer.advance();
assertThat(
observer.getSumAndReset(),
- equalTo((long) CoderUtils.encodeToByteArray(TEST_CODER, value).length));
+ equalTo((long) CoderUtils.encodeToByteArray(
+ TEST_CODER, value, Coder.Context.NESTED).length));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 9568324..7ca7fb9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -399,13 +399,13 @@ public class CoderRegistryTest {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(MyValue value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(MyValue value) {
return true;
}
@Override
public void registerByteSizeObserver(
- MyValue value, ElementByteSizeObserver observer, Context context)
+ MyValue value, ElementByteSizeObserver observer)
throws Exception {
observer.update(0L);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index fa81a7c..9a09b86 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -64,23 +64,22 @@ public class LengthPrefixCoderTest {
@Test
public void testEncodedSize() throws Exception {
- assertEquals(4L,
- TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED));
- assertEquals(4L,
- TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.OUTER));
+ assertEquals(5L,
+ TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0)));
}
@Test
public void testObserverIsCheap() throws Exception {
- NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
- assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER));
+ LengthPrefixCoder<Double> coder = LengthPrefixCoder.of(DoubleCoder.of());
+ assertTrue(coder.isRegisterByteSizeObserverCheap(5.0));
}
@Test
public void testObserverIsNotCheap() throws Exception {
- NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of()));
+ LengthPrefixCoder<List<String>> coder =
+ LengthPrefixCoder.of(ListCoder.of(StringUtf8Coder.of()));
assertFalse(coder.isRegisterByteSizeObserverCheap(
- ImmutableList.of("hi", "test"), Coder.Context.OUTER));
+ ImmutableList.of("hi", "test")));
}
@Test
@@ -92,11 +91,10 @@ public class LengthPrefixCoderTest {
@Test
public void testRegisterByteSizeObserver() throws Exception {
- CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
- new byte[][]{{ 0xa, 0xb, 0xc }});
-
- CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
- new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+ CoderProperties.testByteCount(
+ LengthPrefixCoder.of(VarIntCoder.of()),
+ Coder.Context.NESTED,
+ new Integer[]{0, 10, 1000});
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index c0a4bed..d6d7de8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -98,38 +98,34 @@ public class NullableCoderTest {
@Test
public void testEncodedSize() throws Exception {
NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
- assertEquals(1, coder.getEncodedElementByteSize(null, Coder.Context.OUTER));
- assertEquals(9, coder.getEncodedElementByteSize(5.0, Coder.Context.OUTER));
+ assertEquals(1, coder.getEncodedElementByteSize(null));
+ assertEquals(9, coder.getEncodedElementByteSize(5.0));
}
@Test
public void testEncodedSizeNested() throws Exception {
NullableCoder<String> varLenCoder = NullableCoder.of(StringUtf8Coder.of());
-
- assertEquals(1, varLenCoder.getEncodedElementByteSize(null, Context.OUTER));
- assertEquals(1, varLenCoder.getEncodedElementByteSize(null, Context.NESTED));
-
- assertEquals(5, varLenCoder.getEncodedElementByteSize("spam", Context.OUTER));
- assertEquals(6, varLenCoder.getEncodedElementByteSize("spam", Context.NESTED));
+ assertEquals(1, varLenCoder.getEncodedElementByteSize(null));
+ assertEquals(6, varLenCoder.getEncodedElementByteSize("spam"));
}
@Test
public void testObserverIsCheap() throws Exception {
NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
- assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER));
+ assertTrue(coder.isRegisterByteSizeObserverCheap(5.0));
}
@Test
public void testObserverIsNotCheap() throws Exception {
NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of()));
assertFalse(coder.isRegisterByteSizeObserverCheap(
- ImmutableList.of("hi", "test"), Coder.Context.OUTER));
+ ImmutableList.of("hi", "test")));
}
@Test
public void testObserverIsAlwaysCheapForNullValues() throws Exception {
NullableCoder<List<String>> coder = NullableCoder.of(ListCoder.of(StringUtf8Coder.of()));
- assertTrue(coder.isRegisterByteSizeObserverCheap(null, Coder.Context.OUTER));
+ assertTrue(coder.isRegisterByteSizeObserverCheap(null));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 2ef892c..83f348c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -109,13 +109,13 @@ public class PAssertTest implements Serializable {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(NotSerializableObject value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(NotSerializableObject value) {
return true;
}
@Override
public void registerByteSizeObserver(
- NotSerializableObject value, ElementByteSizeObserver observer, Context context)
+ NotSerializableObject value, ElementByteSizeObserver observer)
throws Exception {
observer.update(0L);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 12619e0..a70af94 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -895,16 +895,16 @@ public class CombineTest implements Serializable {
@Override
public boolean isRegisterByteSizeObserverCheap(
- CountSum value, Context context) {
+ CountSum value) {
return true;
}
@Override
public void registerByteSizeObserver(
- CountSum value, ElementByteSizeObserver observer, Context context)
+ CountSum value, ElementByteSizeObserver observer)
throws Exception {
- LONG_CODER.registerByteSizeObserver(value.count, observer, context.nested());
- DOUBLE_CODER.registerByteSizeObserver(value.sum, observer, context);
+ LONG_CODER.registerByteSizeObserver(value.count, observer);
+ DOUBLE_CODER.registerByteSizeObserver(value.sum, observer);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index cbbe7f1..d2cb980 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -997,13 +997,13 @@ public class ParDoTest implements Serializable {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(TestDummy value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(TestDummy value) {
return true;
}
@Override
public void registerByteSizeObserver(
- TestDummy value, ElementByteSizeObserver observer, Context context)
+ TestDummy value, ElementByteSizeObserver observer)
throws Exception {
observer.update(0L);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index 0781cf1..325c69d 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -76,12 +76,8 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
}
@Override
- protected long getEncodedElementByteSize(ByteString value, Context context) throws Exception {
+ protected long getEncodedElementByteSize(ByteString value) throws Exception {
int size = value.size();
-
- if (context.isWholeStream) {
- return size;
- }
return VarInt.getLength(size) + size;
}
@@ -106,7 +102,7 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
* <p>Returns true. {@link ByteString#size} returns the size of an array and a {@link VarInt}.
*/
@Override
- public boolean isRegisterByteSizeObserverCheap(ByteString value, Context context) {
+ public boolean isRegisterByteSizeObserverCheap(ByteString value) {
return true;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
index 8fdb851..d1800a6 100644
--- a/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
+++ b/sdks/java/extensions/protobuf/src/test/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoderTest.java
@@ -115,12 +115,10 @@ public class ByteStringCoderTest {
}
@Test
- public void testEncodedElementByteSizeInAllContexts() throws Throwable {
- for (Context context : CoderProperties.ALL_CONTEXTS) {
- for (ByteString value : TEST_VALUES) {
- byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, context);
- assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value, context));
- }
+ public void testEncodedElementByteSize() throws Throwable {
+ for (ByteString value : TEST_VALUES) {
+ byte[] encoded = CoderUtils.encodeToByteArray(TEST_CODER, value, Context.NESTED);
+ assertEquals(encoded.length, TEST_CODER.getEncodedElementByteSize(value));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index 7ca8958..cfec991 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -52,10 +52,10 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
}
@Override
- protected long getEncodedElementByteSize(TableRow value, Context context)
+ protected long getEncodedElementByteSize(TableRow value)
throws Exception {
String strValue = MAPPER.writeValueAsString(value);
- return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context);
+ return StringUtf8Coder.of().getEncodedElementByteSize(strValue);
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 0f5dc4c..d838a0d 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -83,8 +83,8 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
}
@Override
- public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) {
- return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context);
+ public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value) {
+ return kvCoder.isRegisterByteSizeObserverCheap(value.getKV());
//TODO : do we have to implement getEncodedSize()?
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96de8d73/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 75a4619..5b2ec02 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.xml;
import com.google.common.io.ByteStreams;
+import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
@@ -87,37 +88,40 @@ public class JAXBCoder<T> extends CustomCoder<T> {
}
@Override
- public void encode(T value, OutputStream outStream, Context context)
- throws CoderException, IOException {
+ public void encode(T value, OutputStream outStream) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
- if (!context.isWholeStream) {
- try {
- long size = getEncodedElementByteSize(value, Context.OUTER);
- // record the number of bytes the XML consists of so when reading we only read the encoded
- // value
- VarInt.encode(size, outStream);
- } catch (Exception e) {
- throw new CoderException(
- "An Exception occured while trying to get the size of an encoded representation", e);
- }
- }
-
- jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
+ jaxbMarshaller.get().marshal(value, baos);
} catch (JAXBException e) {
throw new CoderException(e);
}
+ VarInt.encode(baos.size(), outStream);
+ baos.writeTo(outStream);
+ }
+
+ @Override
+ public void encode(T value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ if (context.isWholeStream) {
+ try {
+ jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
+ } catch (JAXBException e) {
+ throw new CoderException(e);
+ }
+ } else {
+ encode(value, outStream);
+ }
}
@Override
public T decode(InputStream inStream, Context context) throws CoderException, IOException {
try {
- InputStream stream = inStream;
if (!context.isWholeStream) {
long limit = VarInt.decodeLong(inStream);
- stream = ByteStreams.limit(inStream, limit);
+ inStream = ByteStreams.limit(inStream, limit);
}
@SuppressWarnings("unchecked")
- T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream));
+ T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(inStream));
return obj;
} catch (JAXBException e) {
throw new CoderException(e);