You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2016/12/13 02:17:54 UTC
[2/3] incubator-beam git commit: Add Tests for Kryo Serialization of URFBS
Add Tests for Kryo Serialization of URFBS
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47cc2dca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47cc2dca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47cc2dca
Branch: refs/heads/master
Commit: 47cc2dca05daa4075093c414e13bf0cacaa77744
Parents: 3c2e550
Author: Thomas Groh <tg...@google.com>
Authored: Mon Dec 12 16:33:53 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Dec 12 18:17:08 2016 -0800
----------------------------------------------------------------------
runners/core-java/pom.xml | 7 ++
.../UnboundedReadFromBoundedSourceTest.java | 97 ++++++++++++++++++--
2 files changed, 94 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 0e2b4b0..bab9d57 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -152,6 +152,13 @@
<!-- test dependencies -->
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.21</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- Utilities such as WindowMatchers -->
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index 7fd8807..8a1b70b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -17,19 +17,28 @@
*/
package org.apache.beam.runners.core;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
@@ -44,11 +53,13 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
@@ -65,6 +76,7 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.objenesis.strategy.StdInstantiatorStrategy;
/**
* Unit tests for {@link UnboundedReadFromBoundedSource}.
@@ -101,28 +113,93 @@ public class UnboundedReadFromBoundedSourceTest {
PCollection<Long> output =
p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
-
// Count == numElements
PAssert
- .thatSingleton(output.apply("Count", Count.<Long>globally()))
- .isEqualTo(numElements);
+ .thatSingleton(output.apply("Count", Count.<Long>globally()))
+ .isEqualTo(numElements);
// Unique count == numElements
PAssert
- .thatSingleton(output.apply(Distinct.<Long>create())
- .apply("UniqueCount", Count.<Long>globally()))
- .isEqualTo(numElements);
+ .thatSingleton(output.apply(Distinct.<Long>create())
+ .apply("UniqueCount", Count.<Long>globally()))
+ .isEqualTo(numElements);
// Min == 0
PAssert
- .thatSingleton(output.apply("Min", Min.<Long>globally()))
- .isEqualTo(0L);
+ .thatSingleton(output.apply("Min", Min.<Long>globally()))
+ .isEqualTo(0L);
// Max == numElements-1
PAssert
- .thatSingleton(output.apply("Max", Max.<Long>globally()))
- .isEqualTo(numElements - 1);
+ .thatSingleton(output.apply("Max", Max.<Long>globally()))
+ .isEqualTo(numElements - 1);
+
p.run();
}
@Test
+ public void testAdapterKryoSerializationNoMemoization() throws IOException {
+ long numElements = 100;
+ BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+ UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+ new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+ // Kryo instantiation
+ Kryo kryo = new Kryo();
+ kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+
+ // Serialization of the freshly constructed adapter, without any memoization
+ ByteArrayOutputStream adapterWithoutMemoizationBos = new ByteArrayOutputStream();
+ try (Output output = new Output(adapterWithoutMemoizationBos)) {
+ kryo.writeObject(output, unboundedSource);
+ }
+
+ // Read the adapter back from the serialized bytes (the copy without memoization)
+ ByteArrayInputStream bisWithoutMemoization =
+ new ByteArrayInputStream(adapterWithoutMemoizationBos.toByteArray());
+ BoundedToUnboundedSourceAdapter<Long> copiedWithoutMemoization =
+ kryo.readObject(new Input(bisWithoutMemoization), BoundedToUnboundedSourceAdapter.class);
+
+ Source.Reader<Long> reader =
+ copiedWithoutMemoization.createReader(TestPipeline.testingPipelineOptions(), null);
+ List<Long> readLongs = SourceTestUtils.readFromUnstartedReader(reader);
+ assertThat(readLongs, hasSize((int) numElements));
+ List<Long> expectedLongs = new ArrayList<>();
+ for (int i = 0; i < numElements; i++) {
+ expectedLongs.add((long) i);
+ }
+ assertThat(readLongs, containsInAnyOrder(expectedLongs.toArray()));
+ }
+
+ @Test
+ public void testAdapterKryoSerializationWithMemoization() throws IOException {
+ long numElements = 100;
+ BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+ UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+ new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+ // Kryo instantiation
+ Kryo kryo = new Kryo();
+ kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+ // Serialize the adapter. NOTE(review): confirm its fields are memoized before this point
+ ByteArrayOutputStream sourceWithMemoizationsOutStream = new ByteArrayOutputStream();
+ try (Output output = new Output(sourceWithMemoizationsOutStream)) {
+ kryo.writeObject(output, unboundedSource);
+ }
+
+ ByteArrayInputStream bisWithMemoization =
+ new ByteArrayInputStream(sourceWithMemoizationsOutStream.toByteArray());
+ BoundedToUnboundedSourceAdapter<Long> copiedWithMemoization =
+ kryo.readObject(new Input(bisWithMemoization), BoundedToUnboundedSourceAdapter.class);
+ Source.Reader<Long> reader =
+ copiedWithMemoization.createReader(TestPipeline.testingPipelineOptions(), null);
+ List<Long> readLongs = SourceTestUtils.readFromUnstartedReader(reader);
+ assertThat(readLongs, hasSize((int) numElements));
+ List<Long> expectedLongs = new ArrayList<>();
+ for (int i = 0; i < numElements; i++) {
+ expectedLongs.add((long) i);
+ }
+ assertThat(readLongs, containsInAnyOrder(expectedLongs.toArray()));
+ }
+
+ @Test
public void testCountingSourceToUnboundedCheckpoint() throws Exception {
long numElements = 100;
BoundedSource<Long> countingSource = CountingSource.upTo(numElements);