You are viewing a plain text version of this content. The canonical link for it is not preserved in this plain-text version; see the list archives at lists.apache.org for the original message.
Posted to commits@beam.apache.org by ec...@apache.org on 2022/12/29 10:08:45 UTC
[beam] branch master updated: [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 208a14e05b5 [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
208a14e05b5 is described below
commit 208a14e05b5cf4021ef978ec45d75a96e20bc3a6
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Thu Dec 29 11:08:36 2022 +0100
[Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
---
.../translation/batch/CombineGloballyTest.java | 18 ++++++----------
.../batch/CombineGroupedValuesTest.java | 18 ++++++----------
.../translation/batch/CombinePerKeyTest.java | 18 ++++++----------
.../translation/batch/ComplexSourceTest.java | 17 +++++----------
.../translation/batch/FlattenTest.java | 24 +++++++++-------------
.../translation/batch/GroupByKeyTest.java | 18 ++++++----------
.../translation/batch/SimpleSourceTest.java | 18 ++++++----------
.../translation/batch/WindowAssignTest.java | 18 ++++++----------
8 files changed, 51 insertions(+), 98 deletions(-)
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
index dca8b664bd3..cca192df9de 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
@@ -18,10 +18,7 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -38,6 +35,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -48,15 +46,11 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class CombineGloballyTest implements Serializable {
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@Test
public void testCombineGlobally() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
index cce3199d2c3..774186c1821 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
@@ -18,14 +18,11 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -34,6 +31,7 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,15 +40,11 @@ import org.junit.runners.JUnit4;
/** Test class for beam to spark {@link Combine#groupedValues} translation. */
@RunWith(JUnit4.class)
public class CombineGroupedValuesTest implements Serializable {
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@Test
public void testCombineGroupedValues() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java
index c8b25b3355d..5a2335a154e 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java
@@ -20,10 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -41,6 +38,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -52,15 +50,11 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class CombinePerKeyTest implements Serializable {
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@Test
public void testCombinePerKey() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
index 582a31a05a6..4ba356f6ce7 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
@@ -25,11 +25,8 @@ import java.io.PrintStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
@@ -48,15 +45,11 @@ public class ComplexSourceTest implements Serializable {
private static File file;
private static List<String> lines = createLines(30);
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@BeforeClass
public static void beforeClass() throws IOException {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
index 50b443da9ae..bf3774ba29e 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
@@ -18,16 +18,14 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,20 +34,18 @@ import org.junit.runners.JUnit4;
/** Test class for beam to spark flatten translation. */
@RunWith(JUnit4.class)
public class FlattenTest implements Serializable {
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@Test
public void testFlatten() {
- PCollection<Integer> input1 = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- PCollection<Integer> input2 = pipeline.apply(Create.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
+ PCollection<Integer> input1 =
+ pipeline.apply("input1", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+ PCollection<Integer> input2 =
+ pipeline.apply("input2", Create.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
PCollectionList<Integer> pcs = PCollectionList.of(input1).and(input2);
PCollection<Integer> input = pcs.apply(Flatten.pCollections());
PAssert.that(input)
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
index 1a84466b319..b1aa300fc27 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
@@ -29,10 +29,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -51,6 +48,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -59,15 +57,11 @@ import org.junit.runners.JUnit4;
/** Test class for beam to spark {@link ParDo} translation. */
@RunWith(JUnit4.class)
public class GroupByKeyTest implements Serializable {
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@Test
public void testGroupByKeyPreservesWindowing() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
index 0f16b644222..d70293d5056 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
@@ -18,14 +18,12 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -34,15 +32,11 @@ import org.junit.runners.JUnit4;
/** Test class for beam to spark source translation. */
@RunWith(JUnit4.class)
public class SimpleSourceTest implements Serializable {
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@Test
public void testBoundedSource() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 28efe754ddf..ecb3e7ebdb5 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -18,10 +18,7 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -32,6 +29,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,15 +38,11 @@ import org.junit.runners.JUnit4;
/** Test class for beam to spark window assign translation. */
@RunWith(JUnit4.class)
public class WindowAssignTest implements Serializable {
- @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+ @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
- private static PipelineOptions testOptions() {
- SparkStructuredStreamingPipelineOptions options =
- PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
- options.setRunner(SparkStructuredStreamingRunner.class);
- options.setTestMode(true);
- return options;
- }
+ @Rule
+ public transient TestPipeline pipeline =
+ TestPipeline.fromOptions(SESSION.createPipelineOptions());
@Test
public void testWindowAssign() {