You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/22 15:02:44 UTC
[beam] branch spark-runner_structured-streaming updated: Add
ComplexSourceTest
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new b6d1077 Add ComplexSourceTest
b6d1077 is described below
commit b6d1077fa5028a2f172a2cdb734b11bd91f935cd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Jan 22 15:58:48 2019 +0100
Add ComplexSourceTest
---
.../translation/batch/ComplexSourceTest.java | 54 ++++++++++++++++++++++
.../{SourceTest.java => SimpleSourceTest.java} | 2 +-
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
new file mode 100644
index 0000000..6e1ac39
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
@@ -0,0 +1,54 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test class for beam to spark source translation.
+ */
+@RunWith(JUnit4.class) public class ComplexSourceTest implements Serializable {
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ private static File file;
+ private static Pipeline pipeline;
+
+ @BeforeClass public static void beforeClass() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+ options.setRunner(SparkRunner.class);
+ pipeline = Pipeline.create(options);
+ file = TEMPORARY_FOLDER.newFile();
+ OutputStream outputStream = new FileOutputStream(file);
+ List<String> lines = new ArrayList<>();
+ for (int i = 0; i < 30; ++i) {
+ lines.add("word" + i);
+ }
+ try (PrintStream writer = new PrintStream(outputStream)) {
+ for (String line : lines) {
+ writer.println(line);
+ }
+ }
+ }
+
+ @Test public void testBoundedSource() {
+ pipeline.apply(TextIO.read().from(file.getPath()));
+ pipeline.run();
+ }
+}
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
similarity index 98%
rename from runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
rename to runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
index c3ec2ec..0c15876 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
@@ -29,7 +29,7 @@ import org.junit.runners.JUnit4;
* Test class for beam to spark source translation.
*/
@RunWith(JUnit4.class)
-public class SourceTest implements Serializable {
+public class SimpleSourceTest implements Serializable {
private static Pipeline pipeline;
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();