Posted to commits@spark.apache.org by sa...@apache.org on 2018/01/17 06:15:00 UTC

spark git commit: Revert "[SPARK-23020][CORE] Fix races in launcher code, test."

Repository: spark
Updated Branches:
  refs/heads/master 166705785 -> 50345a2aa


Revert "[SPARK-23020][CORE] Fix races in launcher code, test."

This reverts commit 66217dac4f8952a9923625908ad3dcb030763c81.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50345a2a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50345a2a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50345a2a

Branch: refs/heads/master
Commit: 50345a2aa59741c511d555edbbad2da9611e7d16
Parents: 1667057
Author: Sameer Agarwal <sa...@apache.org>
Authored: Tue Jan 16 22:14:47 2018 -0800
Committer: Sameer Agarwal <sa...@apache.org>
Committed: Tue Jan 16 22:14:47 2018 -0800

----------------------------------------------------------------------
 .../spark/launcher/SparkLauncherSuite.java      | 49 +++++++-------------
 .../spark/launcher/AbstractAppHandle.java       | 22 ++-------
 .../spark/launcher/ChildProcAppHandle.java      | 18 ++++---
 .../spark/launcher/InProcessAppHandle.java      | 17 ++++---
 .../spark/launcher/LauncherConnection.java      | 14 +++---
 .../apache/spark/launcher/LauncherServer.java   | 46 +++---------------
 .../org/apache/spark/launcher/BaseSuite.java    | 42 +++--------------
 .../spark/launcher/LauncherServerSuite.java     | 20 +++++---
 8 files changed, 72 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index a042375..9d2f563 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.launcher;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -32,7 +31,6 @@ import static org.junit.Assume.*;
 import static org.mockito.Mockito.*;
 
 import org.apache.spark.SparkContext;
-import org.apache.spark.SparkContext$;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.util.Utils;
 
@@ -139,9 +137,7 @@ public class SparkLauncherSuite extends BaseSuite {
       // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
       // Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
       // See SPARK-23019 and SparkContext.stop() for details.
-      eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
-        assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
-      });
+      TimeUnit.MILLISECONDS.sleep(500);
     }
   }
 
@@ -150,35 +146,26 @@ public class SparkLauncherSuite extends BaseSuite {
     SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
     doAnswer(invocation -> {
       SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
-      synchronized (transitions) {
-        transitions.add(h.getState());
-      }
+      transitions.add(h.getState());
       return null;
     }).when(listener).stateChanged(any(SparkAppHandle.class));
 
-    SparkAppHandle handle = null;
-    try {
-      handle = new InProcessLauncher()
-        .setMaster("local")
-        .setAppResource(SparkLauncher.NO_RESOURCE)
-        .setMainClass(InProcessTestApp.class.getName())
-        .addAppArgs("hello")
-        .startApplication(listener);
-
-      waitFor(handle);
-      assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
-
-      // Matches the behavior of LocalSchedulerBackend.
-      List<SparkAppHandle.State> expected = Arrays.asList(
-        SparkAppHandle.State.CONNECTED,
-        SparkAppHandle.State.RUNNING,
-        SparkAppHandle.State.FINISHED);
-      assertEquals(expected, transitions);
-    } finally {
-      if (handle != null) {
-        handle.kill();
-      }
-    }
+    SparkAppHandle handle = new InProcessLauncher()
+      .setMaster("local")
+      .setAppResource(SparkLauncher.NO_RESOURCE)
+      .setMainClass(InProcessTestApp.class.getName())
+      .addAppArgs("hello")
+      .startApplication(listener);
+
+    waitFor(handle);
+    assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
+
+    // Matches the behavior of LocalSchedulerBackend.
+    List<SparkAppHandle.State> expected = Arrays.asList(
+      SparkAppHandle.State.CONNECTED,
+      SparkAppHandle.State.RUNNING,
+      SparkAppHandle.State.FINISHED);
+    assertEquals(expected, transitions);
   }
 
   public static class SparkLauncherTestApp {

http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
index daf0972..df1e731 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
@@ -33,7 +33,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
   private List<Listener> listeners;
   private State state;
   private String appId;
-  private volatile boolean disposed;
+  private boolean disposed;
 
   protected AbstractAppHandle(LauncherServer server) {
     this.server = server;
@@ -70,7 +70,8 @@ abstract class AbstractAppHandle implements SparkAppHandle {
 
   @Override
   public synchronized void disconnect() {
-    if (!isDisposed()) {
+    if (!disposed) {
+      disposed = true;
       if (connection != null) {
         try {
           connection.close();
@@ -78,7 +79,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
           // no-op.
         }
       }
-      dispose();
+      server.unregister(this);
     }
   }
 
@@ -94,21 +95,6 @@ abstract class AbstractAppHandle implements SparkAppHandle {
     return disposed;
   }
 
-  /**
-   * Mark the handle as disposed, and set it as LOST in case the current state is not final.
-   */
-  synchronized void dispose() {
-    if (!isDisposed()) {
-      // Unregister first to make sure that the connection with the app has been really
-      // terminated.
-      server.unregister(this);
-      if (!getState().isFinal()) {
-        setState(State.LOST);
-      }
-      this.disposed = true;
-    }
-  }
-
   void setState(State s) {
     setState(s, false);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 2b99461..8b3f427 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -48,16 +48,14 @@ class ChildProcAppHandle extends AbstractAppHandle {
 
   @Override
   public synchronized void kill() {
-    if (!isDisposed()) {
-      setState(State.KILLED);
-      disconnect();
-      if (childProc != null) {
-        if (childProc.isAlive()) {
-          childProc.destroyForcibly();
-        }
-        childProc = null;
+    disconnect();
+    if (childProc != null) {
+      if (childProc.isAlive()) {
+        childProc.destroyForcibly();
       }
+      childProc = null;
     }
+    setState(State.KILLED);
   }
 
   void setChildProc(Process childProc, String loggerName, InputStream logStream) {
@@ -96,6 +94,8 @@ class ChildProcAppHandle extends AbstractAppHandle {
         return;
       }
 
+      disconnect();
+
       int ec;
       try {
         ec = proc.exitValue();
@@ -118,8 +118,6 @@ class ChildProcAppHandle extends AbstractAppHandle {
       if (newState != null) {
         setState(newState, true);
       }
-
-      disconnect();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
index f04263c..acd64c9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
@@ -39,16 +39,15 @@ class InProcessAppHandle extends AbstractAppHandle {
 
   @Override
   public synchronized void kill() {
-    if (!isDisposed()) {
-      LOG.warning("kill() may leave the underlying app running in in-process mode.");
-      setState(State.KILLED);
-      disconnect();
-
-      // Interrupt the thread. This is not guaranteed to kill the app, though.
-      if (app != null) {
-        app.interrupt();
-      }
+    LOG.warning("kill() may leave the underlying app running in in-process mode.");
+    disconnect();
+
+    // Interrupt the thread. This is not guaranteed to kill the app, though.
+    if (app != null) {
+      app.interrupt();
     }
+
+    setState(State.KILLED);
   }
 
   synchronized void start(String appName, Method main, String[] args) {

http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
index fd6f229..b4a8719 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
@@ -95,15 +95,15 @@ abstract class LauncherConnection implements Closeable, Runnable {
   }
 
   @Override
-  public synchronized void close() throws IOException {
+  public void close() throws IOException {
     if (!closed) {
-      closed = true;
-      socket.close();
+      synchronized (this) {
+        if (!closed) {
+          closed = true;
+          socket.close();
+        }
+      }
     }
   }
 
-  boolean isOpen() {
-    return !closed;
-  }
-
 }
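
For reference, the close() restored by this hunk uses a check-then-lock pattern: callers racing on close() only synchronize when the connection still looks open, and the socket is closed at most once. A minimal standalone sketch of that pattern (the class name CloseOnceConnection is illustrative, not the actual Spark class) could look like:

    import java.io.Closeable;
    import java.io.IOException;
    import java.net.Socket;

    class CloseOnceConnection implements Closeable {
      private final Socket socket;
      private volatile boolean closed;

      CloseOnceConnection(Socket socket) {
        this.socket = socket;
      }

      @Override
      public void close() throws IOException {
        // Cheap unsynchronized check first; only take the lock while the connection still looks open.
        if (!closed) {
          synchronized (this) {
            // Re-check under the lock so the socket is closed exactly once.
            if (!closed) {
              closed = true;
              socket.close();
            }
          }
        }
      }
    }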

http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index 660c444..b8999a1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -217,33 +217,6 @@ class LauncherServer implements Closeable {
         break;
       }
     }
-
-    // If there is a live connection for this handle, we need to wait for it to finish before
-    // returning, otherwise there might be a race between the connection thread processing
-    // buffered data and the handle cleaning up after itself, leading to potentially the wrong
-    // state being reported for the handle.
-    ServerConnection conn = null;
-    synchronized (clients) {
-      for (ServerConnection c : clients) {
-        if (c.handle == handle) {
-          conn = c;
-          break;
-        }
-      }
-    }
-
-    if (conn != null) {
-      synchronized (conn) {
-        if (conn.isOpen()) {
-          try {
-            conn.wait();
-          } catch (InterruptedException ie) {
-            // Ignore.
-          }
-        }
-      }
-    }
-
     unref();
   }
 
@@ -315,7 +288,7 @@ class LauncherServer implements Closeable {
   private class ServerConnection extends LauncherConnection {
 
     private TimerTask timeout;
-    volatile AbstractAppHandle handle;
+    private AbstractAppHandle handle;
 
     ServerConnection(Socket socket, TimerTask timeout) throws IOException {
       super(socket);
@@ -365,21 +338,16 @@ class LauncherServer implements Closeable {
 
     @Override
     public void close() throws IOException {
-      if (!isOpen()) {
-        return;
-      }
-
       synchronized (clients) {
         clients.remove(this);
       }
-
-      synchronized (this) {
-        super.close();
-        notifyAll();
-      }
-
+      super.close();
       if (handle != null) {
-        handle.dispose();
+        if (!handle.getState().isFinal()) {
+          LOG.log(Level.WARNING, "Lost connection to spark application.");
+          handle.setState(SparkAppHandle.State.LOST);
+        }
+        handle.disconnect();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
index 3722a59..3e1a90e 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.launcher;
 
-import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
@@ -48,46 +47,19 @@ class BaseSuite {
     assertNull(server);
   }
 
-  protected void waitFor(final SparkAppHandle handle) throws Exception {
+  protected void waitFor(SparkAppHandle handle) throws Exception {
+    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
     try {
-      eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
-        assertTrue("Handle is not in final state.", handle.getState().isFinal());
-      });
+      while (!handle.getState().isFinal()) {
+        assertTrue("Timed out waiting for handle to transition to final state.",
+          System.nanoTime() < deadline);
+        TimeUnit.MILLISECONDS.sleep(10);
+      }
     } finally {
       if (!handle.getState().isFinal()) {
         handle.kill();
       }
     }
-
-    // Wait until the handle has been marked as disposed, to make sure all cleanup tasks
-    // have been performed.
-    AbstractAppHandle ahandle = (AbstractAppHandle) handle;
-    eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
-      assertTrue("Handle is still not marked as disposed.", ahandle.isDisposed());
-    });
-  }
-
-  /**
-   * Call a closure that performs a check every "period" until it succeeds, or the timeout
-   * elapses.
-   */
-  protected void eventually(Duration timeout, Duration period, Runnable check) throws Exception {
-    assertTrue("Timeout needs to be larger than period.", timeout.compareTo(period) > 0);
-    long deadline = System.nanoTime() + timeout.toNanos();
-    int count = 0;
-    while (true) {
-      try {
-        count++;
-        check.run();
-        return;
-      } catch (Throwable t) {
-        if (System.nanoTime() >= deadline) {
-          String msg = String.format("Failed check after %d tries: %s.", count, t.getMessage());
-          throw new IllegalStateException(msg, t);
-        }
-        Thread.sleep(period.toMillis());
-      }
-    }
   }
 
 }
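
The hunk above removes the generic "eventually" helper in favor of an inline deadline loop in waitFor(). As a standalone illustration of the retry-until-deadline pattern that helper implemented (the Retry class name here is illustrative, not the Spark test utility), a sketch could look like:

    import java.time.Duration;

    final class Retry {

      // Run "check" every "period" until it stops throwing, or give up once "timeout" elapses.
      static void eventually(Duration timeout, Duration period, Runnable check)
          throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        int attempts = 0;
        while (true) {
          try {
            attempts++;
            check.run();
            return;
          } catch (Throwable t) {
            if (System.nanoTime() >= deadline) {
              throw new IllegalStateException(
                "Failed check after " + attempts + " tries: " + t.getMessage(), t);
            }
            Thread.sleep(period.toMillis());
          }
        }
      }
    }

A caller would pass an assertion as the check, e.g. a lambda that asserts the handle has reached a final state, and the loop keeps retrying every few milliseconds until the assertion passes or the timeout is exhausted.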

http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index 75c1af0..7e2b09c 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -23,14 +23,12 @@ import java.io.ObjectInputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -199,20 +197,28 @@ public class LauncherServerSuite extends BaseSuite {
    * server-side close immediately.
    */
   private void waitForError(TestClient client, String secret) throws Exception {
-    final AtomicBoolean helloSent = new AtomicBoolean();
-    eventually(Duration.ofSeconds(1), Duration.ofMillis(10), () -> {
+    boolean helloSent = false;
+    int maxTries = 10;
+    for (int i = 0; i < maxTries; i++) {
       try {
-        if (!helloSent.get()) {
+        if (!helloSent) {
           client.send(new Hello(secret, "1.4.0"));
-          helloSent.set(true);
+          helloSent = true;
         } else {
           client.send(new SetAppId("appId"));
         }
         fail("Expected error but message went through.");
       } catch (IllegalStateException | IOException e) {
         // Expected.
+        break;
+      } catch (AssertionError e) {
+        if (i < maxTries - 1) {
+          Thread.sleep(100);
+        } else {
+          throw new AssertionError("Test failed after " + maxTries + " attempts.", e);
+        }
       }
-    });
+    }
   }
 
   private static class TestClient extends LauncherConnection {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org