You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/01/11 16:01:46 UTC

[beam] branch master updated: [BEAM-6332] Lazy serialization of aggregation results in Spark runner.

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ec0eb04  [BEAM-6332] Lazy serialization of aggregation results in Spark runner.
     new 3a53132  Merge pull request #7398: [BEAM-6332] Lazy serialization of aggregation results in Spark runner
ec0eb04 is described below

commit ec0eb0402291bf42d700b21e19fccf72bc96e731
Author: vaclav.plajt@gmail.com <va...@firma.seznam.cz>
AuthorDate: Thu Dec 13 15:59:21 2018 +0100

    [BEAM-6332] Lazy serialization of aggregation results in Spark runner.
---
 .../spark/coders/BeamSparkRunnerRegistrator.java   |   5 +
 .../spark/translation/GroupCombineFunctions.java   | 247 +++++++++++++--------
 .../spark/translation/TransformTranslator.java     |  10 +-
 .../translation/GroupCombineFunctionsTest.java     | 131 +++++++++++
 4 files changed, 291 insertions(+), 102 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 787fb4e..f9eaffd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.StateAndTimers;
+import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
 import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
@@ -46,6 +47,10 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator {
     // MicrobatchSource is serialized as data and may not be Kryo-serializable.
     kryo.register(MicrobatchSource.class, new StatelessJavaSerializer());
 
+    kryo.register(
+        GroupCombineFunctions.SerializableAccumulator.class,
+        new GroupCombineFunctions.KryoAccumulatorSerializer());
+
     kryo.register(WrappedArray.ofRef.class);
     kryo.register(Object[].class);
     kryo.register(ByteArray.class);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 77fc77f..8006f25 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -17,8 +17,18 @@
  */
 package org.apache.beam.runners.spark.translation;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
@@ -35,6 +45,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
 
 /** A set of group/combine functions to apply to Spark {@link org.apache.spark.rdd.RDD}s. */
 public class GroupCombineFunctions {
@@ -76,58 +87,30 @@ public class GroupCombineFunctions {
   public static <InputT, AccumT> Optional<Iterable<WindowedValue<AccumT>>> combineGlobally(
       JavaRDD<WindowedValue<InputT>> rdd,
       final SparkGlobalCombineFn<InputT, AccumT, ?> sparkCombineFn,
-      final Coder<InputT> iCoder,
       final Coder<AccumT> aCoder,
       final WindowingStrategy<?, ?> windowingStrategy) {
-    // coders.
-    final WindowedValue.FullWindowedValueCoder<InputT> wviCoder =
-        WindowedValue.FullWindowedValueCoder.of(
-            iCoder, windowingStrategy.getWindowFn().windowCoder());
+
     final WindowedValue.FullWindowedValueCoder<AccumT> wvaCoder =
         WindowedValue.FullWindowedValueCoder.of(
             aCoder, windowingStrategy.getWindowFn().windowCoder());
     final IterableCoder<WindowedValue<AccumT>> iterAccumCoder = IterableCoder.of(wvaCoder);
 
-    // Use coders to convert objects in the PCollection to byte arrays, so they
-    // can be transferred over the network for the shuffle.
-    // for readability, we add comments with actual type next to byte[].
-    // to shorten line length, we use:
-    // ---- WV: WindowedValue
-    // ---- Iterable: Itr
-    // ---- AccumT: A
-    // ---- InputT: I
-    JavaRDD<byte[]> inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder));
-
-    /*Itr<WV<A>>*/
-    /*Itr<WV<A>>>*/
-    /*Itr<WV<A>>>*/
-    /*Itr<WV<A>>>*/
-    /*Itr<WV<A>>>*/
-    /*Itr<WV<A>>>*/
-    /*Itr<WV<A>>>*/
-    /*A*/
-    /*I*/
-    /*A*/
-    /*Itr<WV<A>>*/
-    /*Itr<WV<A>>*/
-    /*WV<I>*/
-    byte[] accumulatedBytes =
-        inputRDDBytes.aggregate(
-            CoderHelpers.toByteArray(sparkCombineFn.zeroValue(), iterAccumCoder),
+    SerializableAccumulator<AccumT> accumulatedResult =
+        rdd.aggregate(
+            SerializableAccumulator.empty(iterAccumCoder),
             (ab, ib) -> {
-              Iterable<WindowedValue<AccumT>> a = CoderHelpers.fromByteArray(ab, iterAccumCoder);
-              WindowedValue<InputT> i = CoderHelpers.fromByteArray(ib, wviCoder);
-              return CoderHelpers.toByteArray(sparkCombineFn.seqOp(a, i), iterAccumCoder);
+              Iterable<WindowedValue<AccumT>> merged =
+                  sparkCombineFn.seqOp(ab.getOrDecode(iterAccumCoder), ib);
+              return SerializableAccumulator.of(merged, iterAccumCoder);
             },
             (a1b, a2b) -> {
-              Iterable<WindowedValue<AccumT>> a1 = CoderHelpers.fromByteArray(a1b, iterAccumCoder);
-              Iterable<WindowedValue<AccumT>> a2 = CoderHelpers.fromByteArray(a2b, iterAccumCoder);
-              Iterable<WindowedValue<AccumT>> merged = sparkCombineFn.combOp(a1, a2);
-              return CoderHelpers.toByteArray(merged, iterAccumCoder);
+              Iterable<WindowedValue<AccumT>> merged =
+                  sparkCombineFn.combOp(
+                      a1b.getOrDecode(iterAccumCoder), a2b.getOrDecode(iterAccumCoder));
+              return SerializableAccumulator.of(merged, iterAccumCoder);
             });
 
-    final Iterable<WindowedValue<AccumT>> result =
-        CoderHelpers.fromByteArray(accumulatedBytes, iterAccumCoder);
+    final Iterable<WindowedValue<AccumT>> result = accumulatedResult.getOrDecode(iterAccumCoder);
 
     return Iterables.isEmpty(result) ? Optional.absent() : Optional.of(result);
   }
@@ -145,13 +128,9 @@ public class GroupCombineFunctions {
           JavaRDD<WindowedValue<KV<K, InputT>>> rdd,
           final SparkKeyedCombineFn<K, InputT, AccumT, ?> sparkCombineFn,
           final Coder<K> keyCoder,
-          final Coder<InputT> iCoder,
           final Coder<AccumT> aCoder,
           final WindowingStrategy<?, ?> windowingStrategy) {
-    // coders.
-    final WindowedValue.FullWindowedValueCoder<KV<K, InputT>> wkviCoder =
-        WindowedValue.FullWindowedValueCoder.of(
-            KvCoder.of(keyCoder, iCoder), windowingStrategy.getWindowFn().windowCoder());
+
     final WindowedValue.FullWindowedValueCoder<KV<K, AccumT>> wkvaCoder =
         WindowedValue.FullWindowedValueCoder.of(
             KvCoder.of(keyCoder, aCoder), windowingStrategy.getWindowFn().windowCoder());
@@ -167,56 +146,21 @@ public class GroupCombineFunctions {
     JavaPairRDD<K, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair =
         rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue());
 
-    // Use coders to convert objects in the PCollection to byte arrays, so they
-    // can be transferred over the network for the shuffle.
-    // for readability, we add comments with actual type next to byte[].
-    // to shorten line length, we use:
-    // ---- WV: WindowedValue
-    // ---- Iterable: Itr
-    // ---- AccumT: A
-    // ---- InputT: I
-    JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes =
-        inRddDuplicatedKeyPair.mapToPair(CoderHelpers.toByteFunction(keyCoder, wkviCoder));
-
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*WV<KV<K, I>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*WV<KV<K, I>>*/
-    /*WV<KV<K, I>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*Itr<WV<KV<K, A>>>*/
-    /*WV<KV<K, I>>*/
-    JavaPairRDD</*K*/ ByteArray, /*Itr<WV<KV<K, A>>>*/ byte[]> accumulatedBytes =
-        inRddDuplicatedKeyPairBytes.combineByKey(
-            input -> {
-              WindowedValue<KV<K, InputT>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
-              return CoderHelpers.toByteArray(sparkCombineFn.createCombiner(wkvi), iterAccumCoder);
-            },
-            (acc, input) -> {
-              Iterable<WindowedValue<KV<K, AccumT>>> wkvas =
-                  CoderHelpers.fromByteArray(acc, iterAccumCoder);
-              WindowedValue<KV<K, InputT>> wkvi = CoderHelpers.fromByteArray(input, wkviCoder);
-              return CoderHelpers.toByteArray(
-                  sparkCombineFn.mergeValue(wkvi, wkvas), iterAccumCoder);
-            },
-            (acc1, acc2) -> {
-              Iterable<WindowedValue<KV<K, AccumT>>> wkvas1 =
-                  CoderHelpers.fromByteArray(acc1, iterAccumCoder);
-              Iterable<WindowedValue<KV<K, AccumT>>> wkvas2 =
-                  CoderHelpers.fromByteArray(acc2, iterAccumCoder);
-              return CoderHelpers.toByteArray(
-                  sparkCombineFn.mergeCombiners(wkvas1, wkvas2), iterAccumCoder);
-            });
+    JavaPairRDD<K, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult =
+        inRddDuplicatedKeyPair.combineByKey(
+            input ->
+                SerializableAccumulator.of(sparkCombineFn.createCombiner(input), iterAccumCoder),
+            (acc, input) ->
+                SerializableAccumulator.of(
+                    sparkCombineFn.mergeValue(input, acc.getOrDecode(iterAccumCoder)),
+                    iterAccumCoder),
+            (acc1, acc2) ->
+                SerializableAccumulator.of(
+                    sparkCombineFn.mergeCombiners(
+                        acc1.getOrDecode(iterAccumCoder), acc2.getOrDecode(iterAccumCoder)),
+                    iterAccumCoder));
 
-    return accumulatedBytes.mapToPair(CoderHelpers.fromByteFunction(keyCoder, iterAccumCoder));
+    return accumulatedResult.mapToPair(i -> new Tuple2<>(i._1, i._2.getOrDecode(iterAccumCoder)));
   }
 
   /** An implementation of {@link Reshuffle} for the Spark runner. */
@@ -234,4 +178,119 @@ public class GroupCombineFunctions {
         .map(TranslationUtils.fromPairFunction())
         .map(TranslationUtils.toKVByWindowInValue());
   }
+
+  /**
+   * Wrapper around an accumulated (combined) value with custom, lazy serialization. The value is
+   * encoded with the given coder only inside the on-serialization callbacks {@link
+   * #writeObject(ObjectOutputStream)} and {@link KryoAccumulatorSerializer#write(Kryo, Output,
+   * SerializableAccumulator)}, so both of Spark's serialization mechanisms (Java serialization and
+   * Kryo) are supported. The coder is not part of the serialized form; a deserialized instance
+   * keeps only the encoded bytes until {@link #getOrDecode(Coder)} decodes them with a coder.
+   *
+   * @param <AccumT> type of the accumulated values wrapped in {@link WindowedValue}s
+   */
+  public static class SerializableAccumulator<AccumT> implements Serializable {
+    private transient Iterable<WindowedValue<AccumT>> accumulated;
+    private transient Coder<Iterable<WindowedValue<AccumT>>> coder;
+
+    private byte[] serializedAcc; // set by readObject/ofBytes; cleared once getOrDecode decodes it
+
+    private SerializableAccumulator() {}
+
+    private SerializableAccumulator(
+        Iterable<WindowedValue<AccumT>> accumulated,
+        Coder<Iterable<WindowedValue<AccumT>>> coder,
+        byte[] serializedAcc) {
+      this.accumulated = accumulated;
+      this.coder = coder;
+      this.serializedAcc = serializedAcc;
+    }
+
+    static <AccumT> SerializableAccumulator<AccumT> of(
+        Iterable<WindowedValue<AccumT>> accumulated, Coder<Iterable<WindowedValue<AccumT>>> coder) {
+      return new SerializableAccumulator<>(accumulated, coder, null);
+    }
+
+    static <AccumT> SerializableAccumulator<AccumT> ofBytes(byte[] serializedAcc) {
+      Preconditions.checkNotNull(serializedAcc);
+      return new SerializableAccumulator<>(null, null, serializedAcc);
+    }
+
+    static <AccumT> SerializableAccumulator<AccumT> empty(
+        Coder<Iterable<WindowedValue<AccumT>>> coder) {
+      return new SerializableAccumulator<>(Lists.newArrayList(), coder, null);
+    }
+
+    /**
+     * Returns the wrapped accumulated value if it is already available as a Java object; otherwise
+     * decodes it from the serialized bytes using the given {@code coder} and drops the bytes.
+     *
+     * @param coder coder used for decoding; also remembered for later encoding if none is set yet
+     * @return the accumulated value as a Java object
+     */
+    Iterable<WindowedValue<AccumT>> getOrDecode(Coder<Iterable<WindowedValue<AccumT>>> coder) {
+      if (accumulated == null) {
+        accumulated = CoderHelpers.fromByteArray(serializedAcc, coder);
+        serializedAcc = null;
+      }
+      // Remember the coder so that a later toBytes() can re-encode the decoded value.
+      if (this.coder == null) {
+        this.coder = coder;
+      }
+
+      return accumulated;
+    }
+
+    byte[] toBytes() {
+      byte[] coded;
+      if (coder != null) { // a coder is known: encode the in-memory value
+        coded = CoderHelpers.toByteArray(this.accumulated, coder);
+      } else if (serializedAcc != null) { // never decoded: reuse the bytes as they are
+        coded = serializedAcc;
+      } else {
+        throw new IllegalStateException(
+            String.format(
+                "Given '%s' cannot be serialized since it do not contain coder or already serialized data.",
+                SerializableAccumulator.class.getSimpleName()));
+      }
+      return coded;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+      byte[] coded = toBytes();
+      out.writeInt(coded.length); // length prefix consumed by readObject
+      out.write(coded);
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      int length = in.readInt();
+      byte[] coded = new byte[length];
+      in.readFully(coded);
+      this.serializedAcc = coded; // decoding is deferred until getOrDecode supplies a coder
+    }
+  }
+
+  /**
+   * Kryo serializer for {@link SerializableAccumulator}; stores a length-prefixed byte array.
+   *
+   * @param <AccumT> type of the accumulated values held by the serialized accumulator
+   */
+  public static class KryoAccumulatorSerializer<AccumT>
+      extends Serializer<SerializableAccumulator<AccumT>> {
+
+    @Override
+    public void write(Kryo kryo, Output output, SerializableAccumulator<AccumT> accumulator) {
+      byte[] coded = accumulator.toBytes();
+      output.writeInt(coded.length, true); // must mirror input.readInt(true) in read
+      output.write(coded);
+    }
+
+    @Override
+    public SerializableAccumulator<AccumT> read(
+        Kryo kryo, Input input, Class<SerializableAccumulator<AccumT>> type) {
+      int length = input.readInt(true);
+      byte[] coded = input.readBytes(length);
+      return SerializableAccumulator.ofBytes(coded); // decoded lazily by getOrDecode
+    }
+  }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 90d829c..f1f32a8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -241,8 +241,7 @@ public final class TransformTranslator {
         JavaRDD<WindowedValue<OutputT>> outRdd;
 
         Optional<Iterable<WindowedValue<AccumT>>> maybeAccumulated =
-            GroupCombineFunctions.combineGlobally(
-                inRdd, sparkCombineFn, iCoder, aCoder, windowingStrategy);
+            GroupCombineFunctions.combineGlobally(inRdd, sparkCombineFn, aCoder, windowingStrategy);
 
         if (maybeAccumulated.isPresent()) {
           Iterable<WindowedValue<OutputT>> output =
@@ -313,12 +312,7 @@ public final class TransformTranslator {
 
         JavaPairRDD<K, Iterable<WindowedValue<KV<K, AccumT>>>> accumulatePerKey =
             GroupCombineFunctions.combinePerKey(
-                inRdd,
-                sparkCombineFn,
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder(),
-                vaCoder,
-                windowingStrategy);
+                inRdd, sparkCombineFn, inputCoder.getKeyCoder(), vaCoder, windowingStrategy);
 
         JavaRDD<WindowedValue<KV<K, OutputT>>> outRdd =
             accumulatePerKey
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupCombineFunctionsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupCombineFunctionsTest.java
new file mode 100644
index 0000000..d1dee0e
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupCombineFunctionsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.translation.GroupCombineFunctions.KryoAccumulatorSerializer;
+import org.apache.beam.runners.spark.translation.GroupCombineFunctions.SerializableAccumulator;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Unit tests of {@link GroupCombineFunctions}, covering its {@link SerializableAccumulator}. */
+public class GroupCombineFunctionsTest {
+
+  @Test
+  public void serializableAccumulatorTest() {
+    Iterable<WindowedValue<Integer>> accumulatedValue =
+        Arrays.asList(winVal(0), winVal(1), winVal(3), winVal(4));
+
+    final WindowedValue.FullWindowedValueCoder<Integer> wvaCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+    final IterableCoder<WindowedValue<Integer>> iterAccumCoder = IterableCoder.of(wvaCoder);
+
+    SerializableAccumulator<Integer> accUnderTest =
+        SerializableAccumulator.of(accumulatedValue, iterAccumCoder);
+    assertEquals(accumulatedValue, accUnderTest.getOrDecode(iterAccumCoder));
+    // The encoded bytes must decode back to the original value with the same coder.
+    byte[] bytes = accUnderTest.toBytes();
+    assertEquals(accumulatedValue, CoderHelpers.fromByteArray(bytes, iterAccumCoder));
+    // An accumulator built from raw bytes must decode to the same value on request.
+    SerializableAccumulator<Integer> accFromBytes = SerializableAccumulator.ofBytes(bytes);
+    assertEquals(accumulatedValue, accFromBytes.getOrDecode(iterAccumCoder));
+  }
+
+  @Test
+  public void serializableAccumulatorSerializationTest()
+      throws IOException, ClassNotFoundException {
+    @SuppressWarnings("unchecked")
+    Iterable<WindowedValue<Integer>> accumulatedValue =
+        Arrays.asList(winVal(0), winVal(1), winVal(3), winVal(4));
+
+    final WindowedValue.FullWindowedValueCoder<Integer> wvaCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+    final IterableCoder<WindowedValue<Integer>> iterAccumCoder = IterableCoder.of(wvaCoder);
+
+    SerializableAccumulator<Integer> accUnderTest =
+        SerializableAccumulator.of(accumulatedValue, iterAccumCoder);
+    // Round trip through Java serialization, which goes through writeObject/readObject.
+    ByteArrayOutputStream inMemOut = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(inMemOut);
+    oos.writeObject(accUnderTest);
+    // NOTE(review): oos is read back without an explicit flush/close; confirm this is intended.
+    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(inMemOut.toByteArray()));
+
+    @SuppressWarnings("unchecked")
+    SerializableAccumulator<Integer> materialized =
+        (SerializableAccumulator<Integer>) ois.readObject();
+    assertEquals(accumulatedValue, materialized.getOrDecode(iterAccumCoder));
+  }
+
+  @Test
+  public void serializableAccumulatorKryoTest() {
+    Iterable<WindowedValue<Integer>> accumulatedValue =
+        Arrays.asList(winVal(0), winVal(1), winVal(3), winVal(4));
+
+    final WindowedValue.FullWindowedValueCoder<Integer> wvaCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            BigEndianIntegerCoder.of(), GlobalWindow.Coder.INSTANCE);
+
+    final IterableCoder<WindowedValue<Integer>> iterAccumCoder = IterableCoder.of(wvaCoder);
+
+    SerializableAccumulator<Integer> accUnderTest =
+        SerializableAccumulator.of(accumulatedValue, iterAccumCoder);
+    // NOTE(review): raw type below; KryoAccumulatorSerializer<Integer> would avoid the warning.
+    KryoAccumulatorSerializer kryoSerializer = new KryoAccumulatorSerializer();
+    Kryo kryo = new Kryo();
+    kryo.register(SerializableAccumulator.class, kryoSerializer);
+    // Round-trip the accumulator through Kryo using the serializer registered above.
+    ByteArrayOutputStream inMemOut = new ByteArrayOutputStream();
+    Output out = new Output(inMemOut);
+    kryo.writeObject(out, accUnderTest);
+    out.close();
+
+    Input input = new Input(new ByteArrayInputStream(inMemOut.toByteArray()));
+
+    @SuppressWarnings("unchecked")
+    SerializableAccumulator<Integer> materialized =
+        (SerializableAccumulator<Integer>) kryo.readObject(input, SerializableAccumulator.class);
+    input.close();
+
+    assertEquals(accumulatedValue, materialized.getOrDecode(iterAccumCoder));
+  }
+
+  private <T> WindowedValue<T> winVal(T val) {
+    return WindowedValue.of(val, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+  }
+}