Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/20 10:01:30 UTC

[GitHub] [beam] mosche opened a new pull request, #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

mosche opened a new pull request, #17406:
URL: https://github.com/apache/beam/pull/17406

   Usage of `forkEvery 1` is typically a strong sign of poor quality / bad code and should be avoided: 
   
   - It often hides resource leaks, either in code or worse, potentially in the runner itself.
   - It significantly impacts performance when running tests.
   
   This PR introduces new JUnit rules to better manage the lifecycle of `SparkContext` or `SparkContextOptions` and makes it safe to reuse those for all tests of a suite where applicable. The previous `ReuseSparkContextRule` leaked the SparkContext and was removed. Besides, with `forkEvery 1`, trying to reuse the `SparkContext` is absolutely pointless ...
   
   There might be more hidden issues I didn't discover locally ... I'll keep an eye out.
   
   This also reduces the runtime of the tests from 1:30+ mins to ~30 secs.
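
The lifecycle idea behind such a rule can be sketched in plain Java. This is only an illustrative model, not the `SparkContextRule` from this PR: `FakeContext`, `SharedContextResource` and `SuiteLifecycleDemo` are hypothetical names, `FakeContext` stands in for a `JavaSparkContext`, and the `before()`/`after()` pair merely mimics the shape of a JUnit `ExternalResource` used as a class rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a JavaSparkContext; it only tracks whether stop() was called. */
final class FakeContext {
  private boolean stopped = false;

  void stop() {
    stopped = true;
  }

  boolean isStopped() {
    return stopped;
  }
}

/** Hypothetical suite-scoped resource: before() runs once per suite, after() once at the end. */
final class SharedContextResource {
  private FakeContext context; // null while the resource is not active

  void before() {
    context = new FakeContext();
  }

  void after() {
    if (context != null) {
      context.stop(); // whoever created the context is also the one stopping it
      context = null;
    }
  }

  FakeContext get() {
    if (context == null) {
      throw new IllegalStateException("Resource is not active");
    }
    return context;
  }
}

final class SuiteLifecycleDemo {
  /** Runs all "tests" against one shared context and returns that context after teardown. */
  static FakeContext runSuite(List<Runnable> tests, SharedContextResource resource) {
    resource.before();
    FakeContext used = resource.get();
    try {
      for (Runnable test : tests) {
        test.run();
      }
    } finally {
      resource.after(); // runs even if a test throws, so the context cannot leak
    }
    return used;
  }

  public static void main(String[] args) {
    SharedContextResource resource = new SharedContextResource();
    List<FakeContext> seen = new ArrayList<>();
    List<Runnable> tests =
        Arrays.asList(() -> seen.add(resource.get()), () -> seen.add(resource.get()));
    FakeContext used = runSuite(tests, resource);
    System.out.println("same context reused: " + (seen.get(0) == seen.get(1)));
    System.out.println("stopped after suite: " + used.isStopped());
  }
}
```

Because teardown sits in a `finally` block owned by the same object that created the context, every test of the suite can share one context without relying on a fresh JVM per test class to clean up.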
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870024622


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java:
##########
@@ -53,8 +52,6 @@ public class SparkCoGroupByKeyStreamingTest {
   private static final TupleTag<Integer> INPUT1_TAG = new TupleTag<>("input1");
   private static final TupleTag<Integer> INPUT2_TAG = new TupleTag<>("input2");
 
-  @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
-

Review Comment:
   as said



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java:
##########
@@ -86,7 +85,6 @@
 public class CreateStreamTest implements Serializable {
 
   @Rule public final transient TestPipeline p = TestPipeline.create();
-  @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();

Review Comment:
   as said





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r869987231


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped

Review Comment:
   👍 though, there's no deprecation regarding `setProvidedSparkContext`





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870092244


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {

Review Comment:
   I know ...





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r869999575


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;

Review Comment:
   No, `false` is correct ... there are two concepts here:
   - using a provided SparkContext, where managing the lifecycle is up to the user
   - the deprecated reuse of the first SparkContext created, without any way for users to ever stop it (leaving a resource leak behind)
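
A simplified model of these two concepts, as a sketch only: `StubContext`, `ContextSource` and `ContextSourceDemo` are hypothetical names and this is not the actual `SparkContextFactory`. It mirrors just the branches visible in the quoted diff; the case where a provided context exists but reuse mode is requested is deliberately left unmodeled, because the quoted hunk ends before that branch.

```java
import java.util.Objects;

/** Hypothetical stand-in for JavaSparkContext; it only tracks whether stop() was called. */
final class StubContext {
  private boolean stopped;

  void stop() {
    stopped = true;
  }

  boolean isStopped() {
    return stopped;
  }
}

/** Simplified, hypothetical model of the two concepts; not the real SparkContextFactory. */
final class ContextSource {
  private static StubContext context;
  private static boolean hasProvided;

  private ContextSource() {}

  /** Concept 1: the user hands in a context and stays responsible for stopping it. */
  static synchronized void setProvided(StubContext provided) {
    context = Objects.requireNonNull(provided);
    hasProvided = true;
  }

  static synchronized void clearProvided() {
    hasProvided = false;
    context = null;
  }

  /** Models the branch taken when use of a provided context is requested. */
  static synchronized StubContext getProvided() {
    if (!hasProvided || context == null) {
      throw new IllegalStateException("No context was provided");
    }
    if (context.isStopped()) {
      throw new IllegalStateException("The provided context was already stopped");
    }
    return context;
  }

  /** Concept 2: the deprecated reuse mode, where the factory creates the context itself. */
  static synchronized StubContext getReused() {
    if (context == null || context.isStopped()) {
      context = new StubContext();
      hasProvided = false; // created here, so it must not count as user-provided
    } else if (hasProvided) {
      // The quoted diff ends before this branch; the sketch does not guess its behavior.
      throw new UnsupportedOperationException("not modeled");
    }
    return context;
  }
}

final class ContextSourceDemo {
  public static void main(String[] args) {
    StubContext reused = ContextSource.getReused();
    System.out.println("reused twice: " + (reused == ContextSource.getReused()));
    try {
      ContextSource.getProvided();
    } catch (IllegalStateException e) {
      System.out.println("provided lookup failed: " + e.getMessage());
    }
  }
}
```

In this model, resetting the flag to `false` is what keeps a context created in reuse mode from later being handed out as if the user had provided it and were managing its lifecycle.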





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871217253


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -36,7 +36,8 @@ public final class SparkContextFactory {
    * context will be reused for beam pipelines. This property should only be enabled for tests.
    *
    * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle

Review Comment:
   I've added more guidelines, though `SparkContextRule` only exists in the test scope and is not published ... should somebody try to use the SparkRunner for their own tests instead of the direct runner (for whatever reason), that might be confusing.





[GitHub] [beam] echauchot merged pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot merged PR #17406:
URL: https://github.com/apache/beam/pull/17406




[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870090737


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.sql.SparkSession;
+import org.junit.rules.ExternalResource;
+
+public class SparkSessionRule extends ExternalResource implements Serializable {
+  private transient SparkSession.Builder builder;
+  private transient @Nullable SparkSession session = null;
+
+  public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
+    builder = SparkSession.builder();
+    sparkConfig.forEach(builder::config);
+    builder.master(sparkMaster).appName("test");

Review Comment:
   Yes, same as above: using the JUnit `Description` looks good to me.





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870222386


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a time. For that reason
+ * tests requiring a different context have to be forked using separate test classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   Yes, I know it is not your code, but I took a look anyway as you changed the test structure.
   I don't like the approach the original developer took in several tests, where the test assertion is the absence of an exception. It is not explicit enough IMHO. But changing the test would be out of the scope of this PR. Can you just add a comment in the code saying that "the fact that no exception is thrown means that the Kryo serializer is not called by default"?
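
For a later cleanup, one way to make such an "absence of exception" assertion explicit could look like the following sketch. It is not part of this PR: `ExplicitNoThrowDemo`, `assertDoesNotThrow` and `runPipeline` are hypothetical names, and `runPipeline` is only a stand-in for running a pipeline that calls the registrator when Kryo is enabled.

```java
final class ExplicitNoThrowDemo {

  /** Runs the action and reports anything it throws as an assertion failure with a message. */
  static void assertDoesNotThrow(String message, Runnable action) {
    try {
      action.run();
    } catch (Throwable t) {
      throw new AssertionError(message, t);
    }
  }

  /** Hypothetical stand-in for a pipeline run; the registrator is only called with Kryo on. */
  static void runPipeline(boolean kryoEnabled, Runnable registrator) {
    if (kryoEnabled) {
      registrator.run();
    }
  }

  public static void main(String[] args) {
    // Stand-in for a registrator that fails as soon as it is called.
    Runnable failingRegistrator =
        () -> {
          throw new IllegalStateException("registrator must not be called");
        };
    assertDoesNotThrow(
        "Kryo registrator was called although the default serializer is in use",
        () -> runPipeline(false, failingRegistrator));
    System.out.println("registrator was not called");
  }
}
```

The helper states the intent at the call site, so a reader does not have to infer that a silently passing test is the assertion.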





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870238093


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);

Review Comment:
   ok keep for later cosmetic PR then :smile: 
   
   





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870001724


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {

Review Comment:
   No, this is correct... in this case there's a provided SparkContext, but usage of it is disabled according to `SparkPipelineOptions.getUsesProvidedSparkContext()`





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870077639


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   OK, thanks for the explanation. I think these details are worth writing down in the comments and/or Javadocs.
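
For readers following the thread, the provided-context branch of `getSparkContext` in the hunk above can be modeled roughly as follows. This is only a sketch under stated assumptions: `FakeContext`, `ProvidedContextResolver`, `resolve`, `setProvided` and `clearProvided` are hypothetical stand-ins (not Beam or Spark classes), and logging is left out. It shows the precedence between a context passed via options and one stored in the factory, plus the two failure cases.

```java
import java.util.Objects;

/** Hypothetical stand-in for JavaSparkContext; it only models the "stopped" flag. */
final class FakeContext {
  private boolean stopped;

  boolean isStopped() {
    return stopped;
  }

  void stop() {
    stopped = true;
  }
}

/** Simplified, hypothetical model of the provided-context branch shown in the diff. */
final class ProvidedContextResolver {
  // Context registered with the factory; null if none was registered.
  private static FakeContext stored;
  // True only while `stored` refers to an externally managed context.
  private static boolean hasProvided;

  private ProvidedContextResolver() {}

  static synchronized void setProvided(FakeContext ctx) {
    stored = Objects.requireNonNull(ctx);
    hasProvided = true;
  }

  static synchronized void clearProvided() {
    hasProvided = false;
    stored = null;
  }

  /** `fromOptions` models the value read from the options object, which may be null. */
  static synchronized FakeContext resolve(FakeContext fromOptions) {
    FakeContext ctx = fromOptions;
    if (ctx != null) {
      setProvided(ctx); // a context passed via options wins and is remembered
    } else if (hasProvided) {
      ctx = stored; // otherwise fall back to the context stored in the factory
    }
    if (ctx == null) {
      throw new IllegalStateException("No Spark context was provided.");
    } else if (ctx.isStopped()) {
      throw new IllegalStateException("The provided Spark context was already stopped");
    }
    return ctx;
  }

  public static void main(String[] args) {
    FakeContext ctx = new FakeContext();
    System.out.println(resolve(ctx) == ctx); // context passed via options
    System.out.println(resolve(null) == ctx); // context remembered by the factory
    clearProvided();
  }
}
```

In this model the caller that created the context stays responsible for stopping it; the resolver never stops or recreates a provided context.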





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870075496


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {

Review Comment:
   I think the booleans are not clear enough. We should add more Javadocs to the fields.





[GitHub] [beam] echauchot commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1114582816

   > Usage of `forkEvery 1` is typically a strong sign of poor quality / bad code and should be avoided:
   > 
   > * It often hides resource leaks, either in code or worse, potentially in the runner itself.
   > * It significantly impacts performance when running tests.
   > 
   > This PR introduces new JUnit rules to better manage the lifecycle of `SparkContext` or `SparkContextOptions` and makes it possible to safely reuse those for all tests of a suite where applicable. The previous `ReuseSparkContextRule` leaks the SparkContext and was removed. Besides, with `forkEvery 1` trying to reuse the `SparkContext` is absolutely pointless ...
   > 
   > There might be more hidden issues that I didn't discover locally ... I'll keep looking.
   > 
   > This also reduces the runtime of tests to ~30 secs from 1:30+ mins.
   
   Bad smell indeed! Thanks for catching this, @mosche, and for reducing the test time.




[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870006952


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {

Review Comment:
   Not my name ;) Changed ...





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r869994700


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;

Review Comment:
   sparkMaster is already null, see above





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871356054


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -48,61 +49,45 @@ public static class WithKryoSerializer {
     public static SparkContextRule contextRule =
         new SparkContextRule(
             KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
-            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
+            KV.of("spark.kryo.registrator", TestKryoRegistrator.class.getName()));
 
     @Test
     public void testKryoRegistration() {
+      TestKryoRegistrator.wasInitiated = false;
       runSimplePipelineWithSparkContextOptions(contextRule);
-      assertTrue(
-          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-          WrapperKryoRegistrator.wasInitiated);
-    }
-
-    /**
-     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
-     * serialization.
-     */
-    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
-
-      static boolean wasInitiated = false;
-
-      public WrapperKryoRegistrator() {
-        wasInitiated = true;
-      }
-
-      @Override
-      public void registerClasses(Kryo kryo) {
-        super.registerClasses(kryo);
-        Registration registration = kryo.getRegistration(MicrobatchSource.class);
-        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
-        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
-      }
+      assertTrue(TestKryoRegistrator.wasInitiated);
     }
   }
 
   public static class WithoutKryoSerializer {
     @ClassRule
     public static SparkContextRule contextRule =
-        new SparkContextRule(
-            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
+        new SparkContextRule(KV.of("spark.kryo.registrator", TestKryoRegistrator.class.getName()));
 
     @Test
     public void testDefaultSerializerNotCallingKryo() {
+      TestKryoRegistrator.wasInitiated = false;
       runSimplePipelineWithSparkContextOptions(contextRule);
+      assertFalse(TestKryoRegistrator.wasInitiated);

Review Comment:
   It's way better, thanks for this fix. I know it was out of scope, but since we had to touch this file anyway, it was better to fix the test.
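
The reset of `TestKryoRegistrator.wasInitiated` at the start of each test in the hunk above matters once tests share a JVM, because a static flag set by an earlier test stays set. A minimal sketch of that effect, assuming hypothetical names (`FlagRegistrator`, `SharedJvmFlagDemo`) that only imitate the shape of the test and do not use Kryo or Spark:

```java
/** Hypothetical registrator that records whether it was ever instantiated. */
final class FlagRegistrator {
  static boolean wasInitiated = false;

  FlagRegistrator() {
    wasInitiated = true;
  }
}

/** Imitates two tests running one after the other in the same JVM. */
final class SharedJvmFlagDemo {
  private SharedJvmFlagDemo() {}

  /** Models a run in which the registrator is instantiated. */
  static void runWithRegistrator() {
    new FlagRegistrator();
  }

  /** Models a run in which nothing touches the registrator. */
  static void runWithoutRegistrator() {}

  /** Returns what a "registrator not called" test observes, with or without a reset first. */
  static boolean observedAfterPlainRun(boolean resetFirst) {
    if (resetFirst) {
      FlagRegistrator.wasInitiated = false;
    }
    runWithoutRegistrator();
    return FlagRegistrator.wasInitiated;
  }

  public static void main(String[] args) {
    runWithRegistrator(); // an earlier test in the same JVM sets the flag
    System.out.println("without reset: " + observedAfterPlainRun(false));
    System.out.println("with reset:    " + observedAfterPlainRun(true));
  }
}
```

Without the reset, the second check sees the stale `true` left behind by the earlier run, which is the kind of cross-test leakage that per-test forking would have hidden.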





[GitHub] [beam] mosche commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1104770253

   Hmm, I noticed a related issue here. `SparkContextOptions` doesn't work with `TestPipeline` because `providedSparkContext` is ignored during the serde roundtrip to test that everything can be serialized before actually running the pipeline :/
   
   IMHO `providedSparkContext` really doesn't belong in PipelineOptions: it can't be serialized and the resulting behavior is very inconsistent... though removing it would be a breaking change. I suggest adding methods to set the provided Spark context to `SparkContextFactory`. If a context is provided using `SparkContextOptions`, it will be stored in the factory using `setProvidedSparkContext` as well.
   
   This also makes it possible to clear the provided Spark context, allowing for much cleaner code.
   
   ```java
     /**
      * Set an externally managed {@link JavaSparkContext} that will be used if {@link
      * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
      *
      * <p>A Spark context can also be provided using {@link
      * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
      * during serialization potentially leading to confusing behavior. This is particularly the case
      * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
      */
     public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) 
   
     public static synchronized void clearProvidedSparkContext()
   ```
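
As a rough analogy for the problem described in this comment, the sketch below uses plain Java serialization with a `transient` field. How Beam actually serializes pipeline options is not shown in this thread, so the mechanism here is an assumption chosen for illustration, and all names (`LiveResource`, `DemoOptions`, `ResourceHolder`, `SerdeRoundtripDemo`) are hypothetical. It shows a live resource vanishing from an options-like object after a roundtrip, while a static holder with explicit set and clear methods keeps it available and lets the caller clean up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a non-serializable resource such as a Spark context. */
final class LiveResource {}

/** Hypothetical options object: the flag survives a roundtrip, the live resource does not. */
final class DemoOptions implements Serializable {
  private static final long serialVersionUID = 1L;
  boolean usesProvided;
  transient LiveResource provided; // skipped by Java serialization
}

/** Static holder playing the role this comment proposes for the factory. */
final class ResourceHolder {
  private static LiveResource provided;

  private ResourceHolder() {}

  static synchronized void set(LiveResource resource) {
    provided = resource;
  }

  static synchronized void clear() {
    provided = null;
  }

  static synchronized LiveResource get() {
    return provided;
  }
}

final class SerdeRoundtripDemo {
  private SerdeRoundtripDemo() {}

  /** Serializes and deserializes the options, as a serializability check might do. */
  static DemoOptions roundtrip(DemoOptions in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(in);
      }
      try (ObjectInputStream input =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (DemoOptions) input.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    LiveResource resource = new LiveResource();
    DemoOptions options = new DemoOptions();
    options.usesProvided = true;
    options.provided = resource;
    ResourceHolder.set(resource);
    try {
      DemoOptions copy = roundtrip(options);
      System.out.println("flag kept: " + copy.usesProvided);
      System.out.println("resource dropped from copy: " + (copy.provided == null));
      System.out.println("holder still has it: " + (ResourceHolder.get() == resource));
    } finally {
      ResourceHolder.clear(); // explicit cleanup so nothing leaks into the next test
    }
  }
}
```

The `try`/`finally` around the holder mirrors what a JUnit rule would do around a test class: register the context before the tests and clear it afterwards.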
   




[GitHub] [beam] mosche commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1104854123

   I'll follow up with another PR to remove `forkEvery 1` from the validates runner tasks




[GitHub] [beam] mosche commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1123402453

   Thanks for the thorough review @echauchot 🙏  I should have addressed all your questions / comments. Let me know if anything is unclear




[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870087648


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped

Review Comment:
   Ah yes, correct, this parameter is still in use. I was confusing it with the other parameter.





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870071075


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;

Review Comment:
   Yes, I did not correctly understand the usage of sparkMaster. Better to rename it indeed, for clarity.





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870239910


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;

Review Comment:
   Thanks for your last javadoc update. It is now clearer that `TEST_REUSE_SPARK_CONTEXT` and `hasProvidedSparkContext` are mutually exclusive.
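
A minimal sketch of the branch order discussed here, assuming a hypothetical `FakeContext` in place of `JavaSparkContext` and boolean parameters in place of the pipeline options and the system property. It is not Beam's actual code. It only illustrates that a provided context is checked first and that the reuse path re-creates a missing or stopped context. The `else if (hasProvidedSparkContext)` case of the reuse path is deliberately left out, because its body is not visible in the quoted hunk.

```java
import java.util.Objects;

/** Simplified stand-in for the factory's selection logic; all names are hypothetical. */
final class ContextSelectionSketch {

  /** Hypothetical placeholder for JavaSparkContext. */
  static final class FakeContext {
    private boolean stopped;

    boolean isStopped() {
      return stopped;
    }

    void stop() {
      stopped = true;
    }
  }

  // Only one context per JVM, so static state mirrors the quoted class.
  private static FakeContext context;
  private static boolean hasProvidedContext;

  private ContextSelectionSketch() {}

  static synchronized void setProvided(FakeContext provided) {
    context = Objects.requireNonNull(provided);
    hasProvidedContext = true;
  }

  static synchronized void clearProvided() {
    hasProvidedContext = false;
    context = null;
  }

  /** usesProvided and reuse are assumptions standing in for the option and the system property. */
  static synchronized FakeContext get(boolean usesProvided, boolean reuse) {
    if (usesProvided) {
      // The provided context wins; it must exist and still be running.
      if (!hasProvidedContext || context == null) {
        throw new IllegalStateException("No context was provided");
      }
      if (context.isStopped()) {
        throw new IllegalStateException("The provided context was already stopped");
      }
      return context;
    }
    if (reuse) {
      // Re-create the shared context if it is missing or stopped.
      if (context == null || context.isStopped()) {
        context = new FakeContext();
        hasProvidedContext = false;
      }
      return context;
    }
    // Neither mode: every caller gets a fresh context.
    return new FakeContext();
  }
}
```

In this sketch the caller that provides a context stays responsible for stopping it, which is the lifecycle control the deprecation note asks for.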





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r866852800


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {

Review Comment:
   Is a `!` missing in the condition?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;

Review Comment:
   Set `sparkMaster` from the provided `SparkContext`?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;

Review Comment:
   Clear `sparkMaster` as well?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();

Review Comment:
   The name `contextRule` is misleading because a `SparkContextRule` class exists. Please rename it to `contextOptionsRule`.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {

Review Comment:
   The naming is so cute :smile: Please rename it to `CustomException`.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -18,24 +18,31 @@
 package org.apache.beam.runners.spark;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
 
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.ClassRule;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
+import org.junit.runners.MethodSorters;
 
-/** Provided Spark Context tests. */
+/**
+ * Provided Spark Context tests.
+ *
+ * <p>Note: These tests are run sequentially ordered by their name to reuse the Spark context and
+ * speed up testing.

Review Comment:
   Clever!



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {
     MyCustomException(final String message) {
       super(message);
     }
   }
 
-  private final transient SparkPipelineOptions options =
-      PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
-  @Rule public transient TestName testName = new TestName();
-
-  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
-
-  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
-    return ParDo.of(
-        new DoFn<String, String>() {
-
-          @ProcessElement
-          public void processElement(final ProcessContext c) {
-            System.out.println(prefix + " " + c.element());
-          }
-        });
-  }
-
-  private PTransform<PBegin, PCollection<String>> getValues(final SparkPipelineOptions options) {
-    final boolean doNotSyncWithWatermark = false;
-    return options.isStreaming()
-        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
-            .nextBatch("one", "two")
-        : Create.of("one", "two");
+  private static class FailAlways extends SimpleFunction<String, String> {
+    @Override
+    public String apply(final String input) {
+      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+    }
   }
 
-  private SparkPipelineOptions getStreamingOptions() {
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    return options;
-  }
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
 
-  private SparkPipelineOptions getBatchOptions() {
+  private Pipeline createPipeline(
+      boolean isStreaming, @Nullable SimpleFunction<String, String> mapFun) {
+    SparkContextOptions options = contextRule.createPipelineOptions();
     options.setRunner(SparkRunner.class);
-    options.setStreaming(false); // explicit because options is reused throughout the test.
-    return options;
-  }
-
-  private Pipeline getPipeline(final SparkPipelineOptions options) {
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")";
+    options.setStreaming(isStreaming);
 
-    pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+    Pipeline pipeline = Pipeline.create(options);
+    PTransform<PBegin, PCollection<String>> values =
+        isStreaming
+            ? CreateStream.of(StringUtf8Coder.of(), millis(1), false).nextBatch("one", "two")
+            : Create.of("one", "two");
 
+    PCollection<String> collection = pipeline.apply(values).setCoder(StringUtf8Coder.of());
+    if (mapFun != null) {
+      collection.apply(MapElements.via(mapFun));
+    }
     return pipeline;
   }
 
-  private void testFailedPipeline(final SparkPipelineOptions options) throws Exception {
-
-    SparkPipelineResult result = null;
-
-    try {
-      final Pipeline pipeline = Pipeline.create(options);
-      pipeline
-          .apply(getValues(options))
-          .setCoder(StringUtf8Coder.of())
-          .apply(
-              MapElements.via(
-                  new SimpleFunction<String, String>() {
-
-                    @Override
-                    public String apply(final String input) {
-                      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
-                    }
-                  }));
-
-      result = (SparkPipelineResult) pipeline.run();
-      result.waitUntilFinish();
-    } catch (final Exception e) {
-      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
-      assertThat(e.getCause(), instanceOf(MyCustomException.class));
-      assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
-      assertThat(result.getState(), is(PipelineResult.State.FAILED));
-      result.cancel();
-      return;
-    }
+  private void testFailedPipeline(boolean isStreaming) throws Exception {
+    Pipeline pipeline = createPipeline(isStreaming, new FailAlways());
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
 
-    fail("An injected failure did not affect the pipeline as expected.");
+    PipelineExecutionException e =
+        assertThrows(PipelineExecutionException.class, () -> result.waitUntilFinish());
+    assertThat(e.getCause(), instanceOf(MyCustomException.class));
+    assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+    assertThat(result.getState(), is(PipelineResult.State.FAILED));
+    result.cancel();
   }
 
-  private void testTimeoutPipeline(final SparkPipelineOptions options) throws Exception {
-
-    final Pipeline pipeline = getPipeline(options);
-
-    final SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
-
-    result.waitUntilFinish(Duration.millis(1));
+  private void testTimeoutPipeline(boolean isStreaming) throws Exception {

Review Comment:
   I know you did not change this test functionally, but it is very strange: it does not assert any timeout, because the `TimeoutException` is simply ignored in `waitUntilFinish`, so nothing is thrown and no state change happens. I think you should add a comment to explain that.
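
To make the concern concrete, here is a small sketch of the behaviour described above, using only the JDK. `FakeResult` and its `State` enum are hypothetical stand-ins, not Beam's `SparkPipelineResult`. They only mimic a `waitUntilFinish` that swallows the timeout and leaves the state untouched.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrates a timed wait that ignores the timeout; all names are hypothetical. */
final class TimeoutSketch {

  enum State {
    RUNNING,
    DONE,
    FAILED
  }

  static final class FakeResult {
    private final CompletableFuture<Void> execution;
    private volatile State state = State.RUNNING;

    FakeResult(CompletableFuture<Void> execution) {
      this.execution = execution;
    }

    /** Waits up to timeoutMillis and returns the state afterwards. */
    State waitUntilFinish(long timeoutMillis) {
      try {
        execution.get(timeoutMillis, TimeUnit.MILLISECONDS);
        state = State.DONE;
      } catch (TimeoutException e) {
        // Swallowed on purpose: nothing is thrown and the state is left unchanged.
      } catch (ExecutionException e) {
        state = State.FAILED;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return state;
    }

    State getState() {
      return state;
    }
  }

  private TimeoutSketch() {}
}
```

With a result like this, the only thing a timeout test can observe is that the state is still the pre-timeout one after the call returns, which is worth spelling out in a comment or an explicit assertion.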



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java:
##########
@@ -112,8 +110,6 @@ public class ResumeFromCheckpointStreamingTest implements Serializable {
 
   private transient TemporaryFolder temporaryFolder;
 
-  @Rule public final transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();
-

Review Comment:
   The default of your `SparkContextRule` is to reuse the context. So to disable that, you just don't add a context rule, right?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/EncoderHelpersTest.java:
##########
@@ -32,16 +33,15 @@
 @RunWith(JUnit4.class)
 public class EncoderHelpersTest {
 
+  @ClassRule public static SparkSessionRule sessionRule = new SparkSessionRule();
+
   @Test
   public void beamCoderToSparkEncoderTest() {
-    SparkSession sparkSession =
-        SparkSession.builder()
-            .appName("beamCoderToSparkEncoderTest")
-            .master("local[4]")
-            .getOrCreate();

Review Comment:
   Thanks, it is cleaner to use a session test rule than to create the session manually. I just did the simplest thing when writing this test :smile:



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a time. For that reason
+ * tests requiring a different context have to be forked using separate test classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   So the assertion here is just that no exception is thrown during serde?
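
The pattern being asked about can be sketched with plain JDK types. This assumes the only check is a hook that fails when invoked, as `KryoRegistratorIsNotCalled.registerClasses` does in the removed code. `serialize` and `FAIL_IF_CALLED` are hypothetical names, and the `kryoEnabled` flag stands in for the `spark.serializer` setting.

```java
/** Illustrates a "passes only if the hook is never called" check; all names are hypothetical. */
final class NotCalledSketch {

  /** A hook that fails the test as soon as it is invoked. */
  static final Runnable FAIL_IF_CALLED =
      () -> {
        throw new AssertionError("registrator shouldn't be called with the default serializer");
      };

  /** Consults the registrator only when the Kryo-like mode is enabled. */
  static String serialize(String value, boolean kryoEnabled, Runnable registrator) {
    if (kryoEnabled) {
      registrator.run();
    }
    return value;
  }

  private NotCalledSketch() {}
}
```

In this shape there is no positive assertion at all: the test succeeds exactly when the run completes without the hook firing.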



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();

Review Comment:
   This should not be needed for batch pipelines, but it is good to get used to calling it.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);

Review Comment:
   Why is this needed? Adding the `PAssert` to the pipeline DAG and running the pipeline should be enough.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java:
##########
@@ -68,7 +73,7 @@ public void testInBatchMode() throws Exception {
             .apply(new WordCount.CountWords())
             .apply(MapElements.via(new WordCount.FormatAsTextFn()));
     PAssert.that(output).containsInAnyOrder(EXPECTED_COUNTS);
-    pipeline.run();
+    pipeline.run().waitUntilFinish();

Review Comment:
   Good catch!



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.rules.ExternalResource;
+
+public class SparkContextRule extends ExternalResource implements Serializable {
+  private transient SparkConf sparkConf;
+  private transient @Nullable JavaSparkContext sparkContext = null;
+
+  public SparkContextRule(String sparkMaster, Map<String, String> sparkConfig) {
+    sparkConf = new SparkConf();
+    sparkConfig.forEach(sparkConf::set);
+    sparkConf.setMaster(sparkMaster).setAppName(sparkMaster);

Review Comment:
   get the app name from pipeline options



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.rules.ExternalResource;
+
+public class SparkContextRule extends ExternalResource implements Serializable {
+  private transient SparkConf sparkConf;
+  private transient @Nullable JavaSparkContext sparkContext = null;
+
+  public SparkContextRule(String sparkMaster, Map<String, String> sparkConfig) {
+    sparkConf = new SparkConf();
+    sparkConfig.forEach(sparkConf::set);
+    sparkConf.setMaster(sparkMaster).setAppName(sparkMaster);
+  }
+
+  public SparkContextRule(KV<String, String>... sparkConfig) {
+    this("local", sparkConfig);
+  }
+
+  public SparkContextRule(String sparkMaster, KV<String, String>... sparkConfig) {
+    this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, KV::getValue)));
+  }
+
+  public JavaSparkContext getSparkContext() {
+    if (sparkContext == null) {
+      throw new IllegalStateException("SparkContext not available");
+    }
+    return sparkContext;
+  }
+
+  public SparkContextOptions createPipelineOptions() {
+    return configure(TestPipeline.testingPipelineOptions());
+  }
+
+  public SparkContextOptions configure(PipelineOptions opts) {
+    SparkContextOptions ctxOpts = opts.as(SparkContextOptions.class);
+    ctxOpts.setUsesProvidedSparkContext(true);
+    ctxOpts.setProvidedSparkContext(getSparkContext());
+    return ctxOpts;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    sparkContext = new JavaSparkContext(sparkConf);
+    SparkContextFactory.setProvidedSparkContext(sparkContext);
+  }
+
+  @Override
+  protected void after() {
+    SparkContextFactory.clearProvidedSparkContext();
+    getSparkContext().stop();

Review Comment:
   If it is no longer used, should it be set to null to be sure that the GC will free the memory?
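
A minimal sketch of the "stop, then drop the reference" idea raised above, in plain Java. `ManagedResource` and its `before`/`after`/`get` methods are hypothetical names that only mirror the shape of the rule under review; this is not the JUnit `ExternalResource` API and not the code in this PR.

```java
import java.util.function.Supplier;

/** Hypothetical holder with a before/after lifecycle; a stand-in, not the JUnit or Beam API. */
final class ManagedResource<T extends AutoCloseable> {
  private final Supplier<T> factory;
  private T resource; // null whenever we are outside before()/after()

  ManagedResource(Supplier<T> factory) {
    this.factory = factory;
  }

  /** Creates the resource, analogous to the rule's before(). */
  void before() {
    resource = factory.get();
  }

  /** Returns the live resource or fails loudly if it is not available. */
  T get() {
    if (resource == null) {
      throw new IllegalStateException("Resource not available");
    }
    return resource;
  }

  boolean isAvailable() {
    return resource != null;
  }

  /** Stops the resource and drops the reference, analogous to the rule's after(). */
  void after() {
    T toClose = resource;
    resource = null; // cleared first, so a failing close() cannot leave a stale handle behind
    if (toClose != null) {
      try {
        toClose.close();
      } catch (Exception e) {
        throw new IllegalStateException("Failed to stop resource", e);
      }
    }
  }
}
```

With this shape, calling `get()` after `after()` fails with an `IllegalStateException` rather than handing out a stopped instance, and `after()` can be called twice without error.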



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.sql.SparkSession;
+import org.junit.rules.ExternalResource;
+
+public class SparkSessionRule extends ExternalResource implements Serializable {
+  private transient SparkSession.Builder builder;
+  private transient @Nullable SparkSession session = null;
+
+  public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
+    builder = SparkSession.builder();
+    sparkConfig.forEach(builder::config);
+    builder.master(sparkMaster).appName("test");

Review Comment:
   appName should be fetchable from beam pipeline options



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   If by leak you mean that we keep a reference to the context and thus the GC can't free the object, would it be possible to solve the leak, for example by clearing the references that point to it (or by using an alternative reference type), and keep this setting?



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped

Review Comment:
   please add this comment and deprecation notice to `SparkContextOptions#setProvidedSparkContext`



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);
     // A provided context must not be stopped after execution
-    assertFalse(jsc.sc().isStopped());
-    jsc.stop();
+    assertFalse(contextRule.getSparkContext().sc().isStopped());
   }
 
-  /** Provide a context and call pipeline run. */
+  /** A SparkRunner with a stopped provided Spark context cannot run pipelines. */
   @Test
-  public void testWithNullContext() throws Exception {
-    testWithInvalidContext(null);
+  public void testBWithStoppedProvidedContext() {
+    // Stop the provided Spark context
+    contextRule.getSparkContext().sc().stop();
+    assertThrows(
+        PROVIDED_CONTEXT_EXCEPTION,
+        RuntimeException.class,
+        () -> createPipeline().run().waitUntilFinish());
   }
 
-  /** A SparkRunner with a stopped provided Spark context cannot run pipelines. */
+  /** Provide a context and call pipeline run. */
   @Test
-  public void testWithStoppedProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    // Stop the provided Spark context directly
-    jsc.stop();
-    testWithInvalidContext(jsc);
+  public void testCWithNullContext() {
+    contextRule.getOptions().setProvidedSparkContext(null);
+    assertThrows(
+        PROVIDED_CONTEXT_EXCEPTION,
+        RuntimeException.class,
+        () -> createPipeline().run().waitUntilFinish());
   }
 
-  private void testWithValidProvidedContext(JavaSparkContext jsc) throws Exception {
-    SparkContextOptions options = getSparkContextOptions(jsc);
-
-    Pipeline p = Pipeline.create(options);
+  private Pipeline createPipeline() {
+    Pipeline p = Pipeline.create(contextRule.getOptions());
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output =
         inputWords
             .apply(new WordCount.CountWords())
             .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
     // Run test from pipeline
-    PipelineResult result = p.run();
-
-    TestPipeline.verifyPAssertsSucceeded(p, result);
-  }
-
-  private void testWithInvalidContext(JavaSparkContext jsc) {
-    SparkContextOptions options = getSparkContextOptions(jsc);
-
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
-    inputWords
-        .apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-
-    try {
-      p.run().waitUntilFinish();
-      fail("Should throw an exception when The provided Spark context is null or stopped");
-    } catch (RuntimeException e) {
-      assert e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION);
-    }
-  }
-
-  private static SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) {
-    final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
-    options.setRunner(TestSparkRunner.class);
-    options.setUsesProvidedSparkContext(true);
-    options.setProvidedSparkContext(jsc);
-    options.setEnableSparkMetricSinks(false);
-    return options;
+    PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+    return p;

Review Comment:
   Yes, I prefer this coding style too: prepare the pipeline in a util method and do the run and the assertions inside the test method. It also avoids the pipeline code duplication that was in place.



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java:
##########
@@ -86,7 +85,6 @@
 public class CreateStreamTest implements Serializable {
 
   @Rule public final transient TestPipeline p = TestPipeline.create();
-  @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();

Review Comment:
   and here again



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;

Review Comment:
   Should this be true?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SparkCoGroupByKeyStreamingTest.java:
##########
@@ -53,8 +52,6 @@ public class SparkCoGroupByKeyStreamingTest {
   private static final TupleTag<Integer> INPUT1_TAG = new TupleTag<>("input1");
   private static final TupleTag<Integer> INPUT2_TAG = new TupleTag<>("input2");
 
-  @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
-

Review Comment:
   same here



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.sql.SparkSession;
+import org.junit.rules.ExternalResource;
+
+public class SparkSessionRule extends ExternalResource implements Serializable {
+  private transient SparkSession.Builder builder;
+  private transient @Nullable SparkSession session = null;
+
+  public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
+    builder = SparkSession.builder();
+    sparkConfig.forEach(builder::config);
+    builder.master(sparkMaster).appName("test");
+  }
+
+  public SparkSessionRule(KV<String, String>... sparkConfig) {
+    this("local", sparkConfig);
+  }
+
+  public SparkSessionRule(String sparkMaster, KV<String, String>... sparkConfig) {
+    this(sparkMaster, Arrays.stream(sparkConfig).collect(toMap(KV::getKey, KV::getValue)));
+  }
+
+  public SparkSession getSession() {
+    if (session == null) {
+      throw new IllegalStateException("SparkSession not available");
+    }
+    return session;
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    session = builder.getOrCreate();
+  }
+
+  @Override
+  protected void after() {
+    getSession().stop();

Review Comment:
   Should the session be set to null to be sure that it is freed by the GC?



##########
runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkMetricsPusherTest.java:
##########
@@ -52,8 +51,6 @@ public class SparkMetricsPusherTest {
   private static final Logger LOG = LoggerFactory.getLogger(SparkMetricsPusherTest.class);
   private static final String COUNTER_NAME = "counter";
 
-  @Rule public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
-

Review Comment:
   once again



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r869996630


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   No, GC is pretty much irrelevant here. By leak I mean a resource leak: a SparkContext is left behind that is never stopped, and users have no way to stop it. The problem is that this prevents any further attempt to create a new SparkContext in the same JVM.
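
To illustrate the distinction between a memory leak and a resource leak, here is a small plain-Java model. `SingleContext` and `LeakDemo` are hypothetical stand-ins for "a resource that allows one active instance per JVM"; they do not use or reproduce Spark's API. It is only a sketch of the failure mode described above.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a resource with one active instance per JVM (not Spark). */
final class SingleContext implements AutoCloseable {
  private static final AtomicReference<SingleContext> ACTIVE = new AtomicReference<>();
  private volatile boolean stopped = false;

  private SingleContext() {}

  /** Creates the context, failing if a previous one was never stopped. */
  static SingleContext create() {
    SingleContext ctx = new SingleContext();
    if (!ACTIVE.compareAndSet(null, ctx)) {
      throw new IllegalStateException("Another context is still active in this JVM");
    }
    return ctx;
  }

  boolean isStopped() {
    return stopped;
  }

  /** Stops the context and releases the JVM-wide slot. */
  @Override
  public void close() {
    stopped = true;
    ACTIVE.compareAndSet(this, null);
  }
}

class LeakDemo {
  /** Returns true if a second context can be created after the first one was handled. */
  static boolean canCreateAgain(boolean stopFirst) {
    SingleContext first = SingleContext.create();
    if (stopFirst) {
      first.close();
    }
    try {
      SingleContext second = SingleContext.create();
      second.close();
      return true;
    } catch (IllegalStateException e) {
      return false;
    } finally {
      first.close(); // clean up so the demo itself does not leak
    }
  }

  public static void main(String[] args) {
    System.out.println(canCreateAgain(true)); // prints true
    System.out.println(canCreateAgain(false)); // prints false
  }
}
```

In this model, dropping the Java reference to `first` changes nothing; only an explicit stop frees the slot. That is why whoever creates the context needs a handle to stop it.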





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870268151


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -36,7 +36,8 @@ public final class SparkContextFactory {
    * context will be reused for beam pipelines. This property should only be enabled for tests.
    *
    * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle

Review Comment:
   I think we should guide the user more, with something like:
   "Please use SparkContextRule to allow for automatic control of the SparkContext lifecycle (with a default of reusing it), or directly provide a SparkContext by using SparkContextFactory#setProvidedSparkContext and enabling SparkContextOptions#usesProvidedSparkContext."





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871219417


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a time. For that reason
+ * tests requiring a different context have to be forked using separate test classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   thx





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870090737


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.sql.SparkSession;
+import org.junit.rules.ExternalResource;
+
+public class SparkSessionRule extends ExternalResource implements Serializable {
+  private transient SparkSession.Builder builder;
+  private transient @Nullable SparkSession session = null;
+
+  public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
+    builder = SparkSession.builder();
+    sparkConfig.forEach(builder::config);
+    builder.master(sparkMaster).appName("test");

Review Comment:
   Yes, same as above: using the Description looks good to me.





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871239931


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -48,61 +49,45 @@ public static class WithKryoSerializer {
     public static SparkContextRule contextRule =
         new SparkContextRule(
             KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
-            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
+            KV.of("spark.kryo.registrator", TestKryoRegistrator.class.getName()));
 
     @Test
     public void testKryoRegistration() {
+      TestKryoRegistrator.wasInitiated = false;
       runSimplePipelineWithSparkContextOptions(contextRule);
-      assertTrue(
-          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-          WrapperKryoRegistrator.wasInitiated);
-    }
-
-    /**
-     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
-     * serialization.
-     */
-    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
-
-      static boolean wasInitiated = false;
-
-      public WrapperKryoRegistrator() {
-        wasInitiated = true;
-      }
-
-      @Override
-      public void registerClasses(Kryo kryo) {
-        super.registerClasses(kryo);
-        Registration registration = kryo.getRegistration(MicrobatchSource.class);
-        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
-        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
-      }
+      assertTrue(TestKryoRegistrator.wasInitiated);
     }
   }
 
   public static class WithoutKryoSerializer {
     @ClassRule
     public static SparkContextRule contextRule =
-        new SparkContextRule(
-            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
+        new SparkContextRule(KV.of("spark.kryo.registrator", TestKryoRegistrator.class.getName()));
 
     @Test
     public void testDefaultSerializerNotCallingKryo() {
+      TestKryoRegistrator.wasInitiated = false;
       runSimplePipelineWithSparkContextOptions(contextRule);
+      assertFalse(TestKryoRegistrator.wasInitiated);

Review Comment:
   Instead of throwing, I'm asserting it was not initiated... the intent of the test should be much clearer now
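
The reset-then-assert pattern described here can be illustrated with a small stand-alone sketch. `FlagRegistrator` and `RegistratorFlagCheck` are hypothetical stand-ins, not the classes from the PR, and the sketch assumes the registrator is instantiated in the same JVM as the test (as with a local Spark master), since the flag is a plain static field.

```java
// Hypothetical stand-in for a registrator that records whether it was instantiated.
final class FlagRegistrator {
  static boolean wasInitiated = false;

  FlagRegistrator() {
    wasInitiated = true;
  }
}

final class RegistratorFlagCheck {
  private RegistratorFlagCheck() {}

  // Resets the flag, runs the given "pipeline", and reports whether a registrator was created.
  static boolean initiatedDuring(Runnable pipeline) {
    FlagRegistrator.wasInitiated = false; // reset first: the static flag is shared across tests
    pipeline.run();
    return FlagRegistrator.wasInitiated;
  }
}
```

Resetting the flag before each run matters because a static field keeps its value once tests no longer run in separate forked JVMs.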





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870084524


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.rules.ExternalResource;
+
+public class SparkContextRule extends ExternalResource implements Serializable {
+  private transient SparkConf sparkConf;
+  private transient @Nullable JavaSparkContext sparkContext = null;
+
+  public SparkContextRule(String sparkMaster, Map<String, String> sparkConfig) {
+    sparkConf = new SparkConf();
+    sparkConfig.forEach(sparkConf::set);
+    sparkConf.setMaster(sparkMaster).setAppName(sparkMaster);

Review Comment:
   Ah yes, agreed. I did not realize there was no dep on PipelineOptions already. Your change to use the JUnit Description looks good to me.





[GitHub] [beam] mosche commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1112595141

   @echauchot ping :)




[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870217889


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);

Review Comment:
   @echauchot I don't know, I didn't come up with these tests ... and this is not related to this PR at all





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870015838


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java:
##########
@@ -20,190 +20,136 @@
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.joda.time.Duration.millis;
+import static org.junit.Assert.assertThrows;
 
 import java.io.Serializable;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 /** This suite tests that various scenarios result in proper states of the pipeline. */
 public class SparkPipelineStateTest implements Serializable {
 
-  private static class MyCustomException extends RuntimeException {
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
 
+  private static class MyCustomException extends RuntimeException {
     MyCustomException(final String message) {
       super(message);
     }
   }
 
-  private final transient SparkPipelineOptions options =
-      PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
-  @Rule public transient TestName testName = new TestName();
-
-  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
-
-  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
-    return ParDo.of(
-        new DoFn<String, String>() {
-
-          @ProcessElement
-          public void processElement(final ProcessContext c) {
-            System.out.println(prefix + " " + c.element());
-          }
-        });
-  }
-
-  private PTransform<PBegin, PCollection<String>> getValues(final SparkPipelineOptions options) {
-    final boolean doNotSyncWithWatermark = false;
-    return options.isStreaming()
-        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
-            .nextBatch("one", "two")
-        : Create.of("one", "two");
+  private static class FailAlways extends SimpleFunction<String, String> {
+    @Override
+    public String apply(final String input) {
+      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
+    }
   }
 
-  private SparkPipelineOptions getStreamingOptions() {
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    return options;
-  }
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
 
-  private SparkPipelineOptions getBatchOptions() {
+  private Pipeline createPipeline(
+      boolean isStreaming, @Nullable SimpleFunction<String, String> mapFun) {
+    SparkContextOptions options = contextRule.createPipelineOptions();
     options.setRunner(SparkRunner.class);
-    options.setStreaming(false); // explicit because options is reused throughout the test.
-    return options;
-  }
-
-  private Pipeline getPipeline(final SparkPipelineOptions options) {
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")";
+    options.setStreaming(isStreaming);
 
-    pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
+    Pipeline pipeline = Pipeline.create(options);
+    PTransform<PBegin, PCollection<String>> values =
+        isStreaming
+            ? CreateStream.of(StringUtf8Coder.of(), millis(1), false).nextBatch("one", "two")
+            : Create.of("one", "two");
 
+    PCollection<String> collection = pipeline.apply(values).setCoder(StringUtf8Coder.of());
+    if (mapFun != null) {
+      collection.apply(MapElements.via(mapFun));
+    }
     return pipeline;
   }
 
-  private void testFailedPipeline(final SparkPipelineOptions options) throws Exception {
-
-    SparkPipelineResult result = null;
-
-    try {
-      final Pipeline pipeline = Pipeline.create(options);
-      pipeline
-          .apply(getValues(options))
-          .setCoder(StringUtf8Coder.of())
-          .apply(
-              MapElements.via(
-                  new SimpleFunction<String, String>() {
-
-                    @Override
-                    public String apply(final String input) {
-                      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
-                    }
-                  }));
-
-      result = (SparkPipelineResult) pipeline.run();
-      result.waitUntilFinish();
-    } catch (final Exception e) {
-      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
-      assertThat(e.getCause(), instanceOf(MyCustomException.class));
-      assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
-      assertThat(result.getState(), is(PipelineResult.State.FAILED));
-      result.cancel();
-      return;
-    }
+  private void testFailedPipeline(boolean isStreaming) throws Exception {
+    Pipeline pipeline = createPipeline(isStreaming, new FailAlways());
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
 
-    fail("An injected failure did not affect the pipeline as expected.");
+    PipelineExecutionException e =
+        assertThrows(PipelineExecutionException.class, () -> result.waitUntilFinish());
+    assertThat(e.getCause(), instanceOf(MyCustomException.class));
+    assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+    assertThat(result.getState(), is(PipelineResult.State.FAILED));
+    result.cancel();
   }
 
-  private void testTimeoutPipeline(final SparkPipelineOptions options) throws Exception {
-
-    final Pipeline pipeline = getPipeline(options);
-
-    final SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
-
-    result.waitUntilFinish(Duration.millis(1));
+  private void testTimeoutPipeline(boolean isStreaming) throws Exception {

Review Comment:
   Agreed, this is strange ... I added a comment and renamed the tests to make clear what they do





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870102150


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   👍 



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {

Review Comment:
   👍 





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870102329


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {

Review Comment:
   done



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactor.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.

Review Comment:
   done





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r869993041


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;

Review Comment:
   No, this must explicitly be null. I renamed the variable to `reusableSparkMaster`; it may only be set when reusing the Spark context via the legacy flag.
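
The bookkeeping discussed in this comment can be sketched in plain Java. This is a simplified model under stated assumptions, not the Beam implementation: `FakeContext` and `ContextFactorySketch` are hypothetical stand-ins for `JavaSparkContext` and `SparkContextFactory`, and the `else if (hasProvidedSparkContext)` branch of the legacy path is not modeled because its body is not shown in the diff. The sketch only illustrates why the reusable master is reset to null whenever a context is provided.

```java
import java.util.Objects;

/** Hypothetical stand-in for JavaSparkContext; it only tracks whether it was stopped. */
final class FakeContext {
  private boolean stopped;

  void stop() {
    stopped = true;
  }

  boolean isStopped() {
    return stopped;
  }
}

/** Simplified model of the factory's static bookkeeping; not the Beam class itself. */
final class ContextFactorySketch {
  private static FakeContext context; // null until provided or created
  private static String reusableMaster; // only meaningful on the legacy reuse path
  private static boolean hasProvidedContext;

  private ContextFactorySketch() {}

  static synchronized void setProvided(FakeContext provided) {
    context = Objects.requireNonNull(provided);
    hasProvidedContext = true;
    reusableMaster = null; // a provided context is never tied to a reusable master
  }

  static synchronized void clearProvided() {
    hasProvidedContext = false;
    context = null;
  }

  /** Legacy path: re-create the context if it is missing or stopped. */
  static synchronized FakeContext reuseOrCreate(String master) {
    if (context == null || context.isStopped()) {
      context = new FakeContext();
      reusableMaster = master;
      hasProvidedContext = false;
    }
    return context;
  }

  static synchronized String reusableMaster() {
    return reusableMaster;
  }

  public static void main(String[] args) {
    reuseOrCreate("local[1]");
    System.out.println(reusableMaster()); // master recorded by the legacy path
    setProvided(new FakeContext());
    System.out.println(reusableMaster()); // reset once a context is provided
    clearProvided();
  }
}
```

Keeping the master tied to the legacy path means a stale value from an earlier reuse cannot be mistaken for a property of an externally managed context.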





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870022120


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a time. For that reason
+ * tests requiring a different context have to be forked using separate test classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   Not my code, but that seems to be the approach: the test would throw if Spark attempted to initialize the Kryo registrator.
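
The "fails if called" approach can be illustrated with a minimal, self-contained sketch. All names here are hypothetical: `Registrator` stands in for a Kryo registrator, and `SerializerSetupSketch` stands in for an engine that, by assumption, only consults the registrator when the Kryo serializer is selected. It does not use Spark or Kryo.

```java
/** Hypothetical registrator hook; stands in for a Kryo registrator in this sketch. */
interface Registrator {
  void registerClasses();
}

/** Fails as soon as it is invoked, which turns "was never called" into a testable property. */
final class MustNotBeCalledRegistrator implements Registrator {
  @Override
  public void registerClasses() {
    throw new AssertionError("registrator should not be called with the default serializer");
  }
}

/** Hypothetical engine: consults the registrator only when the Kryo serializer is selected. */
final class SerializerSetupSketch {
  private SerializerSetupSketch() {}

  static void configure(String serializer, Registrator registrator) {
    if ("kryo".equals(serializer)) {
      registrator.registerClasses();
    }
  }

  public static void main(String[] args) {
    // Completes normally: the default serializer never touches the registrator.
    configure("java", new MustNotBeCalledRegistrator());
    System.out.println("default serializer: registrator not called");
  }
}
```

A test built this way needs no explicit assertion: completing without an error is the evidence that the hook was never invoked.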





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871218678


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -36,7 +36,8 @@ public final class SparkContextFactory {
    * context will be reused for beam pipelines. This property should only be enabled for tests.
    *
    * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle

Review Comment:
   When I said user, I meant a user of the library, in other words a Beam developer.





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871350788


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -36,7 +36,8 @@ public final class SparkContextFactory {
    * context will be reused for beam pipelines. This property should only be enabled for tests.
    *
    * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle

Review Comment:
   Your wording in the last commit is perfect, much clearer.
   





[GitHub] [beam] mosche commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1104853528

   @echauchot I pushed one more commit with the change suggested above




[GitHub] [beam] echauchot commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1104875056

   @mosche I need to finish something first, then I'll take a look.




[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870024140


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java:
##########
@@ -112,8 +110,6 @@ public class ResumeFromCheckpointStreamingTest implements Serializable {
 
   private transient TemporaryFolder temporaryFolder;
 
-  @Rule public final transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();
-

Review Comment:
   👍 





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870047265


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/SparkContextRule.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.rules.ExternalResource;
+
+public class SparkContextRule extends ExternalResource implements Serializable {
+  private transient SparkConf sparkConf;
+  private transient @Nullable JavaSparkContext sparkContext = null;
+
+  public SparkContextRule(String sparkMaster, Map<String, String> sparkConfig) {
+    sparkConf = new SparkConf();
+    sparkConfig.forEach(sparkConf::set);
+    sparkConf.setMaster(sparkMaster).setAppName(sparkMaster);

Review Comment:
   Honestly, I don't see the point of having this depend on `PipelineOptions`; the value would be as meaningless as it is here. I changed the appName to reflect the scope of the rule.





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870047665


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SparkSessionRule.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import static java.util.stream.Collectors.toMap;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.KV;
+import org.apache.spark.sql.SparkSession;
+import org.junit.rules.ExternalResource;
+
+public class SparkSessionRule extends ExternalResource implements Serializable {
+  private transient SparkSession.Builder builder;
+  private transient @Nullable SparkSession session = null;
+
+  public SparkSessionRule(String sparkMaster, Map<String, String> sparkConfig) {
+    builder = SparkSession.builder();
+    sparkConfig.forEach(builder::config);
+    builder.master(sparkMaster).appName("test");

Review Comment:
   Same as above





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870070388


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;

Review Comment:
   clearer





[GitHub] [beam] echauchot commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1112979411

   @mosche sorry for the delay. I should be able to start your review by the end of the afternoon




[GitHub] [beam] echauchot commented on pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #17406:
URL: https://github.com/apache/beam/pull/17406#issuecomment-1114560170

   @mosche reviewing ...




[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870004066


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);

Review Comment:
   `p` is not a `TestPipeline`, so the explicit call to `TestPipeline.verifyPAssertsSucceeded` is critical.





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870202389


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java:
##########
@@ -47,72 +54,49 @@ public class ProvidedSparkContextTest {
   private static final String PROVIDED_CONTEXT_EXCEPTION =
       "The provided Spark context was not created or was stopped";
 
+  @ClassRule public static SparkContextOptionsRule contextRule = new SparkContextOptionsRule();
+
   /** Provide a context and call pipeline run. */
   @Test
-  public void testWithProvidedContext() throws Exception {
-    JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
-    testWithValidProvidedContext(jsc);
+  public void testAWithProvidedContext() throws Exception {
+    Pipeline p = createPipeline();
+    PipelineResult result = p.run(); // Run test from pipeline
+    result.waitUntilFinish();
+    TestPipeline.verifyPAssertsSucceeded(p, result);

Review Comment:
   Yes, but why not use `TestPipeline.create(contextOptionsRule.getOptions())` in `createPipeline()` and avoid the explicit `TestPipeline.verifyPAssertsSucceeded(p, result);` call?





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870239910


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;

Review Comment:
   Thanks for your last Javadoc update; it is now clearer that `TEST_REUSE_SPARK_CONTEXT` and a provided Spark context are mutually exclusive.
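
The branch order in the hunk above can be sketched as a toy model in plain Java. This is only an illustration under stated assumptions: `FakeContext`, `getContext`, and the two boolean parameters are hypothetical stand-ins for `JavaSparkContext`, `getSparkContext`, and the two options, and the behavior of the `else if (hasProvidedSparkContext)` case inside the reuse branch is not visible in the quoted diff, so the sketch simply rejects that combination.

```java
import java.util.Objects;

/** Toy model of the context resolution order; the names here are stand-ins, not Beam APIs. */
public class ContextResolutionSketch {

  /** Hypothetical stand-in for JavaSparkContext. */
  public static final class FakeContext {
    public final String name;
    private boolean stopped;

    public FakeContext(String name) {
      this.name = name;
    }

    public void stop() {
      stopped = true;
    }

    public boolean isStopped() {
      return stopped;
    }
  }

  // Static state, mirroring the "only one context per JVM" comment in the diff.
  private static FakeContext context;
  private static boolean hasProvidedContext;

  public static synchronized void setProvidedContext(FakeContext provided) {
    context = Objects.requireNonNull(provided);
    hasProvidedContext = true;
  }

  public static synchronized void clearProvidedContext() {
    hasProvidedContext = false;
    context = null;
  }

  public static synchronized FakeContext getContext(boolean usesProvided, boolean reuseForTests) {
    if (usesProvided) {
      // Branch 1: a provided context wins; the reuse flag is never consulted.
      if (!hasProvidedContext || context == null) {
        throw new IllegalStateException("No context was provided");
      }
      if (context.isStopped()) {
        throw new IllegalStateException("The provided context was already stopped");
      }
      return context;
    } else if (reuseForTests) {
      // Branch 2: reuse a cached context, re-creating it if missing or stopped.
      if (context == null || context.isStopped()) {
        context = new FakeContext("reused");
        hasProvidedContext = false;
      } else if (hasProvidedContext) {
        // ASSUMPTION: the quoted diff does not show this case; the sketch rejects it.
        throw new IllegalStateException("A provided context is set but its use is disabled");
      }
      return context;
    }
    // Branch 3: neither option is set, so every call gets a fresh context.
    return new FakeContext("fresh");
  }

  public static void main(String[] args) {
    setProvidedContext(new FakeContext("provided"));
    System.out.println(getContext(true, true).name);
    clearProvidedContext();
    System.out.println(getContext(false, true).name);
    System.out.println(getContext(false, false).name);
  }
}
```

Because the provided-context check comes first, setting both options at once never reaches the reuse branch, which is the sense in which the two are mutually exclusive.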





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r870237223


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -25,80 +28,111 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** The Spark context factory. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public final class SparkContextFactory {
   private static final Logger LOG = LoggerFactory.getLogger(SparkContextFactory.class);
 
   /**
    * If the property {@code beam.spark.test.reuseSparkContext} is set to {@code true} then the Spark
    * context will be reused for beam pipelines. This property should only be enabled for tests.
+   *
+   * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle
+   *     control to not leak your SparkContext.
    */
+  @Deprecated
   public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
-  private static JavaSparkContext sparkContext;
-  private static String sparkMaster;
-  private static boolean usesProvidedSparkContext;
+  private static @Nullable JavaSparkContext sparkContext;
+  private static @Nullable String sparkMaster;
+
+  private static boolean hasProvidedSparkContext;
 
   private SparkContextFactory() {}
 
+  /**
+   * Set an externally managed {@link JavaSparkContext} that will be used if {@link
+   * SparkContextOptions#getUsesProvidedSparkContext()} is set to {@code true}.
+   *
+   * <p>A Spark context can also be provided using {@link
+   * SparkContextOptions#setProvidedSparkContext(JavaSparkContext)}. However, it will be dropped
+   * during serialization potentially leading to confusing behavior. This is particularly the case
+   * when used in tests with {@link org.apache.beam.sdk.testing.TestPipeline}.
+   */
+  public static synchronized void setProvidedSparkContext(JavaSparkContext providedSparkContext) {
+    sparkContext = checkNotNull(providedSparkContext);
+    hasProvidedSparkContext = true;
+    sparkMaster = null;
+  }
+
+  public static synchronized void clearProvidedSparkContext() {
+    hasProvidedSparkContext = false;
+    sparkContext = null;
+  }
+
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
-    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
-    // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !usesProvidedSparkContext) {
-
-      // if the context is null or stopped for some reason, re-create it.
-      if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(contextOptions);
+    if (contextOptions.getUsesProvidedSparkContext()) {
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
+      if (jsc != null) {
+        setProvidedSparkContext(jsc);
+      } else if (hasProvidedSparkContext) {
+        jsc = sparkContext;
+      }
+      if (jsc == null) {
+        throw new IllegalStateException(
+            "No Spark context was provided. Use SparkContextFactory.setProvidedSparkContext to do so.");
+      } else if (jsc.sc().isStopped()) {
+        LOG.error("The provided Spark context " + jsc + " was already stopped.");
+        throw new IllegalStateException("The provided Spark context was already stopped");
+      }
+      LOG.info("Using a provided Spark Context");
+      return jsc;
+    } else if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+      // This is highly discouraged as it leaks SparkContexts without any control.
+      // If the context is null or stopped for some reason, re-create it.
+      @Nullable JavaSparkContext jsc = sparkContext;
+      if (jsc == null || jsc.sc().isStopped()) {
+        sparkContext = jsc = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
+        hasProvidedSparkContext = false;
+      } else if (hasProvidedSparkContext) {

Review Comment:
   Thanks for the last Javadoc update; it is clearer now.





[GitHub] [beam] mosche commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
mosche commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871205748


##########
runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java:
##########
@@ -22,91 +22,94 @@
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Registration;
-import org.apache.beam.runners.spark.SparkContextOptions;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.runners.spark.TestSparkRunner;
+import org.apache.beam.runners.spark.SparkContextRule;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.beam.sdk.values.KV;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
 
-/** Testing of beam registrar. */
+/**
+ * Testing of beam registrar. Note: There can only be one Spark context at a time. For that reason
+ * tests requiring a different context have to be forked using separate test classes.
+ */
 @SuppressWarnings({
   "rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
 })
+@RunWith(Enclosed.class)
 public class SparkRunnerKryoRegistratorTest {
 
-  @Test
-  public void testKryoRegistration() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-    assertTrue(
-        "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
-        WrapperKryoRegistrator.wasInitiated);
-  }
+  public static class WithKryoSerializer {
 
-  @Test
-  public void testDefaultSerializerNotCallingKryo() {
-    SparkConf conf = new SparkConf();
-    conf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName());
-    runSimplePipelineWithSparkContext(conf);
-  }
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
+            KV.of("spark.kryo.registrator", WrapperKryoRegistrator.class.getName()));
 
-  private void runSimplePipelineWithSparkContext(SparkConf conf) {
-    SparkPipelineOptions options =
-        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
-    options.setRunner(TestSparkRunner.class);
+    @Test
+    public void testKryoRegistration() {
+      runSimplePipelineWithSparkContextOptions(contextRule);
+      assertTrue(
+          "WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set",
+          WrapperKryoRegistrator.wasInitiated);
+    }
 
-    conf.set("spark.master", "local");
-    conf.setAppName("test");
+    /**
+     * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
+     * KryoSerialization resolution. Use only for test purposes. Needs to be public for
+     * serialization.
+     */
+    public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
 
-    JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
-    options.setUsesProvidedSparkContext(true);
-    options.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of("a")); // some operation to trigger pipeline construction
-    p.run().waitUntilFinish();
-    javaSparkContext.stop();
-  }
+      static boolean wasInitiated = false;
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that fails if called. Use only for test purposes. Needs to
-   * be public for serialization.
-   */
-  public static class KryoRegistratorIsNotCalled extends SparkRunnerKryoRegistrator {
+      public WrapperKryoRegistrator() {
+        wasInitiated = true;
+      }
 
-    @Override
-    public void registerClasses(Kryo kryo) {
-      fail(
-          "Default spark.serializer is JavaSerializer"
-              + " so spark.kryo.registrator shouldn't be called");
+      @Override
+      public void registerClasses(Kryo kryo) {
+        super.registerClasses(kryo);
+        Registration registration = kryo.getRegistration(MicrobatchSource.class);
+        com.esotericsoftware.kryo.Serializer kryoSerializer = registration.getSerializer();
+        assertTrue(kryoSerializer instanceof StatelessJavaSerializer);
+      }
     }
   }
 
-  /**
-   * A {@link SparkRunnerKryoRegistrator} that registers an internal class to validate
-   * KryoSerialization resolution. Use only for test purposes. Needs to be public for serialization.
-   */
-  public static class WrapperKryoRegistrator extends SparkRunnerKryoRegistrator {
+  public static class WithoutKryoSerializer {
+    @ClassRule
+    public static SparkContextRule contextRule =
+        new SparkContextRule(
+            KV.of("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName()));
 
-    static boolean wasInitiated = false;
-
-    public WrapperKryoRegistrator() {
-      wasInitiated = true;
+    @Test
+    public void testDefaultSerializerNotCallingKryo() {

Review Comment:
   Fixed the test to be clearer.
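
The Javadoc note in the hunk above (only one Spark context can exist at a time, so tests needing a different configuration live in separate test classes) can be illustrated with a small plain-Java model. It is a sketch under assumptions, not the rule added by this PR: `FakeContext`, `start`, and `runSuite` are hypothetical names, and try-with-resources stands in for the setup and teardown a class-level JUnit rule would perform.

```java
import java.util.Collections;
import java.util.Map;

/** Toy model of "one context at a time"; the names here are stand-ins, not Beam or Spark APIs. */
public class SingleContextLifecycleSketch {

  /** Hypothetical stand-in for a Spark context; at most one may be active per JVM. */
  public static final class FakeContext implements AutoCloseable {
    private static FakeContext active; // the single active context, if any

    public final Map<String, String> conf;

    private FakeContext(Map<String, String> conf) {
      this.conf = conf;
    }

    /** Starts a context, failing if a previous one was leaked (never closed). */
    public static synchronized FakeContext start(Map<String, String> conf) {
      if (active != null) {
        throw new IllegalStateException("Another context is still active; stop it first");
      }
      active = new FakeContext(conf);
      return active;
    }

    public static synchronized boolean isAnyActive() {
      return active != null;
    }

    @Override
    public void close() {
      // Same lock as the static synchronized methods above.
      synchronized (FakeContext.class) {
        if (active == this) {
          active = null;
        }
      }
    }
  }

  /** One "suite": the context is created once, shared by its tests, and always stopped. */
  public static String runSuite(Map<String, String> conf) {
    try (FakeContext ctx = FakeContext.start(conf)) {
      return ctx.conf.getOrDefault("spark.serializer", "default");
    }
  }

  public static void main(String[] args) {
    // Two suites with different configurations can run back to back in one JVM
    // because each one releases its context before the next one starts.
    System.out.println(runSuite(Collections.singletonMap("spark.serializer", "kryo")));
    System.out.println(runSuite(Collections.<String, String>emptyMap()));
  }
}
```

If a suite leaked its context instead of closing it, the next `start` call would fail, which is the kind of problem that running every test class in its own forked JVM tends to hide.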





[GitHub] [beam] echauchot commented on a diff in pull request #17406: [BEAM-14334] Fix leakage of SparkContext in Spark runner tests to remove forkEvery 1

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #17406:
URL: https://github.com/apache/beam/pull/17406#discussion_r871218678


##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java:
##########
@@ -36,7 +36,8 @@ public final class SparkContextFactory {
    * context will be reused for beam pipelines. This property should only be enabled for tests.
    *
    * @deprecated Please use {@link SparkContextOptions} instead to allow for proper lifecycle

Review Comment:
   When I said "user", I meant a user of the library, in other words a Beam developer.


