You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@spark.apache.org by sa...@apache.org on 2018/01/17 06:15:00 UTC
spark git commit: Revert "[SPARK-23020][CORE] Fix races in launcher code, test."
Repository: spark
Updated Branches:
refs/heads/master 166705785 -> 50345a2aa
Revert "[SPARK-23020][CORE] Fix races in launcher code, test."
This reverts commit 66217dac4f8952a9923625908ad3dcb030763c81.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50345a2a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50345a2a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50345a2a
Branch: refs/heads/master
Commit: 50345a2aa59741c511d555edbbad2da9611e7d16
Parents: 1667057
Author: Sameer Agarwal <sa...@apache.org>
Authored: Tue Jan 16 22:14:47 2018 -0800
Committer: Sameer Agarwal <sa...@apache.org>
Committed: Tue Jan 16 22:14:47 2018 -0800
----------------------------------------------------------------------
.../spark/launcher/SparkLauncherSuite.java | 49 +++++++-------------
.../spark/launcher/AbstractAppHandle.java | 22 ++-------
.../spark/launcher/ChildProcAppHandle.java | 18 ++++---
.../spark/launcher/InProcessAppHandle.java | 17 ++++---
.../spark/launcher/LauncherConnection.java | 14 +++---
.../apache/spark/launcher/LauncherServer.java | 46 +++---------------
.../org/apache/spark/launcher/BaseSuite.java | 42 +++--------------
.../spark/launcher/LauncherServerSuite.java | 20 +++++---
8 files changed, 72 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index a042375..9d2f563 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.launcher;
-import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
@@ -32,7 +31,6 @@ import static org.junit.Assume.*;
import static org.mockito.Mockito.*;
import org.apache.spark.SparkContext;
-import org.apache.spark.SparkContext$;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;
@@ -139,9 +137,7 @@ public class SparkLauncherSuite extends BaseSuite {
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
// See SPARK-23019 and SparkContext.stop() for details.
- eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
- assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
- });
+ TimeUnit.MILLISECONDS.sleep(500);
}
}
@@ -150,35 +146,26 @@ public class SparkLauncherSuite extends BaseSuite {
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
- synchronized (transitions) {
- transitions.add(h.getState());
- }
+ transitions.add(h.getState());
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));
- SparkAppHandle handle = null;
- try {
- handle = new InProcessLauncher()
- .setMaster("local")
- .setAppResource(SparkLauncher.NO_RESOURCE)
- .setMainClass(InProcessTestApp.class.getName())
- .addAppArgs("hello")
- .startApplication(listener);
-
- waitFor(handle);
- assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
-
- // Matches the behavior of LocalSchedulerBackend.
- List<SparkAppHandle.State> expected = Arrays.asList(
- SparkAppHandle.State.CONNECTED,
- SparkAppHandle.State.RUNNING,
- SparkAppHandle.State.FINISHED);
- assertEquals(expected, transitions);
- } finally {
- if (handle != null) {
- handle.kill();
- }
- }
+ SparkAppHandle handle = new InProcessLauncher()
+ .setMaster("local")
+ .setAppResource(SparkLauncher.NO_RESOURCE)
+ .setMainClass(InProcessTestApp.class.getName())
+ .addAppArgs("hello")
+ .startApplication(listener);
+
+ waitFor(handle);
+ assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
+
+ // Matches the behavior of LocalSchedulerBackend.
+ List<SparkAppHandle.State> expected = Arrays.asList(
+ SparkAppHandle.State.CONNECTED,
+ SparkAppHandle.State.RUNNING,
+ SparkAppHandle.State.FINISHED);
+ assertEquals(expected, transitions);
}
public static class SparkLauncherTestApp {
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
index daf0972..df1e731 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractAppHandle.java
@@ -33,7 +33,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
private List<Listener> listeners;
private State state;
private String appId;
- private volatile boolean disposed;
+ private boolean disposed;
protected AbstractAppHandle(LauncherServer server) {
this.server = server;
@@ -70,7 +70,8 @@ abstract class AbstractAppHandle implements SparkAppHandle {
@Override
public synchronized void disconnect() {
- if (!isDisposed()) {
+ if (!disposed) {
+ disposed = true;
if (connection != null) {
try {
connection.close();
@@ -78,7 +79,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
// no-op.
}
}
- dispose();
+ server.unregister(this);
}
}
@@ -94,21 +95,6 @@ abstract class AbstractAppHandle implements SparkAppHandle {
return disposed;
}
- /**
- * Mark the handle as disposed, and set it as LOST in case the current state is not final.
- */
- synchronized void dispose() {
- if (!isDisposed()) {
- // Unregister first to make sure that the connection with the app has been really
- // terminated.
- server.unregister(this);
- if (!getState().isFinal()) {
- setState(State.LOST);
- }
- this.disposed = true;
- }
- }
-
void setState(State s) {
setState(s, false);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 2b99461..8b3f427 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -48,16 +48,14 @@ class ChildProcAppHandle extends AbstractAppHandle {
@Override
public synchronized void kill() {
- if (!isDisposed()) {
- setState(State.KILLED);
- disconnect();
- if (childProc != null) {
- if (childProc.isAlive()) {
- childProc.destroyForcibly();
- }
- childProc = null;
+ disconnect();
+ if (childProc != null) {
+ if (childProc.isAlive()) {
+ childProc.destroyForcibly();
}
+ childProc = null;
}
+ setState(State.KILLED);
}
void setChildProc(Process childProc, String loggerName, InputStream logStream) {
@@ -96,6 +94,8 @@ class ChildProcAppHandle extends AbstractAppHandle {
return;
}
+ disconnect();
+
int ec;
try {
ec = proc.exitValue();
@@ -118,8 +118,6 @@ class ChildProcAppHandle extends AbstractAppHandle {
if (newState != null) {
setState(newState, true);
}
-
- disconnect();
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
index f04263c..acd64c9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java
@@ -39,16 +39,15 @@ class InProcessAppHandle extends AbstractAppHandle {
@Override
public synchronized void kill() {
- if (!isDisposed()) {
- LOG.warning("kill() may leave the underlying app running in in-process mode.");
- setState(State.KILLED);
- disconnect();
-
- // Interrupt the thread. This is not guaranteed to kill the app, though.
- if (app != null) {
- app.interrupt();
- }
+ LOG.warning("kill() may leave the underlying app running in in-process mode.");
+ disconnect();
+
+ // Interrupt the thread. This is not guaranteed to kill the app, though.
+ if (app != null) {
+ app.interrupt();
}
+
+ setState(State.KILLED);
}
synchronized void start(String appName, Method main, String[] args) {
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
index fd6f229..b4a8719 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherConnection.java
@@ -95,15 +95,15 @@ abstract class LauncherConnection implements Closeable, Runnable {
}
@Override
- public synchronized void close() throws IOException {
+ public void close() throws IOException {
if (!closed) {
- closed = true;
- socket.close();
+ synchronized (this) {
+ if (!closed) {
+ closed = true;
+ socket.close();
+ }
+ }
}
}
- boolean isOpen() {
- return !closed;
- }
-
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index 660c444..b8999a1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -217,33 +217,6 @@ class LauncherServer implements Closeable {
break;
}
}
-
- // If there is a live connection for this handle, we need to wait for it to finish before
- // returning, otherwise there might be a race between the connection thread processing
- // buffered data and the handle cleaning up after itself, leading to potentially the wrong
- // state being reported for the handle.
- ServerConnection conn = null;
- synchronized (clients) {
- for (ServerConnection c : clients) {
- if (c.handle == handle) {
- conn = c;
- break;
- }
- }
- }
-
- if (conn != null) {
- synchronized (conn) {
- if (conn.isOpen()) {
- try {
- conn.wait();
- } catch (InterruptedException ie) {
- // Ignore.
- }
- }
- }
- }
-
unref();
}
@@ -315,7 +288,7 @@ class LauncherServer implements Closeable {
private class ServerConnection extends LauncherConnection {
private TimerTask timeout;
- volatile AbstractAppHandle handle;
+ private AbstractAppHandle handle;
ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
@@ -365,21 +338,16 @@ class LauncherServer implements Closeable {
@Override
public void close() throws IOException {
- if (!isOpen()) {
- return;
- }
-
synchronized (clients) {
clients.remove(this);
}
-
- synchronized (this) {
- super.close();
- notifyAll();
- }
-
+ super.close();
if (handle != null) {
- handle.dispose();
+ if (!handle.getState().isFinal()) {
+ LOG.log(Level.WARNING, "Lost connection to spark application.");
+ handle.setState(SparkAppHandle.State.LOST);
+ }
+ handle.disconnect();
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
index 3722a59..3e1a90e 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.launcher;
-import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@@ -48,46 +47,19 @@ class BaseSuite {
assertNull(server);
}
- protected void waitFor(final SparkAppHandle handle) throws Exception {
+ protected void waitFor(SparkAppHandle handle) throws Exception {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
- eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
- assertTrue("Handle is not in final state.", handle.getState().isFinal());
- });
+ while (!handle.getState().isFinal()) {
+ assertTrue("Timed out waiting for handle to transition to final state.",
+ System.nanoTime() < deadline);
+ TimeUnit.MILLISECONDS.sleep(10);
+ }
} finally {
if (!handle.getState().isFinal()) {
handle.kill();
}
}
-
- // Wait until the handle has been marked as disposed, to make sure all cleanup tasks
- // have been performed.
- AbstractAppHandle ahandle = (AbstractAppHandle) handle;
- eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
- assertTrue("Handle is still not marked as disposed.", ahandle.isDisposed());
- });
- }
-
- /**
- * Call a closure that performs a check every "period" until it succeeds, or the timeout
- * elapses.
- */
- protected void eventually(Duration timeout, Duration period, Runnable check) throws Exception {
- assertTrue("Timeout needs to be larger than period.", timeout.compareTo(period) > 0);
- long deadline = System.nanoTime() + timeout.toNanos();
- int count = 0;
- while (true) {
- try {
- count++;
- check.run();
- return;
- } catch (Throwable t) {
- if (System.nanoTime() >= deadline) {
- String msg = String.format("Failed check after %d tries: %s.", count, t.getMessage());
- throw new IllegalStateException(msg, t);
- }
- Thread.sleep(period.toMillis());
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/50345a2a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index 75c1af0..7e2b09c 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -23,14 +23,12 @@ import java.io.ObjectInputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
-import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -199,20 +197,28 @@ public class LauncherServerSuite extends BaseSuite {
* server-side close immediately.
*/
private void waitForError(TestClient client, String secret) throws Exception {
- final AtomicBoolean helloSent = new AtomicBoolean();
- eventually(Duration.ofSeconds(1), Duration.ofMillis(10), () -> {
+ boolean helloSent = false;
+ int maxTries = 10;
+ for (int i = 0; i < maxTries; i++) {
try {
- if (!helloSent.get()) {
+ if (!helloSent) {
client.send(new Hello(secret, "1.4.0"));
- helloSent.set(true);
+ helloSent = true;
} else {
client.send(new SetAppId("appId"));
}
fail("Expected error but message went through.");
} catch (IllegalStateException | IOException e) {
// Expected.
+ break;
+ } catch (AssertionError e) {
+ if (i < maxTries - 1) {
+ Thread.sleep(100);
+ } else {
+ throw new AssertionError("Test failed after " + maxTries + " attempts.", e);
+ }
}
- });
+ }
}
private static class TestClient extends LauncherConnection {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org