You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2022/12/29 10:08:45 UTC

[beam] branch master updated: [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 208a14e05b5 [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
208a14e05b5 is described below

commit 208a14e05b5cf4021ef978ec45d75a96e20bc3a6
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Thu Dec 29 11:08:36 2022 +0100

    [Spark Dataset runner] Reuse SparkSession when testing using class rule. (#24812)
---
 .../translation/batch/CombineGloballyTest.java     | 18 ++++++----------
 .../batch/CombineGroupedValuesTest.java            | 18 ++++++----------
 .../translation/batch/CombinePerKeyTest.java       | 18 ++++++----------
 .../translation/batch/ComplexSourceTest.java       | 17 +++++----------
 .../translation/batch/FlattenTest.java             | 24 +++++++++-------------
 .../translation/batch/GroupByKeyTest.java          | 18 ++++++----------
 .../translation/batch/SimpleSourceTest.java        | 18 ++++++----------
 .../translation/batch/WindowAssignTest.java        | 18 ++++++----------
 8 files changed, 51 insertions(+), 98 deletions(-)

diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
index dca8b664bd3..cca192df9de 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTest.java
@@ -18,10 +18,7 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -38,6 +35,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,15 +46,11 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class CombineGloballyTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @Test
   public void testCombineGlobally() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
index cce3199d2c3..774186c1821 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGroupedValuesTest.java
@@ -18,14 +18,11 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -34,6 +31,7 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,15 +40,11 @@ import org.junit.runners.JUnit4;
 /** Test class for beam to spark {@link Combine#groupedValues} translation. */
 @RunWith(JUnit4.class)
 public class CombineGroupedValuesTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @Test
   public void testCombineGroupedValues() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java
index c8b25b3355d..5a2335a154e 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTest.java
@@ -20,10 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -41,6 +38,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -52,15 +50,11 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class CombinePerKeyTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @Test
   public void testCombinePerKey() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
index 582a31a05a6..4ba356f6ce7 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java
@@ -25,11 +25,8 @@ import java.io.PrintStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -48,15 +45,11 @@ public class ComplexSourceTest implements Serializable {
   private static File file;
   private static List<String> lines = createLines(30);
 
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @BeforeClass
   public static void beforeClass() throws IOException {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
index 50b443da9ae..bf3774ba29e 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTest.java
@@ -18,16 +18,14 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,20 +34,18 @@ import org.junit.runners.JUnit4;
 /** Test class for beam to spark flatten translation. */
 @RunWith(JUnit4.class)
 public class FlattenTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @Test
   public void testFlatten() {
-    PCollection<Integer> input1 = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-    PCollection<Integer> input2 = pipeline.apply(Create.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
+    PCollection<Integer> input1 =
+        pipeline.apply("input1", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    PCollection<Integer> input2 =
+        pipeline.apply("input2", Create.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
     PCollectionList<Integer> pcs = PCollectionList.of(input1).and(input2);
     PCollection<Integer> input = pcs.apply(Flatten.pCollections());
     PAssert.that(input)
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
index 1a84466b319..b1aa300fc27 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
@@ -29,10 +29,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SerializableMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -51,6 +48,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -59,15 +57,11 @@ import org.junit.runners.JUnit4;
 /** Test class for beam to spark {@link ParDo} translation. */
 @RunWith(JUnit4.class)
 public class GroupByKeyTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @Test
   public void testGroupByKeyPreservesWindowing() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
index 0f16b644222..d70293d5056 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
@@ -18,14 +18,12 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,15 +32,11 @@ import org.junit.runners.JUnit4;
 /** Test class for beam to spark source translation. */
 @RunWith(JUnit4.class)
 public class SimpleSourceTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @Test
   public void testBoundedSource() {
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 28efe754ddf..ecb3e7ebdb5 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -18,10 +18,7 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -32,6 +29,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,15 +38,11 @@ import org.junit.runners.JUnit4;
 /** Test class for beam to spark window assign translation. */
 @RunWith(JUnit4.class)
 public class WindowAssignTest implements Serializable {
-  @Rule public transient TestPipeline pipeline = TestPipeline.fromOptions(testOptions());
+  @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
-  private static PipelineOptions testOptions() {
-    SparkStructuredStreamingPipelineOptions options =
-        PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
-    options.setRunner(SparkStructuredStreamingRunner.class);
-    options.setTestMode(true);
-    return options;
-  }
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
   @Test
   public void testWindowAssign() {