You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/03/07 19:22:36 UTC
git commit: SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
Repository: spark
Updated Branches:
refs/heads/master 33baf14b0 -> dabeb6f16
SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
This patch allows the FaultToleranceTest to work in newer versions of Docker.
See https://spark-project.atlassian.net/browse/SPARK-1136 for more details.
Besides changing the Docker and FaultToleranceTest internals, this patch also changes the behavior of Master to accept new Workers which share an address with a Worker that we are currently trying to recover. This can only happen when the Worker itself was restarted and got the same IP address/port at the same time as a Master recovery occurs.
Finally, this adds a good bit of ASCII art to the test to make failures, successes, and actions more apparent. This is very much needed.
Author: Aaron Davidson <aa...@databricks.com>
Closes #5 from aarondav/zookeeper and squashes the following commits:
5d7a72a [Aaron Davidson] SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dabeb6f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dabeb6f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dabeb6f1
Branch: refs/heads/master
Commit: dabeb6f160f7ad7df1c54b1b8b069700dd4b74dd
Parents: 33baf14
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Mar 7 10:22:27 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Fri Mar 7 10:22:27 2014 -0800
----------------------------------------------------------------------
.../spark/deploy/FaultToleranceTest.scala | 41 +++++++++++++++++---
.../org/apache/spark/deploy/master/Master.scala | 11 +++++-
.../spark/deploy/master/SparkCuratorUtil.scala | 13 ++++++-
docker/README.md | 4 +-
docker/spark-test/master/default_cmd | 8 +++-
docker/spark-test/worker/default_cmd | 8 +++-
6 files changed, 73 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/dabeb6f1/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index d48c189..f4eb160 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -30,20 +30,24 @@ import scala.sys.process._
import org.json4s._
import org.json4s.jackson.JsonMethods
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.master.RecoveryState
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
* In order to mimic a real distributed cluster more closely, Docker is used.
* Execute using
- * ./spark-class org.apache.spark.deploy.FaultToleranceTest
+ * ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
*
- * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
+ * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
+ * *and* SPARK_JAVA_OPTS:
* - spark.deploy.recoveryMode=ZOOKEEPER
* - spark.deploy.zookeeper.url=172.17.42.1:2181
* Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
*
+ * In case of failure, make sure to kill off prior docker containers before restarting:
+ * docker kill $(docker ps -q)
+ *
* Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
* working installation of Docker. In addition to having Docker, the following are assumed:
* - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
@@ -51,10 +55,16 @@ import org.apache.spark.deploy.master.RecoveryState
* docker/ directory. Run 'docker/spark-test/build' to generate these.
*/
private[spark] object FaultToleranceTest extends App with Logging {
+
+ val conf = new SparkConf()
+ val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+
val masters = ListBuffer[TestMasterInfo]()
val workers = ListBuffer[TestWorkerInfo]()
var sc: SparkContext = _
+ val zk = SparkCuratorUtil.newClient(conf)
+
var numPassed = 0
var numFailed = 0
@@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
sc = null
}
terminateCluster()
+
+ // Clear ZK directories in between tests (for speed purposes)
+ SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
+ SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
}
test("sanity-basic") {
@@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
try {
fn
numPassed += 1
+ logInfo("==============================================")
logInfo("Passed: " + name)
+ logInfo("==============================================")
} catch {
case e: Exception =>
numFailed += 1
+ logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logError("FAILED: " + name, e)
+ logInfo("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
+ sys.exit(1)
}
afterEach()
}
def addMasters(num: Int) {
+ logInfo(s">>>>> ADD MASTERS $num <<<<<")
(1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
}
def addWorkers(num: Int) {
+ logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
+ logInfo(">>>>> CREATE CLIENT <<<<<")
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
@@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
def killLeader(): Unit = {
+ logInfo(">>>>> KILL LEADER <<<<<")
masters.foreach(_.readState())
val leader = getLeader
masters -= leader
@@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
def terminateCluster() {
+ logInfo(">>>>> TERMINATE CLUSTER <<<<<")
masters.foreach(_.kill())
workers.foreach(_.kill())
masters.clear()
@@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
* are all alive in a proper configuration (e.g., only one leader).
*/
def assertValidClusterState() = {
+ logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
assertUsable()
var numAlive = 0
var numStandby = 0
@@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
val workers = json \ "workers"
val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
- liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
+ // Extract the worker IP from "webuiaddress" (rather than "host") because the host name
+ // on containers is a weird hash instead of the actual IP address.
+ liveWorkerIPs = liveWorkers.map {
+ w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
+ }
numLiveApps = (json \ "activeapps").children.size
@@ -403,7 +432,7 @@ private[spark] object Docker extends Logging {
def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
- val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
+ val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
logDebug("Run command: " + cmd)
cmd
}
http://git-wip-us.apache.org/repos/asf/spark/blob/dabeb6f1/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2d6d0c3..b8dfa44 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -531,8 +531,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
- logInfo("Attempted to re-register worker at same address: " + workerAddress)
- return false
+ val oldWorker = addressToWorker(workerAddress)
+ if (oldWorker.state == WorkerState.UNKNOWN) {
+ // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
+ // The old worker must thus be dead, so we will remove it and accept the new worker.
+ removeWorker(oldWorker)
+ } else {
+ logInfo("Attempted to re-register worker at same address: " + workerAddress)
+ return false
+ }
}
workers += worker
http://git-wip-us.apache.org/repos/asf/spark/blob/dabeb6f1/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
index 2d35397..4781a80 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
@@ -17,11 +17,13 @@
package org.apache.spark.deploy.master
-import org.apache.spark.{SparkConf, Logging}
+import scala.collection.JavaConversions._
+
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.KeeperException
+import org.apache.spark.{Logging, SparkConf}
object SparkCuratorUtil extends Logging {
@@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging {
}
}
}
+
+ def deleteRecursive(zk: CuratorFramework, path: String) {
+ if (zk.checkExists().forPath(path) != null) {
+ for (child <- zk.getChildren.forPath(path)) {
+ zk.delete().forPath(path + "/" + child)
+ }
+ zk.delete().forPath(path)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/dabeb6f1/docker/README.md
----------------------------------------------------------------------
diff --git a/docker/README.md b/docker/README.md
index bf59e77..40ba9c3 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -2,4 +2,6 @@ Spark docker files
===========
Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
-as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
\ No newline at end of file
+as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
+
+Tested with Docker version 0.8.1.
http://git-wip-us.apache.org/repos/asf/spark/blob/dabeb6f1/docker/spark-test/master/default_cmd
----------------------------------------------------------------------
diff --git a/docker/spark-test/master/default_cmd b/docker/spark-test/master/default_cmd
index a5b1303..5a7da34 100755
--- a/docker/spark-test/master/default_cmd
+++ b/docker/spark-test/master/default_cmd
@@ -19,4 +19,10 @@
IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
http://git-wip-us.apache.org/repos/asf/spark/blob/dabeb6f1/docker/spark-test/worker/default_cmd
----------------------------------------------------------------------
diff --git a/docker/spark-test/worker/default_cmd b/docker/spark-test/worker/default_cmd
index ab6336f..31b06cb 100755
--- a/docker/spark-test/worker/default_cmd
+++ b/docker/spark-test/worker/default_cmd
@@ -19,4 +19,10 @@
IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
echo "CONTAINER_IP=$IP"
-/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1