You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/01/11 16:01:46 UTC
[beam] branch master updated: [BEAM-6332] Lazy serialization of
aggregation results in Spark runner.
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ec0eb04 [BEAM-6332] Lazy serialization of aggregation results in Spark runner.
new 3a53132 Merge pull request #7398: [BEAM-6332] Lazy serialization of aggregation results in Spark runner
ec0eb04 is described below
commit ec0eb0402291bf42d700b21e19fccf72bc96e731
Author: vaclav.plajt@gmail.com <va...@firma.seznam.cz>
AuthorDate: Thu Dec 13 15:59:21 2018 +0100
[BEAM-6332] Lazy serialization of aggregation results in Spark runner.
---
.../spark/coders/BeamSparkRunnerRegistrator.java | 5 +
.../spark/translation/GroupCombineFunctions.java | 247 +++++++++++++--------
.../spark/translation/TransformTranslator.java | 10 +-
.../translation/GroupCombineFunctionsTest.java | 131 +++++++++++
4 files changed, 291 insertions(+), 102 deletions(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 787fb4e..f9eaffd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.StateAndTimers;
+import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
@@ -46,6 +47,10 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator {
// MicrobatchSource is serialized as data and may not be Kryo-serializable.
kryo.register(MicrobatchSource.class, new StatelessJavaSerializer());
+ kryo.register(
+ GroupCombineFunctions.SerializableAccumulator.class,
+ new GroupCombineFunctions.KryoAccumulatorSerializer());
+
kryo.register(WrappedArray.ofRef.class);
kryo.register(Object[].class);
kryo.register(ByteArray.class);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 77fc77f..8006f25 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -17,8 +17,18 @@
*/
package org.apache.beam.runners.spark.translation;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.ByteArray;
@@ -35,6 +45,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
/** A set of group/combine functions to apply to Spark {@link org.apache.spark.rdd.RDD}s. */
public class GroupCombineFunctions {
@@ -76,58 +87,30 @@ public class GroupCombineFunctions {
public static <InputT, AccumT> Optional<Iterable<WindowedValue<AccumT>>> combineGlobally(
JavaRDD<WindowedValue<InputT>> rdd,
final SparkGlobalCombineFn<InputT, AccumT, ?> sparkCombineFn,
- final Coder<InputT> iCoder,
final Coder<AccumT> aCoder,
final WindowingStrategy<?, ?> windowingStrategy) {
- // coders.
- final WindowedValue.FullWindowedValueCoder<InputT> wviCoder =
- WindowedValue.FullWindowedValueCoder.of(
- iCoder, windowingStrategy.getWindowFn().windowCoder());
+
final WindowedValue.FullWindowedValueCoder<AccumT> wvaCoder =
WindowedValue.FullWindowedValueCoder.of(
aCoder, windowingStrategy.getWindowFn().windowCoder());
final IterableCoder<WindowedValue<AccumT>> iterAccumCoder = IterableCoder.of(wvaCoder);
- // Use coders to convert objects in the PCollection to byte arrays, so they
- // can be transferred over the network for the shuffle.
- // for readability, we add comments with actual type next to byte[].
- // to shorten line length, we use:
- // ---- WV: WindowedValue
- // ---- Iterable: Itr
- // ---- AccumT: A
- // ---- InputT: I
- JavaRDD<byte[]> inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder));
-
- /*Itr<WV<A>>*/
- /*Itr<WV<A>>>*/
- /*Itr<WV<A>>>*/
- /*Itr<WV<A>>>*/
- /*Itr<WV<A>>>*/
- /*Itr<WV<A>>>*/
- /*Itr<WV<A>>>*/
- /*A*/
- /*I*/
- /*A*/
- /*Itr<WV<A>>*/
- /*Itr<WV<A>>*/
- /*WV<I>*/
- byte[] accumulatedBytes =
- inputRDDBytes.aggregate(
- CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder),
+ SerializableAccumulator<AccumT> accumulatedResult =
+ rdd.aggregate(
+ SerializableAccumulator.empty(iterAccumCoder),
(ab, ib) -> {
- Iterable<WindowedValue<AccumT>> a = CoderHelpers.fromByteArray(ab, iterAccumCoder);
- WindowedValue<InputT> i = CoderHelpers.fromByteArray(ib, wviCoder);
- return CoderHelpers.toByteArray(sparkCombineFn.seqOp(a, i), iterAccumCoder);
+ Iterable<WindowedValue<AccumT>> merged =
+ sparkCombineFn.seqOp(ab.getOrDecode(iterAccumCoder), ib);
+ return SerializableAccumulator.of(merged, iterAccumCoder);
},
(a1b, a2b) -> {
- Iterable<WindowedValue<AccumT>> a1 = CoderHelpers.fromByteArray(a1b, iterAccumCoder);
- Iterable<WindowedValue<AccumT>> a2 = CoderHelpers.fromByteArray(a2b, iterAccumCoder);
- Iterable<WindowedValue<AccumT>> merged = sparkCombineFn.combOp(a1, a2);
- return CoderHelpers.toByteArray(merged, iterAccumCoder);
+ Iterable<WindowedValue<AccumT>> merged =
+ sparkCombineFn.combOp(
+ a1b.getOrDecode(iterAccumCoder), a2b.getOrDecode(iterAccumCoder));
+ return SerializableAccumulator.of(merged, iterAccumCoder);
});
- final Iterable<WindowedValue<AccumT>> result =
- CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder);
+ final Iterable<WindowedValue<AccumT>> result = accumulatedResult.getOrDecode(iterAccumCoder);
return Iterables.isEmpty(result) ? Optional.absent() : Optional.of(result);
}
@@ -145,13 +128,9 @@ public class GroupCombineFunctions {
JavaRDD<WindowedValue<KV<K, InputT>>> rdd,
final SparkKeyedCombineFn<K, InputT, AccumT, ?> sparkCombineFn,
final Coder<K> keyCoder,
- final Coder<InputT> iCoder,
final Coder<AccumT> aCoder,
final WindowingStrategy<?, ?> windowingStrategy) {
- // coders.
- final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder =
- WindowedValue.FullWindowedValueCoder.of(
- KvCoder.of(keyCoder, iCoder), windowingStrategy.getWindowFn().windowCoder());
+
final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder =
WindowedValue.FullWindowedValueCoder.of(
KvCoder.of(keyCoder, aCoder), windowingStrategy.getWindowFn().windowCoder());
@@ -167,56 +146,21 @@ public class GroupCombineFunctions {
JavaPairRDD<K, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue());
- // Use coders to convert objects in the PCollection to byte arrays, so they
- // can be transferred over the network for the shuffle.
- // for readability, we add comments with actual type next to byte[].
- // to shorten line length, we use:
- // ---- WV: WindowedValue
- // ---- Iterable: Itr
- // ---- AccumT: A
- // ---- InputT: I
- JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes =
- inRddDuplicatedKeyPair.mapToPair(CoderHelpers.toByteFunction(keyCoder, wkviCoder));
-
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*WV<KV<K, I>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*WV<KV<K, I>>*/
- /*WV<KV<K, I>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*Itr<WV<KV<K, A>>>*/
- /*WV<KV<K, I>>*/
- JavaPairRDD</*K*/ ByteArray, /*Itr<WV<KV<K, A>>>*/ byte[]> accumulatedBytes =
- inRddDuplicatedKeyPairBytes.combineByKey(
- input -> {
- WindowedValue<KV<K, InputT>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
- return CoderHelpers.toByteArray(sparkCombineFn.createCombiner(wkvi), iterAccumCoder);
- },
- (acc, input) -> {
- Iterable<WindowedValue<KV<K, AccumT>>> wkvas =
- CoderHelpers.fromByteArray(acc, iterAccumCoder);
- WindowedValue<KV<K, InputT>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
- return CoderHelpers.toByteArray(
- sparkCombineFn.mergeValue(wkvi, wkvas), iterAccumCoder);
- },
- (acc1, acc2) -> {
- Iterable<WindowedValue<KV<K, AccumT>>> wkvas1 =
- CoderHelpers.fromByteArray(acc1, iterAccumCoder);
- Iterable<WindowedValue<KV<K, AccumT>>> wkvas2 =
- CoderHelpers.fromByteArray(acc2, iterAccumCoder);
- return CoderHelpers.toByteArray(
- sparkCombineFn.mergeCombiners(wkvas1, wkvas2), iterAccumCoder);
- });
+ JavaPairRDD<K, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult =
+ inRddDuplicatedKeyPair.combineByKey(
+ input ->
+ SerializableAccumulator.of(sparkCombineFn.createCombiner(input), iterAccumCoder),
+ (acc, input) ->
+ SerializableAccumulator.of(
+ sparkCombineFn.mergeValue(input, acc.getOrDecode(iterAccumCoder)),
+ iterAccumCoder),
+ (acc1, acc2) ->
+ SerializableAccumulator.of(
+ sparkCombineFn.mergeCombiners(
+ acc1.getOrDecode(iterAccumCoder), acc2.getOrDecode(iterAccumCoder)),
+ iterAccumCoder));
- return accumulatedBytes.mapToPair(CoderHelpers.fromByteFunction(keyCoder, iterAccumCoder));
+ return accumulatedResult.mapToPair(i -> new Tuple2<>(i._1, i._2.getOrDecode(iterAccumCoder)));
}
/** An implementation of {@link Reshuffle} for the Spark runner. */
@@ -234,4 +178,119 @@ public class GroupCombineFunctions {
.map(TranslationUtils.fromPairFunction())
.map(TranslationUtils.toKVByWindowInValue());
}
+
+ /**
+ * Wrapper around accumulated (combined) value with custom lazy serialization. Serialization is
+ * done through given coder and it is performed within on-serialization callbacks {@link
+ * #writeObject(ObjectOutputStream)} and {@link KryoAccumulatorSerializer#write(Kryo, Output,
+ * SerializableAccumulator)}. Both Spark's serialization mechanisms (Java Serialization, Kryo) are
+ * supported. Materialization of accumulated value is done when value is requested to avoid
+ * serialization of the coder itself.
+ *
+ * @param <AccumT>
+ */
+ public static class SerializableAccumulator<AccumT> implements Serializable {
+ private transient Iterable<WindowedValue<AccumT>> accumulated;
+ private transient Coder<Iterable<WindowedValue<AccumT>>> coder;
+
+ private byte[] serializedAcc;
+
+ private SerializableAccumulator() {}
+
+ private SerializableAccumulator(
+ Iterable<WindowedValue<AccumT>> accumulated,
+ Coder<Iterable<WindowedValue<AccumT>>> coder,
+ byte[] serializedAcc) {
+ this.accumulated = accumulated;
+ this.coder = coder;
+ this.serializedAcc = serializedAcc;
+ }
+
+ static <AccumT> SerializableAccumulator<AccumT> of(
+ Iterable<WindowedValue<AccumT>> accumulated, Coder<Iterable<WindowedValue<AccumT>>> coder) {
+ return new SerializableAccumulator<>(accumulated, coder, null);
+ }
+
+ static <AccumT> SerializableAccumulator<AccumT> ofBytes(byte[] serializedAcc) {
+ Preconditions.checkNotNull(serializedAcc);
+ return new SerializableAccumulator<>(null, null, serializedAcc);
+ }
+
+ static <AccumT> SerializableAccumulator<AccumT> empty(
+ Coder<Iterable<WindowedValue<AccumT>>> coder) {
+ return new SerializableAccumulator<>(Lists.newArrayList(), coder, null);
+ }
+
+ /**
+ * Returns wrapped accumulated value when available as java object or deserialize them using
+ * given {@code coder}.
+ *
+ * @param coder
+ * @return
+ */
+ Iterable<WindowedValue<AccumT>> getOrDecode(Coder<Iterable<WindowedValue<AccumT>>> coder) {
+ if (accumulated == null) {
+ accumulated = CoderHelpers.fromByteArray(serializedAcc, coder);
+ serializedAcc = null;
+ }
+
+ if (this.coder == null) {
+ this.coder = coder;
+ }
+
+ return accumulated;
+ }
+
+ byte[] toBytes() {
+ byte[] coded;
+ if (coder != null) {
+ coded = CoderHelpers.toByteArray(this.accumulated, coder);
+ } else if (serializedAcc != null) {
+ coded = serializedAcc;
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Given '%s' cannot be serialized since it do not contain coder or already serialized data.",
+ SerializableAccumulator.class.getSimpleName()));
+ }
+ return coded;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ byte[] coded = toBytes();
+ out.writeInt(coded.length);
+ out.write(coded);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ int length = in.readInt();
+ byte[] coded = new byte[length];
+ in.readFully(coded);
+ this.serializedAcc = coded;
+ }
+ }
+
+ /**
+ * Kryo serializer for {@link SerializableAccumulator}.
+ *
+ * @param <AccumT>
+ */
+ public static class KryoAccumulatorSerializer<AccumT>
+ extends Serializer<SerializableAccumulator<AccumT>> {
+
+ @Override
+ public void write(Kryo kryo, Output output, SerializableAccumulator<AccumT> accumulator) {
+ byte[] coded = accumulator.toBytes();
+ output.writeInt(coded.length, true);
+ output.write(coded);
+ }
+
+ @Override
+ public SerializableAccumulator<AccumT> read(
+ Kryo kryo, Input input, Class<SerializableAccumulator<AccumT>> type) {
+ int length = input.readInt(true);
+ byte[] coded = input.readBytes(length);
+ return SerializableAccumulator.ofBytes(coded);
+ }
+ }
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 90d829c..f1f32a8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -241,8 +241,7 @@ public final class TransformTranslator {
JavaRDD<WindowedValue<OutputT>> outRdd;
Optional<Iterable<WindowedValue<AccumT>>> maybeAccumulated =
- GroupCombineFunctions.combineGlobally(
- inRdd, sparkCombineFn, iCoder, aCoder, windowingStrategy);
+ GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, aCoder, windowingStrategy);
if (maybeAccumulated.isPresent()) {
Iterable<WindowedValue<OutputT>> output =
@@ -313,12 +312,7 @@ public final class TransformTranslator {
JavaPairRDD<K, Iterable<WindowedValue<KV<K, AccumT>>>> accumulatePerKey =
GroupCombineFunctions.combinePerKey(
- inRdd,
- sparkCombineFn,
- inputCoder.getKeyCoder(),
- inputCoder.getValueCoder(),
- vaCoder,
- windowingStrategy);
+ inRdd, sparkCombineFn, inputCoder.getKeyCoder(), vaCoder, windowingStrategy);
JavaRDD<WindowedValue<KV<K, OutputT>>> outRdd =
accumulatePerKey
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupCombineFunctionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupCombineFunctionsTest.java
new file mode 100644
index 0000000..d1dee0e
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupCombineFunctionsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.translation.GroupCombineFunctions.KryoAccumulatorSerializer;
+import org.apache.beam.runners.spark.translation.GroupCombineFunctions.SerializableAccumulator;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Unit tests of {@link GroupCombineFunctions}. */
+public class GroupCombineFunctionsTest {
+
+ @Test
+ public void serializableAccumulatorTest() {
+ Iterable<WindowedValue<Integer>> accumulatedValue =
+ Arrays.asList(winVal(0), winVal(1), winVal(3), winVal(4));
+
+ final WindowedValue.FullWindowedValueCoder<Integer> wvaCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+ final IterableCoder<WindowedValue<Integer>> iterAccumCoder = IterableCoder.of(wvaCoder);
+
+ SerializableAccumulator<Integer> accUnderTest =
+ SerializableAccumulator.of(accumulatedValue, iterAccumCoder);
+ assertEquals(accumulatedValue, accUnderTest.getOrDecode(iterAccumCoder));
+
+ byte[] bytes = accUnderTest.toBytes();
+ assertEquals(accumulatedValue, CoderHelpers.fromByteArray(bytes, iterAccumCoder));
+
+ SerializableAccumulator<Integer> accFromBytes = SerializableAccumulator.ofBytes(bytes);
+ assertEquals(accumulatedValue, accFromBytes.getOrDecode(iterAccumCoder));
+ }
+
+ @Test
+ public void serializableAccumulatorSerializationTest()
+ throws IOException, ClassNotFoundException {
+ @SuppressWarnings("unchecked")
+ Iterable<WindowedValue<Integer>> accumulatedValue =
+ Arrays.asList(winVal(0), winVal(1), winVal(3), winVal(4));
+
+ final WindowedValue.FullWindowedValueCoder<Integer> wvaCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+ final IterableCoder<WindowedValue<Integer>> iterAccumCoder = IterableCoder.of(wvaCoder);
+
+ SerializableAccumulator<Integer> accUnderTest =
+ SerializableAccumulator.of(accumulatedValue, iterAccumCoder);
+
+ ByteArrayOutputStream inMemOut = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(inMemOut);
+ oos.writeObject(accUnderTest);
+
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(inMemOut.toByteArray()));
+
+ @SuppressWarnings("unchecked")
+ SerializableAccumulator<Integer> materialized =
+ (SerializableAccumulator<Integer>) ois.readObject();
+ assertEquals(accumulatedValue, materialized.getOrDecode(iterAccumCoder));
+ }
+
+ @Test
+ public void serializableAccumulatorKryoTest() {
+ Iterable<WindowedValue<Integer>> accumulatedValue =
+ Arrays.asList(winVal(0), winVal(1), winVal(3), winVal(4));
+
+ final WindowedValue.FullWindowedValueCoder<Integer> wvaCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+ final IterableCoder<WindowedValue<Integer>> iterAccumCoder = IterableCoder.of(wvaCoder);
+
+ SerializableAccumulator<Integer> accUnderTest =
+ SerializableAccumulator.of(accumulatedValue, iterAccumCoder);
+
+ KryoAccumulatorSerializer kryoSerializer = new KryoAccumulatorSerializer();
+ Kryo kryo = new Kryo();
+ kryo.register(SerializableAccumulator.class, kryoSerializer);
+
+ ByteArrayOutputStream inMemOut = new ByteArrayOutputStream();
+ Output out = new Output(inMemOut);
+ kryo.writeObject(out, accUnderTest);
+ out.close();
+
+ Input input = new Input(new ByteArrayInputStream(inMemOut.toByteArray()));
+
+ @SuppressWarnings("unchecked")
+ SerializableAccumulator<Integer> materialized =
+ (SerializableAccumulator<Integer>) kryo.readObject(input, SerializableAccumulator.class);
+ input.close();
+
+ assertEquals(accumulatedValue, materialized.getOrDecode(iterAccumCoder));
+ }
+
+ private <T> WindowedValue<T> winVal(T val) {
+ return WindowedValue.of(val, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ }
+}