You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/22 15:02:44 UTC

[beam] branch spark-runner_structured-streaming updated: Add ComplexSourceTest

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new b6d1077  Add ComplexSourceTest
b6d1077 is described below

commit b6d1077fa5028a2f172a2cdb734b11bd91f935cd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 22 15:58:48 2019 +0100

    Add ComplexSourceTest
---
 .../translation/batch/ComplexSourceTest.java       | 54 ++++++++++++++++++++++
 .../{SourceTest.java => SimpleSourceTest.java}     |  2 +-
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
new file mode 100644
index 0000000..6e1ac39
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
@@ -0,0 +1,54 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test class for beam to spark source translation.
+ */
+@RunWith(JUnit4.class) public class ComplexSourceTest implements Serializable {
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  private static File file;
+  private static Pipeline pipeline;
+
+  @BeforeClass public static void beforeClass() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    pipeline = Pipeline.create(options);
+    file = TEMPORARY_FOLDER.newFile();
+    OutputStream outputStream = new FileOutputStream(file);
+    List<String> lines = new ArrayList<>();
+    for (int i = 0; i < 30; ++i) {
+      lines.add("word" + i);
+    }
+    try (PrintStream writer = new PrintStream(outputStream)) {
+      for (String line : lines) {
+        writer.println(line);
+      }
+    }
+  }
+
+  @Test public void testBoundedSource() {
+    pipeline.apply(TextIO.read().from(file.getPath()));
+    pipeline.run();
+  }
+}
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
similarity index 98%
rename from runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
rename to runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
index c3ec2ec..0c15876 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
@@ -29,7 +29,7 @@ import org.junit.runners.JUnit4;
  * Test class for beam to spark source translation.
  */
 @RunWith(JUnit4.class)
-public class SourceTest implements Serializable {
+public class SimpleSourceTest implements Serializable {
   private static Pipeline pipeline;
   @ClassRule
   public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();