You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/08/15 18:26:34 UTC

spark git commit: [SPARK-17742][CORE] Handle child process exit in SparkLauncher.

Repository: spark
Updated Branches:
  refs/heads/master 14bdb25fd -> cba826d00


[SPARK-17742][CORE] Handle child process exit in SparkLauncher.

Currently the launcher handle does not monitor the child spark-submit
process it launches; this means that if the child exits with an error,
the handle's state will never change, and the launching application will
not know that the child application has failed.

This change adds code to monitor the child process, and changes the
handle state appropriately when the child process exits.

Tested with added unit tests.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #18877 from vanzin/SPARK-17742.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cba826d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cba826d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cba826d0

Branch: refs/heads/master
Commit: cba826d00173a945b0c9a7629c66e36fa73b723e
Parents: 14bdb25
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Aug 15 11:26:29 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Aug 15 11:26:29 2017 -0700

----------------------------------------------------------------------
 .../spark/launcher/SparkLauncherSuite.java      |   4 +-
 .../spark/launcher/ChildProcAppHandle.java      |  65 +++--
 .../apache/spark/launcher/OutputRedirector.java |  16 +-
 .../spark/launcher/ChildProcAppHandleSuite.java | 248 +++++++++++++++++++
 .../spark/launcher/OutputRedirectionSuite.java  | 226 -----------------
 launcher/src/test/resources/log4j.properties    |   2 +-
 6 files changed, 313 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 19861b8..db4fc26 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -116,11 +116,11 @@ public class SparkLauncherSuite {
       .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
       .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
       .setMainClass(SparkLauncherTestApp.class.getName())
+      .redirectError()
       .addAppArgs("proc");
     final Process app = launcher.launch();
 
-    new OutputRedirector(app.getInputStream(), TF);
-    new OutputRedirector(app.getErrorStream(), TF);
+    new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
     assertEquals(0, app.waitFor());
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 3ce4b79..bf91640 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle {
   private final String secret;
   private final LauncherServer server;
 
-  private Process childProc;
+  private volatile Process childProc;
   private boolean disposed;
   private LauncherConnection connection;
   private List<Listener> listeners;
@@ -96,18 +96,14 @@ class ChildProcAppHandle implements SparkAppHandle {
 
   @Override
   public synchronized void kill() {
-    if (!disposed) {
-      disconnect();
-    }
+    disconnect();
     if (childProc != null) {
-      try {
-        childProc.exitValue();
-      } catch (IllegalThreadStateException e) {
+      if (childProc.isAlive()) {
         childProc.destroyForcibly();
-      } finally {
-        childProc = null;
       }
+      childProc = null;
     }
+    setState(State.KILLED);
   }
 
   String getSecret() {
@@ -118,7 +114,13 @@ class ChildProcAppHandle implements SparkAppHandle {
     this.childProc = childProc;
     if (logStream != null) {
       this.redirector = new OutputRedirector(logStream, loggerName,
-        SparkLauncher.REDIRECTOR_FACTORY);
+        SparkLauncher.REDIRECTOR_FACTORY, this);
+    } else {
+      // If there is no log redirection, spawn a thread that will wait for the child process
+      // to finish.
+      Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
+      waiter.setDaemon(true);
+      waiter.start();
     }
   }
 
@@ -134,7 +136,7 @@ class ChildProcAppHandle implements SparkAppHandle {
     return connection;
   }
 
-  void setState(State s) {
+  synchronized void setState(State s) {
     if (!state.isFinal()) {
       state = s;
       fireEvent(false);
@@ -144,17 +146,48 @@ class ChildProcAppHandle implements SparkAppHandle {
     }
   }
 
-  void setAppId(String appId) {
+  synchronized void setAppId(String appId) {
     this.appId = appId;
     fireEvent(true);
   }
 
-  // Visible for testing.
-  boolean isRunning() {
-    return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive());
+  /**
+   * Wait for the child process to exit and update the handle's state if necessary, according to
+   * the exit code.
+   */
+  void monitorChild() {
+    while (childProc.isAlive()) {
+      try {
+        childProc.waitFor();
+      } catch (Exception e) {
+        LOG.log(Level.WARNING, "Exception waiting for child process to exit.", e);
+      }
+    }
+
+    synchronized (this) {
+      if (disposed) {
+        return;
+      }
+
+      disconnect();
+
+      int ec;
+      try {
+        ec = childProc.exitValue();
+      } catch (Exception e) {
+        LOG.log(Level.WARNING, "Exception getting child process exit code, assuming failure.", e);
+        ec = 1;
+      }
+
+      // Only override the success state; leave other fail states alone.
+      if (!state.isFinal() || (ec != 0 && state == State.FINISHED)) {
+        state = State.LOST;
+        fireEvent(false);
+      }
+    }
   }
 
-  private synchronized void fireEvent(boolean isInfoChanged) {
+  private void fireEvent(boolean isInfoChanged) {
     if (listeners != null) {
       for (Listener l : listeners) {
         if (isInfoChanged) {

http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
index 63abae9..6f4b0bb 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
@@ -34,18 +34,24 @@ class OutputRedirector {
   private final BufferedReader reader;
   private final Logger sink;
   private final Thread thread;
+  private final ChildProcAppHandle callback;
 
   private volatile boolean active;
 
-  OutputRedirector(InputStream in, ThreadFactory tf) {
-    this(in, OutputRedirector.class.getName(), tf);
+  OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
+    this(in, loggerName, tf, null);
   }
 
-  OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
+  OutputRedirector(
+      InputStream in,
+      String loggerName,
+      ThreadFactory tf,
+      ChildProcAppHandle callback) {
     this.active = true;
     this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
     this.thread = tf.newThread(this::redirect);
     this.sink = Logger.getLogger(loggerName);
+    this.callback = callback;
     thread.start();
   }
 
@@ -59,6 +65,10 @@ class OutputRedirector {
       }
     } catch (IOException e) {
       sink.log(Level.FINE, "Error reading child process output.", e);
+    } finally {
+      if (callback != null) {
+        callback.monitorChild();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java b/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java
new file mode 100644
index 0000000..64a87b3
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import static java.nio.file.attribute.PosixFilePermission.*;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+public class ChildProcAppHandleSuite extends BaseSuite {
+
+  private static final List<String> MESSAGES = new ArrayList<>();
+
+  private static final List<String> TEST_SCRIPT = Arrays.asList(
+    "#!/bin/sh",
+    "echo \"output\"",
+    "echo \"error\" 1>&2");
+
+  private static File TEST_SCRIPT_PATH;
+
+  @AfterClass
+  public static void cleanupClass() throws Exception {
+    if (TEST_SCRIPT_PATH != null) {
+      TEST_SCRIPT_PATH.delete();
+      TEST_SCRIPT_PATH = null;
+    }
+  }
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    TEST_SCRIPT_PATH = File.createTempFile("output-redir-test", ".sh");
+    Files.setPosixFilePermissions(TEST_SCRIPT_PATH.toPath(),
+      EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE));
+    Files.write(TEST_SCRIPT_PATH.toPath(), TEST_SCRIPT);
+  }
+
+  @Before
+  public void cleanupLog() {
+    MESSAGES.clear();
+  }
+
+  @Test
+  public void testRedirectsSimple() throws Exception {
+    SparkLauncher launcher = new SparkLauncher();
+    launcher.redirectError(ProcessBuilder.Redirect.PIPE);
+    assertNotNull(launcher.errorStream);
+    assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.PIPE);
+
+    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
+    assertNotNull(launcher.outputStream);
+    assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.PIPE);
+  }
+
+  @Test
+  public void testRedirectLastWins() throws Exception {
+    SparkLauncher launcher = new SparkLauncher();
+    launcher.redirectError(ProcessBuilder.Redirect.PIPE)
+      .redirectError(ProcessBuilder.Redirect.INHERIT);
+    assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
+
+    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
+      .redirectOutput(ProcessBuilder.Redirect.INHERIT);
+    assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
+  }
+
+  @Test
+  public void testRedirectToLog() throws Exception {
+    assumeFalse(isWindows());
+
+    SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
+      .startApplication();
+    waitFor(handle);
+
+    assertTrue(MESSAGES.contains("output"));
+    assertTrue(MESSAGES.contains("error"));
+  }
+
+  @Test
+  public void testRedirectErrorToLog() throws Exception {
+    assumeFalse(isWindows());
+
+    Path err = Files.createTempFile("stderr", "txt");
+
+    SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
+      .redirectError(err.toFile())
+      .startApplication();
+    waitFor(handle);
+
+    assertTrue(MESSAGES.contains("output"));
+    assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
+  }
+
+  @Test
+  public void testRedirectOutputToLog() throws Exception {
+    assumeFalse(isWindows());
+
+    Path out = Files.createTempFile("stdout", "txt");
+
+    SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
+      .redirectOutput(out.toFile())
+      .startApplication();
+    waitFor(handle);
+
+    assertTrue(MESSAGES.contains("error"));
+    assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
+  }
+
+  @Test
+  public void testNoRedirectToLog() throws Exception {
+    assumeFalse(isWindows());
+
+    Path out = Files.createTempFile("stdout", "txt");
+    Path err = Files.createTempFile("stderr", "txt");
+
+    ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
+      .redirectError(err.toFile())
+      .redirectOutput(out.toFile())
+      .startApplication();
+    waitFor(handle);
+
+    assertTrue(MESSAGES.isEmpty());
+    assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
+    assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadLogRedirect() throws Exception {
+    new SparkLauncher()
+      .redirectError()
+      .redirectOutput(Files.createTempFile("stdout", "txt").toFile())
+      .redirectToLog("foo")
+      .launch()
+      .waitFor();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testRedirectErrorTwiceFails() throws Exception {
+    new SparkLauncher()
+      .redirectError()
+      .redirectError(Files.createTempFile("stderr", "txt").toFile())
+      .launch()
+      .waitFor();
+  }
+
+  @Test
+  public void testProcMonitorWithOutputRedirection() throws Exception {
+    File err = Files.createTempFile("out", "txt").toFile();
+    SparkAppHandle handle = new TestSparkLauncher()
+      .redirectError()
+      .redirectOutput(err)
+      .startApplication();
+    waitFor(handle);
+    assertEquals(SparkAppHandle.State.LOST, handle.getState());
+  }
+
+  @Test
+  public void testProcMonitorWithLogRedirection() throws Exception {
+    SparkAppHandle handle = new TestSparkLauncher()
+      .redirectToLog(getClass().getName())
+      .startApplication();
+    waitFor(handle);
+    assertEquals(SparkAppHandle.State.LOST, handle.getState());
+  }
+
+  private void waitFor(SparkAppHandle handle) throws Exception {
+    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+    try {
+      while (!handle.getState().isFinal()) {
+        assertTrue("Timed out waiting for handle to transition to final state.",
+          System.nanoTime() < deadline);
+        TimeUnit.MILLISECONDS.sleep(10);
+      }
+    } finally {
+      if (!handle.getState().isFinal()) {
+        handle.kill();
+      }
+    }
+  }
+
+  private static class TestSparkLauncher extends SparkLauncher {
+
+    TestSparkLauncher() {
+      setAppResource("outputredirtest");
+    }
+
+    @Override
+    String findSparkSubmit() {
+      return TEST_SCRIPT_PATH.getAbsolutePath();
+    }
+
+  }
+
+  /**
+   * A log4j appender used by child apps of this test. It records all messages logged through it in
+   * memory so the test can check them.
+   */
+  public static class LogAppender extends AppenderSkeleton {
+
+    @Override
+    protected void append(LoggingEvent event) {
+      MESSAGES.add(event.getMessage().toString());
+    }
+
+    @Override
+    public boolean requiresLayout() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java b/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java
deleted file mode 100644
index ba044d3..0000000
--- a/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.launcher;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.stream.Collectors;
-import static java.nio.file.attribute.PosixFilePermission.*;
-
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.*;
-import static org.junit.Assume.*;
-
-import static org.apache.spark.launcher.CommandBuilderUtils.*;
-
-public class OutputRedirectionSuite extends BaseSuite {
-
-  private static final List<String> MESSAGES = new ArrayList<>();
-
-  private static final List<String> TEST_SCRIPT = Arrays.asList(
-    "#!/bin/sh",
-    "echo \"output\"",
-    "echo \"error\" 1>&2");
-
-  private static File TEST_SCRIPT_PATH;
-
-  @AfterClass
-  public static void cleanupClass() throws Exception {
-    if (TEST_SCRIPT_PATH != null) {
-      TEST_SCRIPT_PATH.delete();
-      TEST_SCRIPT_PATH = null;
-    }
-  }
-
-  @BeforeClass
-  public static void setupClass() throws Exception {
-    TEST_SCRIPT_PATH = File.createTempFile("output-redir-test", ".sh");
-    Files.setPosixFilePermissions(TEST_SCRIPT_PATH.toPath(),
-      EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE));
-    Files.write(TEST_SCRIPT_PATH.toPath(), TEST_SCRIPT);
-  }
-
-  @Before
-  public void cleanupLog() {
-    MESSAGES.clear();
-  }
-
-  @Test
-  public void testRedirectsSimple() throws Exception {
-    SparkLauncher launcher = new SparkLauncher();
-    launcher.redirectError(ProcessBuilder.Redirect.PIPE);
-    assertNotNull(launcher.errorStream);
-    assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.PIPE);
-
-    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
-    assertNotNull(launcher.outputStream);
-    assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.PIPE);
-  }
-
-  @Test
-  public void testRedirectLastWins() throws Exception {
-    SparkLauncher launcher = new SparkLauncher();
-    launcher.redirectError(ProcessBuilder.Redirect.PIPE)
-      .redirectError(ProcessBuilder.Redirect.INHERIT);
-    assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
-
-    launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
-      .redirectOutput(ProcessBuilder.Redirect.INHERIT);
-    assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.INHERIT);
-  }
-
-  @Test
-  public void testRedirectToLog() throws Exception {
-    assumeFalse(isWindows());
-
-    ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication();
-    waitFor(handle);
-
-    assertTrue(MESSAGES.contains("output"));
-    assertTrue(MESSAGES.contains("error"));
-  }
-
-  @Test
-  public void testRedirectErrorToLog() throws Exception {
-    assumeFalse(isWindows());
-
-    Path err = Files.createTempFile("stderr", "txt");
-
-    ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
-      .redirectError(err.toFile())
-      .startApplication();
-    waitFor(handle);
-
-    assertTrue(MESSAGES.contains("output"));
-    assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
-  }
-
-  @Test
-  public void testRedirectOutputToLog() throws Exception {
-    assumeFalse(isWindows());
-
-    Path out = Files.createTempFile("stdout", "txt");
-
-    ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
-      .redirectOutput(out.toFile())
-      .startApplication();
-    waitFor(handle);
-
-    assertTrue(MESSAGES.contains("error"));
-    assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
-  }
-
-  @Test
-  public void testNoRedirectToLog() throws Exception {
-    assumeFalse(isWindows());
-
-    Path out = Files.createTempFile("stdout", "txt");
-    Path err = Files.createTempFile("stderr", "txt");
-
-    ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
-      .redirectError(err.toFile())
-      .redirectOutput(out.toFile())
-      .startApplication();
-    waitFor(handle);
-
-    assertTrue(MESSAGES.isEmpty());
-    assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
-    assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBadLogRedirect() throws Exception {
-    new SparkLauncher()
-      .redirectError()
-      .redirectOutput(Files.createTempFile("stdout", "txt").toFile())
-      .redirectToLog("foo")
-      .launch()
-      .waitFor();
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testRedirectErrorTwiceFails() throws Exception {
-    new SparkLauncher()
-      .redirectError()
-      .redirectError(Files.createTempFile("stderr", "txt").toFile())
-      .launch()
-      .waitFor();
-  }
-
-  private void waitFor(ChildProcAppHandle handle) throws Exception {
-    try {
-      while (handle.isRunning()) {
-        Thread.sleep(10);
-      }
-    } finally {
-      // Explicit unregister from server since the handle doesn't yet do that when the
-      // process finishes by itself.
-      LauncherServer server = LauncherServer.getServerInstance();
-      if (server != null) {
-        server.unregister(handle);
-      }
-    }
-  }
-
-  private static class TestSparkLauncher extends SparkLauncher {
-
-    TestSparkLauncher() {
-      setAppResource("outputredirtest");
-    }
-
-    @Override
-    String findSparkSubmit() {
-      return TEST_SCRIPT_PATH.getAbsolutePath();
-    }
-
-  }
-
-  /**
-   * A log4j appender used by child apps of this test. It records all messages logged through it in
-   * memory so the test can check them.
-   */
-  public static class LogAppender extends AppenderSkeleton {
-
-    @Override
-    protected void append(LoggingEvent event) {
-      MESSAGES.add(event.getMessage().toString());
-    }
-
-    @Override
-    public boolean requiresLayout() {
-      return false;
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/launcher/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/launcher/src/test/resources/log4j.properties b/launcher/src/test/resources/log4j.properties
index bd982b1..366a6b9 100644
--- a/launcher/src/test/resources/log4j.properties
+++ b/launcher/src/test/resources/log4j.properties
@@ -29,7 +29,7 @@ log4j.appender.childproc.target=System.err
 log4j.appender.childproc.layout=org.apache.log4j.PatternLayout
 log4j.appender.childproc.layout.ConversionPattern=%t: %m%n
 
-log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender
+log4j.appender.outputredirtest=org.apache.spark.launcher.ChildProcAppHandleSuite$LogAppender
 log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest
 log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org