You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/28 23:31:19 UTC
[1/2] beam git commit: [BEAM-1211] Make KVCoder more efficient by
removing unnecessary nesting
Repository: beam
Updated Branches:
refs/heads/master b6944cd62 -> 84134bcab
[BEAM-1211] Make KVCoder more efficient by removing unnecessary nesting
See [BEAM-469] for more information about why this is
correct.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/496ebbea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/496ebbea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/496ebbea
Branch: refs/heads/master
Commit: 496ebbea1a6a4231077f01dcc392c06239d20841
Parents: b6944cd
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 21 15:37:49 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 15:30:53 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/KeyedWorkItemCoder.java | 4 +-
.../core/UnboundedReadFromBoundedSource.java | 10 ++--
.../streaming/SingletonKeyedWorkItemCoder.java | 10 ++--
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../DataflowUnboundedReadFromBoundedSource.java | 10 ++--
.../runners/dataflow/internal/IsmFormat.java | 6 +--
.../apache/beam/sdk/coders/BigDecimalCoder.java | 6 +--
.../org/apache/beam/sdk/coders/KvCoder.java | 22 +++-----
.../org/apache/beam/sdk/coders/MapCoder.java | 53 +++++++++++++++-----
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 12 ++---
.../beam/sdk/testing/ValueInSingleWindow.java | 6 +--
.../apache/beam/sdk/transforms/CombineFns.java | 14 +++++-
.../org/apache/beam/sdk/transforms/Mean.java | 10 ++--
.../beam/sdk/transforms/join/CoGbkResult.java | 22 ++++++--
.../transforms/windowing/IntervalWindow.java | 4 +-
.../apache/beam/sdk/util/TimerInternals.java | 4 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 11 ++--
.../beam/sdk/coders/BigDecimalCoderTest.java | 46 ++++++++---------
.../apache/beam/sdk/coders/JAXBCoderTest.java | 16 +++---
.../apache/beam/sdk/coders/MapCoderTest.java | 2 +-
.../apache/beam/sdk/transforms/CombineTest.java | 6 +--
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +-
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +-
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 4 +-
24 files changed, 164 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index 95be047..dfd6a8d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -92,7 +92,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K,
Coder.Context nestedContext = context.nested();
keyCoder.encode(value.key(), outStream, nestedContext);
timersCoder.encode(value.timersIterable(), outStream, nestedContext);
- elemsCoder.encode(value.elementsIterable(), outStream, nestedContext);
+ elemsCoder.encode(value.elementsIterable(), outStream, context);
}
@Override
@@ -101,7 +101,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K,
Coder.Context nestedContext = context.nested();
K key = keyCoder.decode(inStream, nestedContext);
Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
- Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, nestedContext);
+ Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context);
return KeyedWorkItems.workItem(key, timers, elems);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index f3f93e1..be1793c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -235,19 +235,17 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
@Override
public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
throws CoderException, IOException {
- Context nested = context.nested();
- elemsCoder.encode(value.residualElements, outStream, nested);
- sourceCoder.encode(value.residualSource, outStream, nested);
+ elemsCoder.encode(value.residualElements, outStream, context.nested());
+ sourceCoder.encode(value.residualSource, outStream, context);
}
@SuppressWarnings("unchecked")
@Override
public Checkpoint<T> decode(InputStream inStream, Context context)
throws CoderException, IOException {
- Context nested = context.nested();
return new Checkpoint<>(
- elemsCoder.decode(inStream, nested),
- sourceCoder.decode(inStream, nested));
+ elemsCoder.decode(inStream, context.nested()),
+ sourceCoder.decode(inStream, context));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index ad30688..d95ed7c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -90,17 +90,15 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
OutputStream outStream,
Context context)
throws CoderException, IOException {
- Context nestedContext = context.nested();
- keyCoder.encode(value.key(), outStream, nestedContext);
- valueCoder.encode(value.value, outStream, nestedContext);
+ keyCoder.encode(value.key(), outStream, context.nested());
+ valueCoder.encode(value.value, outStream, context);
}
@Override
public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
throws CoderException, IOException {
- Context nestedContext = context.nested();
- K key = keyCoder.decode(inStream, nestedContext);
- WindowedValue<ElemT> value = valueCoder.decode(inStream, nestedContext);
+ K key = keyCoder.decode(inStream, context.nested());
+ WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
return new SingletonKeyedWorkItem<>(key, value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 1a15eaf..29c0058 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1984,7 +1984,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
Coder.Context context) throws CoderException, IOException {
transformCoder.encode(value.transform, outStream, context.nested());
- originalMapCoder.encode(value.originalMap, outStream, context.nested());
+ originalMapCoder.encode(value.originalMap, outStream, context);
}
@Override
@@ -1992,7 +1992,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
InputStream inStream, Coder.Context context) throws CoderException, IOException {
return new TransformedMap<>(
transformCoder.decode(inStream, context.nested()),
- originalMapCoder.decode(inStream, context.nested()));
+ originalMapCoder.decode(inStream, context));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
index e1eedd8..65db817 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -254,19 +254,17 @@ public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin
@Override
public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
throws CoderException, IOException {
- Context nested = context.nested();
- elemsCoder.encode(value.residualElements, outStream, nested);
- sourceCoder.encode(value.residualSource, outStream, nested);
+ elemsCoder.encode(value.residualElements, outStream, context.nested());
+ sourceCoder.encode(value.residualSource, outStream, context);
}
@SuppressWarnings("unchecked")
@Override
public Checkpoint<T> decode(InputStream inStream, Context context)
throws CoderException, IOException {
- Context nested = context.nested();
return new Checkpoint<>(
- elemsCoder.decode(inStream, nested),
- sourceCoder.decode(inStream, nested));
+ elemsCoder.decode(inStream, context.nested()),
+ sourceCoder.decode(inStream, context));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 2f83ffd..6a244b0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -647,15 +647,15 @@ public class IsmFormat {
value);
VarIntCoder.of().encode(value.getId(), outStream, context.nested());
VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
- VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
+ VarLongCoder.of().encode(value.getIndexOffset(), outStream, context);
}
@Override
public IsmShard decode(
InputStream inStream, Coder.Context context) throws CoderException, IOException {
return IsmShard.of(
- VarIntCoder.of().decode(inStream, context),
- VarLongCoder.of().decode(inStream, context),
+ VarIntCoder.of().decode(inStream, context.nested()),
+ VarLongCoder.of().decode(inStream, context.nested()),
VarLongCoder.of().decode(inStream, context));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index e262882..36c8265 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -54,14 +54,14 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
throws IOException, CoderException {
checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
integerCoder.encode(value.scale(), outStream, context.nested());
- bigIntegerCoder.encode(value.unscaledValue(), outStream, context.nested());
+ bigIntegerCoder.encode(value.unscaledValue(), outStream, context);
}
@Override
public BigDecimal decode(InputStream inStream, Context context)
throws IOException, CoderException {
int scale = integerCoder.decode(inStream, context.nested());
- BigInteger bigInteger = bigIntegerCoder.decode(inStream, context.nested());
+ BigInteger bigInteger = bigIntegerCoder.decode(inStream, context);
return new BigDecimal(bigInteger, scale);
}
@@ -96,6 +96,6 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception {
checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
return integerCoder.getEncodedElementByteSize(value.scale(), context.nested())
- + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context.nested());
+ + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index ad13226..c0d3aa4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -83,17 +83,15 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
if (kv == null) {
throw new CoderException("cannot encode a null KV");
}
- Context nestedContext = context.nested();
- keyCoder.encode(kv.getKey(), outStream, nestedContext);
- valueCoder.encode(kv.getValue(), outStream, nestedContext);
+ keyCoder.encode(kv.getKey(), outStream, context.nested());
+ valueCoder.encode(kv.getValue(), outStream, context);
}
@Override
public KV<K, V> decode(InputStream inStream, Context context)
throws IOException, CoderException {
- Context nestedContext = context.nested();
- K key = keyCoder.decode(inStream, nestedContext);
- V value = valueCoder.decode(inStream, nestedContext);
+ K key = keyCoder.decode(inStream, context.nested());
+ V value = valueCoder.decode(inStream, context);
return KV.of(key, value);
}
@@ -135,10 +133,8 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
*/
@Override
public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) {
- return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(),
- context.nested())
- && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(),
- context.nested());
+ return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(), context.nested())
+ && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(), context);
}
/**
@@ -152,9 +148,7 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
if (kv == null) {
throw new CoderException("cannot encode a null KV");
}
- keyCoder.registerByteSizeObserver(
- kv.getKey(), observer, context.nested());
- valueCoder.registerByteSizeObserver(
- kv.getValue(), observer, context.nested());
+ keyCoder.registerByteSizeObserver(kv.getKey(), observer, context.nested());
+ valueCoder.registerByteSizeObserver(kv.getValue(), observer, context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index ebe7051..94099be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -28,6 +28,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -81,10 +83,10 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
/////////////////////////////////////////////////////////////////////////////
- Coder<K> keyCoder;
- Coder<V> valueCoder;
+ private Coder<K> keyCoder;
+ private Coder<V> valueCoder;
- MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+ private MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
}
@@ -99,12 +101,25 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
throw new CoderException("cannot encode a null Map");
}
DataOutputStream dataOutStream = new DataOutputStream(outStream);
- dataOutStream.writeInt(map.size());
- for (Entry<K, V> entry : map.entrySet()) {
+
+ int size = map.size();
+ dataOutStream.writeInt(size);
+ if (size == 0) {
+ return;
+ }
+
+ // Since we handled size == 0 above, entry is guaranteed to exist before and after loop
+ Iterator<Entry<K, V>> iterator = map.entrySet().iterator();
+ Entry<K, V> entry = iterator.next();
+ while (iterator.hasNext()) {
keyCoder.encode(entry.getKey(), outStream, context.nested());
valueCoder.encode(entry.getValue(), outStream, context.nested());
+ entry = iterator.next();
}
- dataOutStream.flush();
+
+ keyCoder.encode(entry.getKey(), outStream, context.nested());
+ valueCoder.encode(entry.getValue(), outStream, context);
+ // no flush needed as DataOutputStream does not buffer
}
@Override
@@ -112,12 +127,20 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
throws IOException, CoderException {
DataInputStream dataInStream = new DataInputStream(inStream);
int size = dataInStream.readInt();
+ if (size == 0) {
+ return Collections.emptyMap();
+ }
+
Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
- for (int i = 0; i < size; ++i) {
+ for (int i = 0; i < size - 1; ++i) {
K key = keyCoder.decode(inStream, context.nested());
V value = valueCoder.decode(inStream, context.nested());
retval.put(key, value);
}
+
+ K key = keyCoder.decode(inStream, context.nested());
+ V value = valueCoder.decode(inStream, context);
+ retval.put(key, value);
return retval;
}
@@ -149,11 +172,17 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
Map<K, V> map, ElementByteSizeObserver observer, Context context)
throws Exception {
observer.update(4L);
- for (Entry<K, V> entry : map.entrySet()) {
- keyCoder.registerByteSizeObserver(
- entry.getKey(), observer, context.nested());
- valueCoder.registerByteSizeObserver(
- entry.getValue(), observer, context.nested());
+ if (map.isEmpty()) {
+ return;
+ }
+ Iterator<Entry<K, V>> entries = map.entrySet().iterator();
+ Entry<K, V> entry = entries.next();
+ while (entries.hasNext()) {
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer, context.nested());
+ entry = entries.next();
}
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer, context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 1992cb8..58414c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -110,17 +110,17 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
public void encode(
OutgoingMessage value, OutputStream outStream, Context context)
throws CoderException, IOException {
- ByteArrayCoder.of().encode(value.elementBytes, outStream, Context.NESTED);
- BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Context.NESTED);
- RECORD_ID_CODER.encode(value.recordId, outStream, Context.NESTED);
+ ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
+ BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
+ RECORD_ID_CODER.encode(value.recordId, outStream, context);
}
@Override
public OutgoingMessage decode(
InputStream inStream, Context context) throws CoderException, IOException {
- byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Context.NESTED);
- long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Context.NESTED);
- @Nullable String recordId = RECORD_ID_CODER.decode(inStream, Context.NESTED);
+ byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
+ long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
+ @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context);
return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
index 9ec030f..b746f6d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
@@ -97,19 +97,19 @@ public abstract class ValueInSingleWindow<T> {
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
throws IOException {
Context nestedContext = context.nested();
- valueCoder.encode(windowedElem.getValue(), outStream, nestedContext);
InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext);
windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
- PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context);
+ PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+ valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@Override
public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
Context nestedContext = context.nested();
- T value = valueCoder.decode(inStream, nestedContext);
Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
BoundedWindow window = windowCoder.decode(inStream, nestedContext);
PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+ T value = valueCoder.decode(inStream, context);
return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index e4e1c50..79b2ab8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -994,20 +994,30 @@ public class CombineFns {
public void encode(Object[] value, OutputStream outStream, Context context)
throws CoderException, IOException {
checkArgument(value.length == codersCount);
+ if (value.length == 0) {
+ return;
+ }
+ int lastIndex = codersCount - 1;
Context nestedContext = context.nested();
- for (int i = 0; i < codersCount; ++i) {
+ for (int i = 0; i < lastIndex; ++i) {
coders.get(i).encode(value[i], outStream, nestedContext);
}
+ coders.get(lastIndex).encode(value[lastIndex], outStream, context);
}
@Override
public Object[] decode(InputStream inStream, Context context)
throws CoderException, IOException {
Object[] ret = new Object[codersCount];
+ if (codersCount == 0) {
+ return ret;
+ }
+ int lastIndex = codersCount - 1;
Context nestedContext = context.nested();
- for (int i = 0; i < codersCount; ++i) {
+ for (int i = 0; i < lastIndex; ++i) {
ret[i] = coders.get(i).decode(inStream, nestedContext);
}
+ ret[lastIndex] = coders.get(lastIndex).decode(inStream, context);
return ret;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 1a0791f..9eea3a0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -185,18 +185,16 @@ public class Mean {
@Override
public void encode(CountSum<NumT> value, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
- LONG_CODER.encode(value.count, outStream, nestedContext);
- DOUBLE_CODER.encode(value.sum, outStream, nestedContext);
+ LONG_CODER.encode(value.count, outStream, context.nested());
+ DOUBLE_CODER.encode(value.sum, outStream, context);
}
@Override
public CountSum<NumT> decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
- Coder.Context nestedContext = context.nested();
return new CountSum<>(
- LONG_CODER.decode(inStream, nestedContext),
- DOUBLE_CODER.decode(inStream, nestedContext));
+ LONG_CODER.decode(inStream, context.nested()),
+ DOUBLE_CODER.decode(inStream, context));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 10ba3c9..7b849e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -22,7 +22,9 @@ import static org.apache.beam.sdk.util.Structs.addObject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import java.io.IOException;
import java.io.InputStream;
@@ -268,9 +270,14 @@ public class CoGbkResult {
if (!schema.equals(value.getSchema())) {
throw new CoderException("input schema does not match coder schema");
}
- for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
- tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, Context.NESTED);
+ if (schema.size() == 0) {
+ return;
+ }
+ int lastIndex = schema.size() - 1;
+ for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
+ tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, context.nested());
}
+ tagListCoder(lastIndex).encode(value.valueMap.get(lastIndex), outStream, context);
}
@Override
@@ -278,10 +285,15 @@ public class CoGbkResult {
InputStream inStream,
Context context)
throws CoderException, IOException {
- List<Iterable<?>> valueMap = new ArrayList<>();
- for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
- valueMap.add(tagListCoder(unionTag).decode(inStream, Context.NESTED));
+ if (schema.size() == 0) {
+ return new CoGbkResult(schema, ImmutableList.<Iterable<?>>of());
+ }
+ int lastIndex = schema.size() - 1;
+ List<Iterable<?>> valueMap = Lists.newArrayListWithExpectedSize(schema.size());
+ for (int unionTag = 0; unionTag < lastIndex; unionTag++) {
+ valueMap.add(tagListCoder(unionTag).decode(inStream, context.nested()));
}
+ valueMap.add(tagListCoder(lastIndex).decode(inStream, context));
return new CoGbkResult(schema, valueMap);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index af98796..fb0fc11 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -185,14 +185,14 @@ public class IntervalWindow extends BoundedWindow
Context context)
throws IOException, CoderException {
instantCoder.encode(window.end, outStream, context.nested());
- durationCoder.encode(new Duration(window.start, window.end), outStream, context.nested());
+ durationCoder.encode(new Duration(window.start, window.end), outStream, context);
}
@Override
public IntervalWindow decode(InputStream inStream, Context context)
throws IOException, CoderException {
Instant end = instantCoder.decode(inStream, context.nested());
- ReadableDuration duration = durationCoder.decode(inStream, context.nested());
+ ReadableDuration duration = durationCoder.decode(inStream, context);
return new IntervalWindow(end.minus(duration), end);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 0bfcddc..7195486 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -258,7 +258,7 @@ public interface TimerInternals {
STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext);
INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
- STRING_CODER.encode(timer.getDomain().name(), outStream, nestedContext);
+ STRING_CODER.encode(timer.getDomain().name(), outStream, context);
}
@Override
@@ -269,7 +269,7 @@ public interface TimerInternals {
StateNamespace namespace =
StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
- TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext));
+ TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context));
return TimerData.of(timerId, namespace, timestamp, domain);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index a0b4cf5..1b3e648 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -656,22 +656,22 @@ public abstract class WindowedValue<T> {
Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- valueCoder.encode(windowedElem.getValue(), outStream, nestedContext);
InstantCoder.of().encode(
windowedElem.getTimestamp(), outStream, nestedContext);
windowsCoder.encode(windowedElem.getWindows(), outStream, nestedContext);
- PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context);
+ PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, nestedContext);
+ valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@Override
public WindowedValue<T> decode(InputStream inStream, Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- T value = valueCoder.decode(inStream, nestedContext);
Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
Collection<? extends BoundedWindow> windows =
windowsCoder.decode(inStream, nestedContext);
PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream, nestedContext);
+ T value = valueCoder.decode(inStream, context);
return WindowedValue.of(value, timestamp, windows, pane);
}
@@ -689,9 +689,10 @@ public abstract class WindowedValue<T> {
public void registerByteSizeObserver(WindowedValue<T> value,
ElementByteSizeObserver observer,
Context context) throws Exception {
+ InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context.nested());
+ windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context.nested());
+ PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer, context.nested());
valueCoder.registerByteSizeObserver(value.getValue(), observer, context);
- InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer, context);
- windowsCoder.registerByteSizeObserver(value.getWindows(), observer, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
index f5d56cb..9db50c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
@@ -75,29 +75,29 @@ public class BigDecimalCoderTest {
*/
private static final List<String> TEST_ENCODINGS =
ImmutableList.of(
- "swi4AjXYo7DnVscE2eM2Q-f7A-JpZD8CyiFN76ZfI4ZZzJtQa31MqLYUepjY4f6BXT8ZapFuM"
- + "0AaxKyQjsrfF_0mZON_XJJle68X1L0-yBXeRP6yDKlpHD-LUwr-ivNwlWD-zdwQZONNjX9"
- + "eivckz1QFUqJDLdWm6V_to30sySJAR87byZdCJvJS1cvjKgsjC6OijwCsL1JrJ9sfkeZTF"
- + "f0bR7nTr9x5-hggYHJ-rczwYunX8wROP3tWJTfyN7GxeSCCrFeswbw87qkJg0Hw_-Nx_2L"
- + "k5qCk9p96bXyCJnz6_d9Yk83re-Ru461odv273IZB-s3blouk1b2ihT15hsv7V7BIHAQ2m"
- + "1whPlIyGS6VxVjp70vICpqYZZ2-dt6-nBbzitNCmR4l486Qi3obX-yV-RCfpUHDsnPdWQ",
- "sgi4AjXYo7DnVscE2eM2Q-f7A-JpZD8CyiFN76ZfI4ZZzJtQa31MqLYUepjY4f6BXT8ZapFu"
- + "M0AaxKyQjsrfF_0mZON_XJJle68X1L0-yBXeRP6yDKlpHD-LUwr-ivNwlWD-zdwQZONNj"
- + "X9eivckz1QFUqJDLdWm6V_to30sySJAR87byZdCJvJS1cvjKgsjC6OijwCsL1JrJ9sfke"
- + "ZTFf0bR7nTr9x5-hggYHJ-rczwYunX8wROP3tWJTfyN7GxeSCCrFeswbw87qkJg0Hw_-Nx"
- + "_2Lk5qCk9p96bXyCJnz6_d9Yk83re-Ru461odv273IZB-s3blouk1b2ihT15hsv7V7BIHA"
- + "Q2m1whPlIyGS6VxVjp70vICpqYZZ2-dt6-nBbzitNCmR4l486Qi3obX-yV-RCfpUHDsnPdWQ",
- "AQGX",
- "AAH_",
- "AAEA",
- "AAEB",
- "MBUJEk1IAgE1H9Gsru39PDZgUqT1NnU",
- "AIEBAP________gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
- "AIEBCf_______7AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
+ "swg12KOw51bHBNnjNkPn-wPiaWQ_AsohTe-mXyOGWcybUGt9TKi2FHqY2OH-gV0_GWqRbjNAGsSskI7K3xf9JmT"
+ + "jf1ySZXuvF9S9PsgV3kT-sgypaRw_i1MK_orzcJVg_s3cEGTjTY1_Xor3JM9UBVKiQy3Vpulf7aN9LMki"
+ + "QEfO28mXQibyUtXL4yoLIwujoo8ArC9SayfbH5HmUxX9G0e506_cefoYIGByfq3M8GLp1_METj97ViU38"
+ + "jexsXkggqxXrMG8PO6pCYNB8P_jcf9i5OagpPafem18giZ8-v3fWJPN63vkbuOtaHb9u9yGQfrN25aLpN"
+ + "W9ooU9eYbL-1ewSBwENptcIT5SMhkulcVY6e9LyAqamGWdvnbevpwW84rTQpkeJePOkIt6G1_slfkQn6V"
+ + "Bw7Jz3Vk",
+ "sgg12KOw51bHBNnjNkPn-wPiaWQ_AsohTe-mXyOGWcybUGt9TKi2FHqY2OH-gV0_GWqRbjNAGsSskI7K3xf9JmT"
+ + "jf1ySZXuvF9S9PsgV3kT-sgypaRw_i1MK_orzcJVg_s3cEGTjTY1_Xor3JM9UBVKiQy3Vpulf7aN9LMki"
+ + "QEfO28mXQibyUtXL4yoLIwujoo8ArC9SayfbH5HmUxX9G0e506_cefoYIGByfq3M8GLp1_METj97ViU38"
+ + "jexsXkggqxXrMG8PO6pCYNB8P_jcf9i5OagpPafem18giZ8-v3fWJPN63vkbuOtaHb9u9yGQfrN25aLpN"
+ + "W9ooU9eYbL-1ewSBwENptcIT5SMhkulcVY6e9LyAqamGWdvnbevpwW84rTQpkeJePOkIt6G1_slfkQn6V"
+ + "Bw7Jz3Vk",
+ "AZc",
+ "AP8",
+ "AAA",
+ "AAE",
+ "MAkSTUgCATUf0ayu7f08NmBSpPU2dQ",
+ "AAD________4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+ + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+ + "AAAAAA",
+ "AAn_______-wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+ + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
+ + "AAAAAA");
@Test
public void testWireFormatEncode() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
index 36190f9..c023278 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
@@ -172,19 +172,19 @@ public class JAXBCoderTest {
@Override
public void encode(TestType value, OutputStream outStream, Context context)
throws CoderException, IOException {
- Context subContext = context.nested();
- VarIntCoder.of().encode(3, outStream, subContext);
- jaxbCoder.encode(value, outStream, subContext);
- VarLongCoder.of().encode(22L, outStream, subContext);
+ Context nestedContext = context.nested();
+ VarIntCoder.of().encode(3, outStream, nestedContext);
+ jaxbCoder.encode(value, outStream, nestedContext);
+ VarLongCoder.of().encode(22L, outStream, context);
}
@Override
public TestType decode(InputStream inStream, Context context)
throws CoderException, IOException {
- Context subContext = context.nested();
- VarIntCoder.of().decode(inStream, subContext);
- TestType result = jaxbCoder.decode(inStream, subContext);
- VarLongCoder.of().decode(inStream, subContext);
+ Context nestedContext = context.nested();
+ VarIntCoder.of().decode(inStream, nestedContext);
+ TestType result = jaxbCoder.decode(inStream, nestedContext);
+ VarLongCoder.of().decode(inStream, context);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index dc4a8b5..1053c79 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -85,7 +85,7 @@ public class MapCoderTest {
*/
private static final List<String> TEST_ENCODINGS = Arrays.asList(
"AAAAAA",
- "AAAAAv____8PA2ZvbwEFaGVsbG8");
+ "AAAAAv____8PA2ZvbwFoZWxsbw");
@Test
public void testWireFormatEncode() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 0ac9502..f783928 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -905,14 +905,14 @@ public class CombineTest implements Serializable {
@Override
public void encode(CountSum value, OutputStream outStream,
Context context) throws CoderException, IOException {
- LONG_CODER.encode(value.count, outStream, context);
+ LONG_CODER.encode(value.count, outStream, context.nested());
DOUBLE_CODER.encode(value.sum, outStream, context);
}
@Override
public CountSum decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
- long count = LONG_CODER.decode(inStream, context);
+ long count = LONG_CODER.decode(inStream, context.nested());
double sum = DOUBLE_CODER.decode(inStream, context);
return new CountSum(count, sum);
}
@@ -930,7 +930,7 @@ public class CombineTest implements Serializable {
public void registerByteSizeObserver(
CountSum value, ElementByteSizeObserver observer, Context context)
throws Exception {
- LONG_CODER.registerByteSizeObserver(value.count, observer, context);
+ LONG_CODER.registerByteSizeObserver(value.count, observer, context.nested());
DOUBLE_CODER.registerByteSizeObserver(value.sum, observer, context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index de9393a..3a36add 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2768,7 +2768,7 @@ public class BigQueryIO {
throw new CoderException("cannot encode a null value");
}
tableRowCoder.encode(value.tableRow, outStream, context.nested());
- idCoder.encode(value.uniqueId, outStream, context.nested());
+ idCoder.encode(value.uniqueId, outStream, context);
}
@Override
@@ -2776,7 +2776,7 @@ public class BigQueryIO {
throws IOException {
return new TableRowInfo(
tableRowCoder.decode(inStream, context.nested()),
- idCoder.decode(inStream, context.nested()));
+ idCoder.decode(inStream, context));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 736a752..ea78f09 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -66,7 +66,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
stringCoder.encode(value.getTopic(), outStream, nested);
intCoder.encode(value.getPartition(), outStream, nested);
longCoder.encode(value.getOffset(), outStream, nested);
- kvCoder.encode(value.getKV(), outStream, nested);
+ kvCoder.encode(value.getKV(), outStream, context);
}
@Override
@@ -77,7 +77,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
stringCoder.decode(inStream, nested),
intCoder.decode(inStream, nested),
longCoder.decode(inStream, nested),
- kvCoder.decode(inStream, nested));
+ kvCoder.decode(inStream, context));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/496ebbea/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index fc087b5..77fe127 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -53,7 +53,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
STRING_CODER.encode(value.getStreamName(), outStream, nested);
- STRING_CODER.encode(value.getShardId(), outStream, nested);
+ STRING_CODER.encode(value.getShardId(), outStream, context);
}
@Override
@@ -66,7 +66,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
String streamName = STRING_CODER.decode(inStream, nested);
- String shardId = STRING_CODER.decode(inStream, nested);
+ String shardId = STRING_CODER.decode(inStream, context);
return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
approximateArrivalTimestamp, readTimestamp, streamName, shardId
);
[2/2] beam git commit: This closes #1680
Posted by dh...@apache.org.
This closes #1680
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84134bca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84134bca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84134bca
Branch: refs/heads/master
Commit: 84134bcabaada697a22728f56f030a0351394e45
Parents: b6944cd 496ebbe
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 28 15:31:06 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 15:31:06 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/KeyedWorkItemCoder.java | 4 +-
.../core/UnboundedReadFromBoundedSource.java | 10 ++--
.../streaming/SingletonKeyedWorkItemCoder.java | 10 ++--
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../DataflowUnboundedReadFromBoundedSource.java | 10 ++--
.../runners/dataflow/internal/IsmFormat.java | 6 +--
.../apache/beam/sdk/coders/BigDecimalCoder.java | 6 +--
.../org/apache/beam/sdk/coders/KvCoder.java | 22 +++-----
.../org/apache/beam/sdk/coders/MapCoder.java | 53 +++++++++++++++-----
.../apache/beam/sdk/io/PubsubUnboundedSink.java | 12 ++---
.../beam/sdk/testing/ValueInSingleWindow.java | 6 +--
.../apache/beam/sdk/transforms/CombineFns.java | 14 +++++-
.../org/apache/beam/sdk/transforms/Mean.java | 10 ++--
.../beam/sdk/transforms/join/CoGbkResult.java | 22 ++++++--
.../transforms/windowing/IntervalWindow.java | 4 +-
.../apache/beam/sdk/util/TimerInternals.java | 4 +-
.../org/apache/beam/sdk/util/WindowedValue.java | 11 ++--
.../beam/sdk/coders/BigDecimalCoderTest.java | 46 ++++++++---------
.../apache/beam/sdk/coders/JAXBCoderTest.java | 16 +++---
.../apache/beam/sdk/coders/MapCoderTest.java | 2 +-
.../apache/beam/sdk/transforms/CombineTest.java | 6 +--
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +-
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 4 +-
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 4 +-
24 files changed, 164 insertions(+), 126 deletions(-)
----------------------------------------------------------------------