You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/14 13:24:13 UTC
[beam] 04/11: Add serialization test
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 47c20c27b261afcd7da9b807b040c78bd7db2495
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 10:39:10 2019 +0100
Add serialization test
---
.../spark/structuredstreaming/SourceTest.java | 51 +++++++++++++++++++++-
1 file changed, 50 insertions(+), 1 deletion(-)
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index 8263718..c348ed5 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -1,17 +1,37 @@
package org.apache.beam.runners.spark.structuredstreaming;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
+import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
/**
* Test class for beam to spark source translation.
*/
-public class SourceTest {
+@RunWith(JUnit4.class)
+public class SourceTest implements Serializable {
private static Pipeline pipeline;
+ @Rule
+ public TemporaryFolder temporaryFolder;
@BeforeClass
public static void beforeClass(){
@@ -21,6 +41,35 @@ public class SourceTest {
}
@Test
+ public void testSerialization() throws IOException{ // builds a DataSourceReader via DatasetSourceBatch and passes it to SerializationDebugger
+ Map<String, String> datasetSourceOptions = new HashMap<>(); // string key/value options wrapped in DataSourceOptions below
+ BoundedSource<Integer> source = new BoundedSource<Integer>() { // anonymous class in an instance method: it keeps a reference to the enclosing SourceTest
+ // Stub source: empty split list, zero estimated size, null reader. The visible test code only serializes it.
+ @Override public List<? extends BoundedSource<Integer>> split(long desiredBundleSizeBytes,
+ PipelineOptions options) throws Exception {
+ return new ArrayList<>();
+ }
+
+ @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 0;
+ }
+
+ @Override public BoundedReader<Integer> createReader(PipelineOptions options)
+ throws IOException {
+ return null;
+ }
+ };
+ String serializedSource = Base64Serializer.serializeUnchecked(source); // the source is passed along as a string option
+ datasetSourceOptions.put("source", serializedSource);
+ datasetSourceOptions.put("defaultParallelism", "4"); // NOTE(review): presumably parsed as an int by DatasetSourceBatch - confirm
+ datasetSourceOptions.put("pipelineOptions",
+ PipelineOptionsSerializationUtils.serializeToJson(pipeline.getOptions())); // options of the static test pipeline, as JSON
+ DataSourceReader objectToTest = new DatasetSourceBatch()
+ .createReader(new DataSourceOptions(datasetSourceOptions));
+ SerializationDebugger.testSerialization(objectToTest, temporaryFolder.getRoot()); // NOTE(review): temporaryFolder is declared without an initializer in this patch - confirm it is non-null here
+ }
+
+ @Test
public void testBoundedSource(){
pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
pipeline.run();