You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/13 02:17:54 UTC

[2/3] incubator-beam git commit: Add Tests for Kryo Serialization of URFBS

Add Tests for Kryo Serialization of URFBS


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47cc2dca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47cc2dca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47cc2dca

Branch: refs/heads/master
Commit: 47cc2dca05daa4075093c414e13bf0cacaa77744
Parents: 3c2e550
Author: Thomas Groh <tg...@google.com>
Authored: Mon Dec 12 16:33:53 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Dec 12 18:17:08 2016 -0800

----------------------------------------------------------------------
 runners/core-java/pom.xml                       |  7 ++
 .../UnboundedReadFromBoundedSourceTest.java     | 97 ++++++++++++++++++--
 2 files changed, 94 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 0e2b4b0..bab9d57 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -152,6 +152,13 @@
 
     <!-- test dependencies -->
 
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.21</version>
+      <scope>test</scope>
+    </dependency>
+
     <!-- Utilities such as WindowMatchers -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index 7fd8807..8a1b70b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -17,19 +17,28 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
@@ -44,11 +53,13 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
@@ -65,6 +76,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 /**
  * Unit tests for {@link UnboundedReadFromBoundedSource}.
@@ -101,28 +113,93 @@ public class UnboundedReadFromBoundedSourceTest {
 
     PCollection<Long> output =
         p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
-
     // Count == numElements
     PAssert
-      .thatSingleton(output.apply("Count", Count.<Long>globally()))
-      .isEqualTo(numElements);
+        .thatSingleton(output.apply("Count", Count.<Long>globally()))
+        .isEqualTo(numElements);
     // Unique count == numElements
     PAssert
-      .thatSingleton(output.apply(Distinct.<Long>create())
-                          .apply("UniqueCount", Count.<Long>globally()))
-      .isEqualTo(numElements);
+        .thatSingleton(output.apply(Distinct.<Long>create())
+            .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(numElements);
     // Min == 0
     PAssert
-      .thatSingleton(output.apply("Min", Min.<Long>globally()))
-      .isEqualTo(0L);
+        .thatSingleton(output.apply("Min", Min.<Long>globally()))
+        .isEqualTo(0L);
     // Max == numElements-1
     PAssert
-      .thatSingleton(output.apply("Max", Max.<Long>globally()))
-      .isEqualTo(numElements - 1);
+        .thatSingleton(output.apply("Max", Max.<Long>globally()))
+        .isEqualTo(numElements - 1);
+
     p.run();
   }
 
   @Test
+  public void testAdapterKryoSerializationNoMemoization() throws IOException {
+    long numElements = 100;
+    BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+    UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    // Create a Kryo instance and set StdInstantiatorStrategy as its instantiator strategy
+    Kryo kryo = new Kryo();
+    kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+
+    // Serialize the adapter as constructed, without any memoization
+    ByteArrayOutputStream adapterWithoutMemoizationBos = new ByteArrayOutputStream();
+    try (Output output = new Output(adapterWithoutMemoizationBos)) {
+      kryo.writeObject(output, unboundedSource);
+    }
+
+    // Deserialize a copy of the non-memoized Adapter from the serialized bytes
+    ByteArrayInputStream bisWithoutMemoization =
+        new ByteArrayInputStream(adapterWithoutMemoizationBos.toByteArray());
+    BoundedToUnboundedSourceAdapter<Long> copiedWithoutMemoization =
+        kryo.readObject(new Input(bisWithoutMemoization), BoundedToUnboundedSourceAdapter.class);
+
+    Source.Reader<Long> reader =
+        copiedWithoutMemoization.createReader(TestPipeline.testingPipelineOptions(), null);
+    List<Long> readLongs = SourceTestUtils.readFromUnstartedReader(reader);
+    assertThat(readLongs, hasSize((int) numElements));
+    List<Long> expectedLongs = new ArrayList<>();
+    for (int i = 0; i < numElements; i++) {
+      expectedLongs.add((long) i);
+    }
+    assertThat(readLongs, containsInAnyOrder(expectedLongs.toArray()));
+  }
+
+  @Test
+  public void testAdapterKryoSerializationWithMemoization() throws IOException {
+    long numElements = 100;
+    BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+    UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    // Create a Kryo instance and set StdInstantiatorStrategy as its instantiator strategy
+    Kryo kryo = new Kryo();
+    kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+    // Serialize the adapter. NOTE(review): nothing before this call appears to memoize fields.
+    ByteArrayOutputStream sourceWithMemoizationsOutStream = new ByteArrayOutputStream();
+    try (Output output = new Output(sourceWithMemoizationsOutStream)) {
+      kryo.writeObject(output, unboundedSource);
+    }
+
+    ByteArrayInputStream bisWithMemoization =
+        new ByteArrayInputStream(sourceWithMemoizationsOutStream.toByteArray());
+    BoundedToUnboundedSourceAdapter<Long> copiedWithMemoization =
+        kryo.readObject(new Input(bisWithMemoization), BoundedToUnboundedSourceAdapter.class);
+    Source.Reader<Long> reader =
+        copiedWithMemoization.createReader(TestPipeline.testingPipelineOptions(), null);
+    List<Long> readLongs = SourceTestUtils.readFromUnstartedReader(reader);
+    assertThat(readLongs, hasSize((int) numElements));
+    List<Long> expectedLongs = new ArrayList<>();
+    for (int i = 0; i < numElements; i++) {
+      expectedLongs.add((long) i);
+    }
+    assertThat(readLongs, containsInAnyOrder(expectedLongs.toArray()));
+  }
+
+  @Test
   public void testCountingSourceToUnboundedCheckpoint() throws Exception {
     long numElements = 100;
     BoundedSource<Long> countingSource = CountingSource.upTo(numElements);