Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/28 05:41:37 UTC

[GitHub] weixiuli closed pull request #23243: [SPARK-26288][CORE]add initRegisteredExecutorsDB

weixiuli closed pull request #23243: [SPARK-26288][CORE]add initRegisteredExecutorsDB
URL: https://github.com/apache/spark/pull/23243
 
 
   

This is a pull request merged from a forked repository. As GitHub hides the
original diff once a fork's pull request is merged, it is reproduced below for
the sake of provenance:

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 788a845c57755..6a28668824e6a 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -17,23 +17,8 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricSet;
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.Counter;
+import com.codahale.metrics.*;
 import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
@@ -42,8 +27,18 @@
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
 import org.apache.spark.network.shuffle.protocol.*;
-import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 
 /**
  * RPC Handler for a server which can serve shuffle blocks from outside of an Executor process.
@@ -66,6 +61,12 @@ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFi
       new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
   }
 
+
+  /** Exposes the block resolver. For testing only. */
+  public ExternalShuffleBlockResolver getBlockResolver() {
+    return blockManager;
+  }
+
   /** Enables mocking out the StreamManager and BlockManager. */
   @VisibleForTesting
   public ExternalShuffleBlockHandler(
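
The new getBlockResolver accessor exposes the resolver behind a handler so that
tests can drive registration directly. A minimal sketch of how a test might use
it; the TransportConf/ExecutorShuffleInfo setup here is illustrative, not part
of this PR:

    import java.io.File
    import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
    import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
    import org.apache.spark.network.util.{MapConfigProvider, TransportConf}

    // A handler backed by a registered-executors file (the DB this PR adds).
    val transportConf = new TransportConf("shuffle", MapConfigProvider.EMPTY)
    val registeredExecutorFile = new File("/tmp/registeredExecutors.ldb")
    val handler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile)

    // Register an executor through the exposed resolver, as the suite below does.
    val resolver = handler.getBlockResolver
    resolver.registerExecutor("app0", "exec0",
      new ExecutorShuffleInfo(Array("/tmp/spark-local"), 5,
        "org.apache.spark.shuffle.sort.SortShuffleManager"))
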
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 0b7a27402369d..68b3aeda62e0f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -17,16 +17,6 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,19 +27,33 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.Weigher;
 import com.google.common.collect.Maps;
-import org.iq80.leveldb.DB;
-import org.iq80.leveldb.DBIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.LevelDBProvider;
 import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
-import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.NettyUtils;
 import org.apache.spark.network.util.TransportConf;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -321,6 +325,16 @@ void close() {
     }
   }
 
+  /** For testing. */
+  public void closeForTest() {
+    close();
+  }
+
+  /** For testing. */
+  public static File getFileForTest(String[] localDirs, int subDirsPerLocalDir, String filename) {
+    return getFile(localDirs, subDirsPerLocalDir, filename);
+  }
+
   /**
    * This method is needed to avoid the situation when multiple File instances for the
    * same pathname "foo/bar" are created, each with a separate copy of the "foo/bar" String.
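
getFileForTest just forwards to the private getFile, which hashes a block file
name into one of the configured local dirs and sub-dirs. A sketch of that
layout, assuming it mirrors the hashing in the Java implementation (the "%02x"
sub-dir names match what the test suite below creates):

    import java.io.File
    import org.apache.spark.network.util.JavaUtils

    // Illustrative restatement of the on-disk layout, not the production code path.
    def locate(localDirs: Array[String], subDirsPerLocalDir: Int, filename: String): File = {
      val hash = JavaUtils.nonNegativeHash(filename)
      val localDir = localDirs(hash % localDirs.length)
      val subDir = (hash / localDirs.length) % subDirsPerLocalDir
      new File(new File(localDir, "%02x".format(subDir)), filename)
    }
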
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 03e3abb3ce569..60790ae9ef9be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import java.io.{File, IOException}
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConverters._
@@ -32,6 +33,7 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
+
 /**
  * Provides a server from which Executors can read shuffle files (rather than reading directly from
  * each other), to provide uninterrupted access to the files in the face of executors being turned
@@ -48,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
 
+  private val registeredExecutorsDB = "registeredExecutors.ldb"
+
   private val transportConf =
     SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
   private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -56,11 +60,55 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
 
   private var server: TransportServer = _
 
+  private final val MAX_DIR_CREATION_ATTEMPTS = 10
+
   private val shuffleServiceSource = new ExternalShuffleServiceSource
 
+  protected def createDirectory(root: String, name: String): File = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var dir: File = null
+    while (dir == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException("Failed to create the registeredExecutors directory (under " +
+          root + ") after " + maxAttempts + " attempts!")
+      }
+      try {
+        dir = new File(root, "registeredExecutors")
+        if (!dir.exists() && !dir.mkdirs()) {
+          dir = null
+        }
+      } catch { case e: SecurityException => dir = null }
+    }
+    logInfo(s"registeredExecutorsDb path is ${dir.getAbsolutePath}")
+    new File(dir.getAbsolutePath, name)
+  }
+
+  protected def initRegisteredExecutorsDB(dbName: String): File = {
+    val localDirs = sparkConf.get("spark.local.dir", "").split(",")
+    if (localDirs.length >= 1 && !"".equals(localDirs(0))) {
+      createDirectory(localDirs(0), dbName)
+    } else {
+      logWarning("'spark.local.dir' should be set to persist the registered executors DB.")
+      null
+    }
+  }
+
+  /** For testing. */
+  def getBlockHandler: ExternalShuffleBlockHandler = {
+    blockHandler
+  }
+
   /** Create a new shuffle block handler. Factored out for subclasses to override. */
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
-    new ExternalShuffleBlockHandler(conf, null)
+    if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
+      new ExternalShuffleBlockHandler(conf, initRegisteredExecutorsDB(registeredExecutorsDB))
+    } else {
+      new ExternalShuffleBlockHandler(conf, null)
+    }
   }
 
   /** Starts the external shuffle service if the user has configured us to. */
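
With these changes the DB lives under the first entry of spark.local.dir, in a
"registeredExecutors" sub-directory. A minimal sketch of the resulting path,
mirroring createDirectory/initRegisteredExecutorsDB above (the local-dir value
is just an example):

    import java.io.File

    val localDirs = "/data1/spark,/data2/spark".split(",")  // example spark.local.dir
    val dbFile = new File(new File(localDirs(0), "registeredExecutors"), "registeredExecutors.ldb")
    // => /data1/spark/registeredExecutors/registeredExecutors.ldb
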
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f1c1c034df49a..6633b82b116b7 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -187,6 +187,12 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
+    ConfigBuilder("spark.shuffle.service.db.enabled")
+      .doc("Whether to use the DB in ExternalShuffleService for storing registered executors.")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
new file mode 100644
index 0000000000000..0002425dd3a91
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.{CharStreams, Closeables, Files}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.{SecurityManager, ShuffleSuite, SparkConf, SparkException}
+import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+
+/**
+ * This suite fetches block data after the ExternalShuffleService is restarted,
+ * with spark.shuffle.service.db.enabled = true or false.
+ * Note that block fetches are expected to fail when spark.shuffle.service.db.enabled = false.
+ */
+class ExternalShuffleServiceDbSuite extends ShuffleSuite with BeforeAndAfterAll {
+  val sortBlock0 = "Hello!"
+  val sortBlock1 = "World!"
+  val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
+
+  var sparkConf: SparkConf = _
+  var dataContext: TestShuffleDataContext = _
+
+  var securityManager: SecurityManager = _
+  var externalShuffleService: ExternalShuffleService = _
+  var blockHandler: ExternalShuffleBlockHandler = _
+  var blockResolver: ExternalShuffleBlockResolver = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    sparkConf = new SparkConf()
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+    sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
+    Utils.loadDefaultSparkProperties(sparkConf, null)
+    securityManager = new SecurityManager(sparkConf)
+
+    dataContext = new TestShuffleDataContext(2, 5)
+    dataContext.create()
+    // Write some sort data.
+    dataContext.insertSortShuffleData(0, 0,
+      Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
+        sortBlock1.getBytes(StandardCharsets.UTF_8)))
+    registerExecutor()
+  }
+
+  override def afterAll() {
+    try {
+      dataContext.cleanup()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  def registerExecutor(): Unit = {
+    sparkConf.set("spark.shuffle.service.db.enabled", "true")
+    externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+
+    // Start the external shuffle service.
+    externalShuffleService.start()
+    blockHandler = externalShuffleService.getBlockHandler
+    blockResolver = blockHandler.getBlockResolver
+    blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
+    blockResolver.closeForTest()
+    // Stop the external shuffle service.
+    externalShuffleService.stop()
+  }
+
+  // getBlockData should succeed once the external shuffle service is restarted with the DB.
+  test("restart External Shuffle Service With InitRegisteredExecutorsDB") {
+    sparkConf.set("spark.shuffle.service.db.enabled", "true")
+    externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+    // Restart the external shuffle service.
+    externalShuffleService.start()
+    blockHandler = externalShuffleService.getBlockHandler
+    blockResolver = blockHandler.getBlockResolver
+
+    val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+    val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
+    block0Stream.close()
+    assert(sortBlock0 == block0)
+    // Fetch succeeded: the registration survived the restart.
+    blockResolver.closeForTest()
+    // Stop the external shuffle service.
+    externalShuffleService.stop()
+
+  }
+
+  // getBlockData should fail once the external shuffle service is restarted without the DB.
+  test("restart External Shuffle Service Without InitRegisteredExecutorsDB") {
+    sparkConf.set("spark.shuffle.service.db.enabled", "false")
+    externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
+    // Restart the external shuffle service.
+    externalShuffleService.start()
+    blockHandler = externalShuffleService.getBlockHandler
+    blockResolver = blockHandler.getBlockResolver
+
+    val error = intercept[RuntimeException] {
+      val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
+      val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
+      block0Stream.close()
+      assert(sortBlock0 == block0)
+    }.getMessage
+
+    assert(error.contains("not registered"))
+    blockResolver.closeForTest()
+    // Stop the external shuffle service.
+    externalShuffleService.stop()
+  }
+
+  /**
+   * Manages some sort-shuffle data, including the creation and cleanup of
+   * directories that can be read by the ExternalShuffleBlockResolver.
+   *
+   * Copied from org.apache.spark.network.shuffle.TestShuffleDataContext.
+   */
+  class TestShuffleDataContext(val numLocalDirs: Int, val subDirsPerLocalDir: Int) {
+    val localDirs: Array[String] = new Array[String](numLocalDirs)
+
+    def create(): Unit = {
+      for (i <- 0 until numLocalDirs) {
+        localDirs(i) = Files.createTempDir().getAbsolutePath()
+        for (p <- 0 until subDirsPerLocalDir) {
+          new File(localDirs(i), "%02x".format(p)).mkdirs()
+        }
+      }
+    }
+
+    def cleanup(): Unit = {
+      for (i <- 0 until numLocalDirs) {
+        try {
+          JavaUtils.deleteRecursively(new File(localDirs(i)))
+        } catch {
+          case e: IOException =>
+            logError("Unable to cleanup localDir = " + localDirs(i), e)
+        }
+      }
+    }
+
+    // Creates reducer blocks in a sort-based data format within our local dirs.
+    def insertSortShuffleData(shuffleId: Int, mapId: Int, blocks: Array[Array[Byte]]): Unit = {
+      val blockId = "shuffle_" + shuffleId + "_" + mapId + "_0"
+      var dataStream: FileOutputStream = null
+      var indexStream: DataOutputStream = null
+      var suppressExceptionsDuringClose = true
+      try {
+        dataStream = new FileOutputStream(ExternalShuffleBlockResolver.getFileForTest(localDirs,
+          subDirsPerLocalDir, blockId + ".data"))
+        indexStream = new DataOutputStream(new FileOutputStream(ExternalShuffleBlockResolver
+          .getFileForTest(localDirs, subDirsPerLocalDir, blockId + ".index")))
+        var offset = 0
+        indexStream.writeLong(offset)
+        for (block <- blocks) {
+          offset += block.length
+          dataStream.write(block)
+          indexStream.writeLong(offset)
+        }
+        suppressExceptionsDuringClose = false
+      } finally {
+        Closeables.close(dataStream, suppressExceptionsDuringClose)
+        Closeables.close(indexStream, suppressExceptionsDuringClose)
+      }
+    }
+
+    // Creates an ExecutorShuffleInfo object based on the given shuffle manager
+    // which targets this context's directories.
+    def createExecutorInfo(shuffleManager: String): ExecutorShuffleInfo = {
+      new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager)
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org