You are viewing a plain-text version of this content; the canonical HTML version is available in the Apache mailing-list archive.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 10:25:20 UTC

[beam] 06/08: Fix SourceTest

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3be7f2db80a88d723302848d35b183c5a3032062
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 11:01:51 2019 +0100

    Fix SourceTest
---
 .../translation/batch/SourceTest.java                 | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
index 6ef41b8..c3ec2ec 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
@@ -10,7 +10,6 @@ import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUti
 import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
 import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -20,7 +19,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -32,8 +31,8 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class SourceTest implements Serializable {
   private static Pipeline pipeline;
-  @Rule
-  public TemporaryFolder temporaryFolder;
+  @ClassRule
+  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
   @BeforeClass
   public static void beforeClass(){
@@ -44,7 +43,6 @@ public class SourceTest implements Serializable {
 
   @Test
   public void testSerialization() throws IOException{
-    Map<String, String> datasetSourceOptions = new HashMap<>();
     BoundedSource<Integer> source = new BoundedSource<Integer>() {
 
       @Override public List<? extends BoundedSource<Integer>> split(long desiredBundleSizeBytes,
@@ -62,13 +60,14 @@ public class SourceTest implements Serializable {
       }
     };
     String serializedSource = Base64Serializer.serializeUnchecked(source);
-    datasetSourceOptions.put("source", serializedSource);
-    datasetSourceOptions.put("defaultParallelism", "4");
-    datasetSourceOptions.put("pipelineOptions",
+    Map<String, String> dataSourceOptions = new HashMap<>();
+    dataSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
+    dataSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM, "4");
+    dataSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
         PipelineOptionsSerializationUtils.serializeToJson(pipeline.getOptions()));
     DataSourceReader objectToTest = new DatasetSourceBatch()
-        .createReader(new DataSourceOptions(datasetSourceOptions));
-    SerializationDebugger.testSerialization(objectToTest, temporaryFolder.getRoot());
+        .createReader(new DataSourceOptions(dataSourceOptions));
+    SerializationDebugger.testSerialization(objectToTest, TEMPORARY_FOLDER.newFile());
   }
 
   @Test