You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/11/07 04:54:41 UTC

spark git commit: [SPARK-4236] Cleanup removed applications' files in shuffle service

Repository: spark
Updated Branches:
  refs/heads/master f165b2bbf -> 48a19a6db


[SPARK-4236] Cleanup removed applications' files in shuffle service

This relies on a hook from whoever is hosting the shuffle service to invoke removeApplication() when the application is completed. Once invoked, we will clean up all the executors' shuffle directories we know about.

Author: Aaron Davidson <aa...@databricks.com>

Closes #3126 from aarondav/cleanup and squashes the following commits:

33a64a9 [Aaron Davidson] Missing brace
e6e428f [Aaron Davidson] Address comments
16a0d27 [Aaron Davidson] Cleanup
e4df3e7 [Aaron Davidson] [SPARK-4236] Cleanup removed applications' files in shuffle service


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48a19a6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48a19a6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48a19a6d

Branch: refs/heads/master
Commit: 48a19a6dba896f7d0b637f84e114b7efbb814e51
Parents: f165b2b
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Nov 6 19:54:32 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Nov 6 19:54:32 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     |   1 +
 .../spark/ExternalShuffleServiceSuite.scala     |   5 +-
 .../apache/spark/network/util/JavaUtils.java    |  59 ++++++++
 .../shuffle/ExternalShuffleBlockHandler.java    |  10 +-
 .../shuffle/ExternalShuffleBlockManager.java    | 118 +++++++++++++--
 .../shuffle/ExternalShuffleCleanupSuite.java    | 142 +++++++++++++++++++
 .../ExternalShuffleIntegrationSuite.java        |   2 +-
 .../network/shuffle/TestShuffleDataContext.java |   4 +-
 8 files changed, 319 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7caf6bc..2cbd38d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -755,6 +755,7 @@ private[spark] object Utils extends Logging {
   /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
    */
   def deleteRecursively(file: File) {
     if (file != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 792b9cd..6608ed1 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -63,8 +63,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     rdd.count()
     rdd.count()
 
-    // Invalidate the registered executors, disallowing access to their shuffle blocks.
-    rpcHandler.clearRegisteredExecutors()
+    // Invalidate the registered executors, disallowing access to their shuffle blocks (without
+    // deleting the actual shuffle files, so we could access them without the shuffle service).
+    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)
 
     // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
     // being set.

http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 2856d1c..75c4a39 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -22,16 +22,22 @@ import java.nio.ByteBuffer;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import com.google.common.base.Charsets;
 import io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * General utilities available in the network package. Many of these are sourced from Spark's
+ * own Utils, just accessible within this package.
+ */
 public class JavaUtils {
   private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
 
@@ -93,4 +99,57 @@ public class JavaUtils {
   public static String bytesToString(ByteBuffer b) {
     return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
   }
+
+  /*
+   * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
+   */
+  public static void deleteRecursively(File file) throws IOException {
+    if (file == null) { return; }
+
+    if (file.isDirectory() && !isSymlink(file)) {
+      IOException savedIOException = null;
+      for (File child : listFilesSafely(file)) {
+        try {
+          deleteRecursively(child);
+        } catch (IOException e) {
+          // In case of multiple exceptions, only last one will be thrown
+          savedIOException = e;
+        }
+      }
+      if (savedIOException != null) {
+        throw savedIOException;
+      }
+    }
+
+    boolean deleted = file.delete();
+    // Delete can also fail if the file simply did not exist.
+    if (!deleted && file.exists()) {
+      throw new IOException("Failed to delete: " + file.getAbsolutePath());
+    }
+  }
+
+  private static File[] listFilesSafely(File file) throws IOException {
+    if (file.exists()) {
+      File[] files = file.listFiles();
+      if (files == null) {
+        throw new IOException("Failed to list files for dir: " + file);
+      }
+      return files;
+    } else {
+      return new File[0];
+    }
+  }
+
+  private static boolean isSymlink(File file) throws IOException {
+    Preconditions.checkNotNull(file);
+    File fileInCanonicalDir = null;
+    if (file.getParent() == null) {
+      fileInCanonicalDir = file;
+    } else {
+      fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
+    }
+    return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index cd3fea8..75ebf8c 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -94,9 +94,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
     return streamManager;
   }
 
-  /** For testing, clears all executors registered with "RegisterExecutor". */
-  @VisibleForTesting
-  public void clearRegisteredExecutors() {
-    blockManager.clearRegisteredExecutors();
+  /**
+   * Removes an application (once it has been terminated), and optionally will clean up any
+   * local directories associated with the executors of that application in a separate thread.
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    blockManager.applicationRemoved(appId, cleanupLocalDirs);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index 6589889..98fcfb8 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -21,9 +21,15 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,13 +49,22 @@ import org.apache.spark.network.util.JavaUtils;
 public class ExternalShuffleBlockManager {
   private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
 
-  // Map from "appId-execId" to the executor's configuration.
-  private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
-    new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+  // Map containing all registered executors' metadata.
+  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
 
-  // Returns an id suitable for a single executor within a single application.
-  private String getAppExecId(String appId, String execId) {
-    return appId + "-" + execId;
+  // Single-threaded Java executor used to perform expensive recursive directory deletion.
+  private final Executor directoryCleaner;
+
+  public ExternalShuffleBlockManager() {
+    // TODO: Give this thread a name.
+    this(Executors.newSingleThreadExecutor());
+  }
+
+  // Allows tests to have more control over when directories are cleaned up.
+  @VisibleForTesting
+  ExternalShuffleBlockManager(Executor directoryCleaner) {
+    this.executors = Maps.newConcurrentMap();
+    this.directoryCleaner = directoryCleaner;
   }
 
   /** Registers a new Executor with all the configuration we need to find its shuffle files. */
@@ -57,7 +72,7 @@ public class ExternalShuffleBlockManager {
       String appId,
       String execId,
       ExecutorShuffleInfo executorInfo) {
-    String fullId = getAppExecId(appId, execId);
+    AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
     executors.put(fullId, executorInfo);
   }
@@ -78,7 +93,7 @@ public class ExternalShuffleBlockManager {
     int mapId = Integer.parseInt(blockIdParts[2]);
     int reduceId = Integer.parseInt(blockIdParts[3]);
 
-    ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
@@ -95,6 +110,56 @@ public class ExternalShuffleBlockManager {
   }
 
   /**
+   * Removes our metadata of all executors registered for the given application, and optionally
+   * also deletes the local directories associated with the executors of that application in a
+   * separate thread.
+   *
+   * It is not valid to call registerExecutor() for an executor with this appId after invoking
+   * this method.
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+    Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+      AppExecId fullId = entry.getKey();
+      final ExecutorShuffleInfo executor = entry.getValue();
+
+      // Only touch executors associated with the appId that was removed.
+      if (appId.equals(fullId.appId)) {
+        it.remove();
+
+        if (cleanupLocalDirs) {
+          logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+          // Execute the actual deletion in a different thread, as it may take some time.
+          directoryCleaner.execute(new Runnable() {
+            @Override
+            public void run() {
+              deleteExecutorDirs(executor.localDirs);
+            }
+          });
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously deletes each directory one at a time.
+   * Should be executed in its own thread, as this may take a long time.
+   */
+  private void deleteExecutorDirs(String[] dirs) {
+    for (String localDir : dirs) {
+      try {
+        JavaUtils.deleteRecursively(new File(localDir));
+        logger.debug("Successfully cleaned up directory: " + localDir);
+      } catch (Exception e) {
+        logger.error("Failed to delete directory: " + localDir, e);
+      }
+    }
+  }
+
+  /**
    * Hash-based shuffle data is simply stored as one file per block.
    * This logic is from FileShuffleBlockManager.
    */
@@ -146,9 +211,36 @@ public class ExternalShuffleBlockManager {
     return new File(new File(localDir, String.format("%02x", subDirId)), filename);
   }
 
-  /** For testing, clears all registered executors. */
-  @VisibleForTesting
-  void clearRegisteredExecutors() {
-    executors.clear();
+  /** Simply encodes an executor's full ID, which is appId + execId. */
+  private static class AppExecId {
+    final String appId;
+    final String execId;
+
+    private AppExecId(String appId, String execId) {
+      this.appId = appId;
+      this.execId = execId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppExecId appExecId = (AppExecId) o;
+      return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
new file mode 100644
index 0000000..c8ece3b
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalShuffleCleanupSuite {
+
+  // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
+  Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+
+  @Test
+  public void noCleanupAndCleanup() throws IOException {
+    TestShuffleDataContext dataContext = createSomeData();
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", false /* cleanup */);
+
+    assertStillThere(dataContext);
+
+    manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", true /* cleanup */);
+
+    assertCleanedUp(dataContext);
+  }
+
+  @Test
+  public void cleanupUsesExecutor() throws IOException {
+    TestShuffleDataContext dataContext = createSomeData();
+
+    final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+
+    // Executor which does nothing to ensure we're actually using it.
+    Executor noThreadExecutor = new Executor() {
+      @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
+    };
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+
+    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", true);
+
+    assertTrue(cleanupCalled.get());
+    assertStillThere(dataContext);
+
+    dataContext.cleanup();
+    assertCleanedUp(dataContext);
+  }
+
+  @Test
+  public void cleanupMultipleExecutors() throws IOException {
+    TestShuffleDataContext dataContext0 = createSomeData();
+    TestShuffleDataContext dataContext1 = createSomeData();
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+    manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+    manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", true);
+
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+  }
+
+  @Test
+  public void cleanupOnlyRemovedApp() throws IOException {
+    TestShuffleDataContext dataContext0 = createSomeData();
+    TestShuffleDataContext dataContext1 = createSomeData();
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+    manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+    manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+
+    manager.applicationRemoved("app-nonexistent", true);
+    assertStillThere(dataContext0);
+    assertStillThere(dataContext1);
+
+    manager.applicationRemoved("app-0", true);
+    assertCleanedUp(dataContext0);
+    assertStillThere(dataContext1);
+
+    manager.applicationRemoved("app-1", true);
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+
+    // Make sure it's not an error to cleanup multiple times
+    manager.applicationRemoved("app-1", true);
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+  }
+
+  private void assertStillThere(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
+    }
+  }
+
+  private void assertCleanedUp(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
+    }
+  }
+
+  private TestShuffleDataContext createSomeData() throws IOException {
+    Random rand = new Random(123);
+    TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
+
+    dataContext.create();
+    dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
+      new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
+    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
+      new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+    return dataContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 06294fe..3bea5b0 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -105,7 +105,7 @@ public class ExternalShuffleIntegrationSuite {
 
   @After
   public void afterEach() {
-    handler.clearRegisteredExecutors();
+    handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
   }
 
   class FetchResult {

http://git-wip-us.apache.org/repos/asf/spark/blob/48a19a6d/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 442b756..337b5c7 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -30,8 +30,8 @@ import com.google.common.io.Files;
  * and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}.
  */
 public class TestShuffleDataContext {
-  private final String[] localDirs;
-  private final int subDirsPerLocalDir;
+  public final String[] localDirs;
+  public final int subDirsPerLocalDir;
 
   public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
     this.localDirs = new String[numLocalDirs];


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org