Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 10:25:18 UTC

[beam] 04/08: Add serialization test

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 47c20c27b261afcd7da9b807b040c78bd7db2495
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 10:39:10 2019 +0100

    Add serialization test
---
 .../spark/structuredstreaming/SourceTest.java      | 51 +++++++++++++++++++++-
 1 file changed, 50 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index 8263718..c348ed5 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -1,17 +1,37 @@
 package org.apache.beam.runners.spark.structuredstreaming;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
+import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 
 /**
  * Test class for beam to spark source translation.
  */
-public class SourceTest {
+@RunWith(JUnit4.class)
+public class SourceTest implements Serializable {
   private static Pipeline pipeline;
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @BeforeClass
   public static void beforeClass(){
@@ -21,6 +41,35 @@ public class SourceTest {
   }
 
   @Test
+  public void testSerialization() throws IOException {
+    Map<String, String> datasetSourceOptions = new HashMap<>();
+    BoundedSource<Integer> source = new BoundedSource<Integer>() {
+
+      @Override public List<? extends BoundedSource<Integer>> split(long desiredBundleSizeBytes,
+          PipelineOptions options) throws Exception {
+        return new ArrayList<>();
+      }
+
+      @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        return 0;
+      }
+
+      @Override public BoundedReader<Integer> createReader(PipelineOptions options)
+          throws IOException {
+        return null;
+      }
+    };
+    String serializedSource = Base64Serializer.serializeUnchecked(source);
+    datasetSourceOptions.put("source", serializedSource);
+    datasetSourceOptions.put("defaultParallelism", "4");
+    datasetSourceOptions.put("pipelineOptions",
+        PipelineOptionsSerializationUtils.serializeToJson(pipeline.getOptions()));
+    DataSourceReader objectToTest = new DatasetSourceBatch()
+        .createReader(new DataSourceOptions(datasetSourceOptions));
+    SerializationDebugger.testSerialization(objectToTest, temporaryFolder.getRoot());
+  }
+
+  @Test
   public void testBoundedSource(){
     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
     pipeline.run();
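
The test above only checks that the DataSourceReader produced by DatasetSourceBatch
is serializable; at execution time the reader is expected to recover the Beam source
and its settings from the same option keys. Below is a minimal sketch of that
read-back path, assuming Base64Serializer exposes a deserializeUnchecked(String, Class)
counterpart to serializeUnchecked and reusing the option keys from the test
("source", "defaultParallelism"); the actual DatasetSourceBatch implementation
may differ.

    import org.apache.beam.runners.core.serialization.Base64Serializer;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.spark.sql.sources.v2.DataSourceOptions;

    class SourceOptionsSketch {

      // Recover the BoundedSource that testSerialization() stored under the
      // "source" key as a Base64-encoded, Java-serialized blob. The
      // deserializeUnchecked(String, Class) signature is an assumption.
      @SuppressWarnings("unchecked")
      static BoundedSource<Integer> readSource(DataSourceOptions options) {
        String serialized = options.get("source")
            .orElseThrow(() -> new IllegalStateException("'source' option is required"));
        return Base64Serializer.deserializeUnchecked(serialized, BoundedSource.class);
      }

      // "defaultParallelism" travels as a plain string ("4" in the test);
      // DataSourceOptions provides typed accessors with a default value.
      static int readParallelism(DataSourceOptions options) {
        return options.getInt("defaultParallelism", 1);
      }
    }

Keeping every option as a string in DataSourceOptions is what forces the
Base64/JSON round trip in the first place: Spark's DataSourceV2 options map only
carries strings, so the source and the pipeline options must be re-encoded on
the driver and decoded again wherever the reader is deserialized.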