You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/14 13:24:15 UTC
[beam] 06/11: Fix SourceTest
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 3be7f2db80a88d723302848d35b183c5a3032062
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 11:01:51 2019 +0100
Fix SourceTest
---
.../translation/batch/SourceTest.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
index 6ef41b8..c3ec2ec 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
@@ -10,7 +10,6 @@ import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUti
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedSource;
@@ -20,7 +19,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@@ -32,8 +31,8 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class SourceTest implements Serializable {
private static Pipeline pipeline;
- @Rule
- public TemporaryFolder temporaryFolder;
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@BeforeClass
public static void beforeClass(){
@@ -44,7 +43,6 @@ public class SourceTest implements Serializable {
@Test
public void testSerialization() throws IOException{
- Map<String, String> datasetSourceOptions = new HashMap<>();
BoundedSource<Integer> source = new BoundedSource<Integer>() {
@Override public List<? extends BoundedSource<Integer>> split(long desiredBundleSizeBytes,
@@ -62,13 +60,14 @@ public class SourceTest implements Serializable {
}
};
String serializedSource = Base64Serializer.serializeUnchecked(source);
- datasetSourceOptions.put("source", serializedSource);
- datasetSourceOptions.put("defaultParallelism", "4");
- datasetSourceOptions.put("pipelineOptions",
+ Map<String, String> dataSourceOptions = new HashMap<>();
+ dataSourceOptions.put(DatasetSourceBatch.BEAM_SOURCE_OPTION, serializedSource);
+ dataSourceOptions.put(DatasetSourceBatch.DEFAULT_PARALLELISM, "4");
+ dataSourceOptions.put(DatasetSourceBatch.PIPELINE_OPTIONS,
PipelineOptionsSerializationUtils.serializeToJson(pipeline.getOptions()));
DataSourceReader objectToTest = new DatasetSourceBatch()
- .createReader(new DataSourceOptions(datasetSourceOptions));
- SerializationDebugger.testSerialization(objectToTest, temporaryFolder.getRoot());
+ .createReader(new DataSourceOptions(dataSourceOptions));
+ SerializationDebugger.testSerialization(objectToTest, TEMPORARY_FOLDER.newFile());
}
@Test