You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/26 03:58:28 UTC

spark git commit: [SPARK-23020][CORE] Fix race in SparkAppHandle cleanup, again.

Repository: spark
Updated Branches:
  refs/heads/master 7bd46d987 -> 70a68b328


[SPARK-23020][CORE] Fix race in SparkAppHandle cleanup, again.

Third time is the charm?

A race remained after the previous attempts. If the handle closed the
connection, the close() implementation would clean up state that prevented
the calling thread from waiting for the connection thread to finish. That
could trigger the race behind the test flakiness reported in the bug.

The fix is to move the "wait for connection thread" code to a separate
close method that is used by the handle; this also simplifies the code
a bit and makes it easier to follow.

I included an unrelated (but correct) change to a YARN test so that the
YARN tests are triggered when the PR is built.

Tested by inserting a sleep in the connection thread to mimic the race;
the test failed reliably with the sleep and now passes. (The sleep is not
included in the patch.) Also ran the YARN tests to make sure.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20388 from vanzin/SPARK-23020.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70a68b32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70a68b32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70a68b32

Branch: refs/heads/master
Commit: 70a68b328b856c17eb22cc86fee0ebe8d64f8825
Parents: 7bd46d9
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Jan 26 11:58:20 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jan 26 11:58:20 2018 +0800

----------------------------------------------------------------------
 .../spark/launcher/AbstractAppHandle.java       | 42 +++++++++------
 .../spark/launcher/ChildProcAppHandle.java      | 11 +---
 .../spark/launcher/InProcessAppHandle.java      |  9 +---
 .../apache/spark/launcher/LauncherServer.java   | 55 +++++++++-----------
 .../spark/deploy/yarn/YarnClusterSuite.scala    |  5 +-
 5 files changed, 55 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
index daf0972..84a25a5 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
@@ -20,6 +20,7 @@ package org.apache.spark.launcher;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -29,15 +30,15 @@ abstract class AbstractAppHandle implements SparkAppHandle {
 
   private final LauncherServer server;
 
-  private LauncherConnection connection;
+  private LauncherServer.ServerConnection connection;
   private List<Listener> listeners;
-  private State state;
+  private AtomicReference<State> state;
   private String appId;
   private volatile boolean disposed;
 
   protected AbstractAppHandle(LauncherServer server) {
     this.server = server;
-    this.state = State.UNKNOWN;
+    this.state = new AtomicReference<>(State.UNKNOWN);
   }
 
   @Override
@@ -50,7 +51,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
 
   @Override
   public State getState() {
-    return state;
+    return state.get();
   }
 
   @Override
@@ -73,7 +74,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
     if (!isDisposed()) {
       if (connection != null) {
         try {
-          connection.close();
+          connection.closeAndWait();
         } catch (IOException ioe) {
           // no-op.
         }
@@ -82,7 +83,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
     }
   }
 
-  void setConnection(LauncherConnection connection) {
+  void setConnection(LauncherServer.ServerConnection connection) {
     this.connection = connection;
   }
 
@@ -99,12 +100,9 @@ abstract class AbstractAppHandle implements SparkAppHandle {
    */
   synchronized void dispose() {
     if (!isDisposed()) {
-      // Unregister first to make sure that the connection with the app has been really
-      // terminated.
       server.unregister(this);
-      if (!getState().isFinal()) {
-        setState(State.LOST);
-      }
+      // Set state to LOST if not yet final.
+      setState(State.LOST, false);
       this.disposed = true;
     }
   }
@@ -113,14 +111,24 @@ abstract class AbstractAppHandle implements SparkAppHandle {
     setState(s, false);
   }
 
-  synchronized void setState(State s, boolean force) {
-    if (force || !state.isFinal()) {
-      state = s;
+  void setState(State s, boolean force) {
+    if (force) {
+      state.set(s);
       fireEvent(false);
-    } else {
-      LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
-        new Object[] { state, s });
+      return;
     }
+
+    State current = state.get();
+    while (!current.isFinal()) {
+      if (state.compareAndSet(current, s)) {
+        fireEvent(false);
+        return;
+      }
+      current = state.get();
+    }
+
+    LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
+      new Object[] { current, s });
   }
 
   synchronized void setAppId(String appId) {

http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 2b99461..5e3c956 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -104,19 +104,12 @@ class ChildProcAppHandle extends AbstractAppHandle {
         ec = 1;
       }
 
-      State currState = getState();
-      State newState = null;
       if (ec != 0) {
+        State currState = getState();
         // Override state with failure if the current state is not final, or is success.
         if (!currState.isFinal() || currState == State.FINISHED) {
-          newState = State.FAILED;
+          setState(State.FAILED, true);
         }
-      } else if (!currState.isFinal()) {
-        newState = State.LOST;
-      }
-
-      if (newState != null) {
-        setState(newState, true);
       }
 
       disconnect();

http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
index f04263c..b8030e0 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
@@ -66,14 +66,7 @@ class InProcessAppHandle extends AbstractAppHandle {
         setState(State.FAILED);
       }
 
-      synchronized (InProcessAppHandle.this) {
-        if (!isDisposed()) {
-          disconnect();
-          if (!getState().isFinal()) {
-            setState(State.LOST, true);
-          }
-        }
-      }
+      disconnect();
     });
 
     app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));

http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index 8091885..f4ecd52 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -218,32 +218,6 @@ class LauncherServer implements Closeable {
       }
     }
 
-    // If there is a live connection for this handle, we need to wait for it to finish before
-    // returning, otherwise there might be a race between the connection thread processing
-    // buffered data and the handle cleaning up after itself, leading to potentially the wrong
-    // state being reported for the handle.
-    ServerConnection conn = null;
-    synchronized (clients) {
-      for (ServerConnection c : clients) {
-        if (c.handle == handle) {
-          conn = c;
-          break;
-        }
-      }
-    }
-
-    if (conn != null) {
-      synchronized (conn) {
-        if (conn.isOpen()) {
-          try {
-            conn.wait();
-          } catch (InterruptedException ie) {
-            // Ignore.
-          }
-        }
-      }
-    }
-
     unref();
   }
 
@@ -312,9 +286,10 @@ class LauncherServer implements Closeable {
     }
   }
 
-  private class ServerConnection extends LauncherConnection {
+  class ServerConnection extends LauncherConnection {
 
     private TimerTask timeout;
+    private volatile Thread connectionThread;
     volatile AbstractAppHandle handle;
 
     ServerConnection(Socket socket, TimerTask timeout) throws IOException {
@@ -323,6 +298,12 @@ class LauncherServer implements Closeable {
     }
 
     @Override
+    public void run() {
+      this.connectionThread = Thread.currentThread();
+      super.run();
+    }
+
+    @Override
     protected void handle(Message msg) throws IOException {
       try {
         if (msg instanceof Hello) {
@@ -376,9 +357,23 @@ class LauncherServer implements Closeable {
         clients.remove(this);
       }
 
-      synchronized (this) {
-        super.close();
-        notifyAll();
+      super.close();
+    }
+
+    /**
+     * Close the connection and wait for any buffered data to be processed before returning.
+     * This ensures any changes reported by the child application take effect.
+     */
+    public void closeAndWait() throws IOException {
+      close();
+
+      Thread connThread = this.connectionThread;
+      if (Thread.currentThread() != connThread) {
+        try {
+          connThread.join();
+        } catch (InterruptedException ie) {
+          // Ignore.
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70a68b32/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index e9dcfaf..5003326 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -45,8 +45,7 @@ import org.apache.spark.util.Utils
 
 /**
  * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN
- * applications, and require the Spark assembly to be built before they can be successfully
- * run.
+ * applications.
  */
 @ExtendedYarnTest
 class YarnClusterSuite extends BaseYarnClusterSuite {
@@ -152,7 +151,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
   }
 
   test("run Python application in yarn-cluster mode using " +
-    " spark.yarn.appMasterEnv to override local envvar") {
+    "spark.yarn.appMasterEnv to override local envvar") {
     testPySpark(
       clientMode = false,
       extraConf = Map(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org