You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 07:33:16 UTC
[beam] 01/06: Put back batch/simpleSourceTest.testBoundedSource
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6ae20566886df887761bb01bb49392e9d99b79f5
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Apr 30 15:10:09 2019 +0200
Put back batch/simpleSourceTest.testBoundedSource
---
.../translation/batch/SimpleSourceTest.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
index 8bd5b24..51be8e3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
@@ -31,6 +31,9 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.junit.BeforeClass;
@@ -85,4 +88,10 @@ public class SimpleSourceTest implements Serializable {
new DatasetSourceBatch().createReader(new DataSourceOptions(dataSourceOptions));
SerializationDebugger.testSerialization(objectToTest, TEMPORARY_FOLDER.newFile());
}
-}
+
+ @Test
+ public void testBoundedSource() {
+ PCollection<Integer> input = p.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+ PAssert.that(input).containsInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ p.run();
+ }}