You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/14 13:24:20 UTC

[beam] 11/11: Add flatten test

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit af80e19c16fd9a32dda0b1006da16cc3dd806c2e
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon Jan 14 09:27:41 2019 +0100

    Add flatten test
---
 .../translation/batch/FlattenTest.java             | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
new file mode 100644
index 0000000..ec22d14
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
@@ -0,0 +1,42 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.io.Serializable;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test class for the beam to spark translation of the {@link Flatten} transform (batch package).
+ */
+@RunWith(JUnit4.class)
+public class FlattenTest implements Serializable {
+  private static Pipeline pipeline;
+
+  /** Creates the shared pipeline once for the class, configured to use {@link SparkRunner}. */
+  @BeforeClass
+  public static void beforeClass(){
+    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    pipeline = Pipeline.create(options);
+  }
+
+  /** Flattens two integer collections and runs the pipeline; no output assertion is made. */
+  @Test
+  public void testFlatted(){
+    PCollection<Integer> input1 = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    PCollection<Integer> input2 = pipeline.apply(Create.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
+    // The flattened output is not inspected, so no local variable is kept for it.
+    PCollectionList.of(input1).and(input2).apply(Flatten.<Integer>pCollections());
+    pipeline.run();
+  }
+}