Posted to commits@spark.apache.org by ad...@apache.org on 2014/02/25 08:20:52 UTC

git commit: For SPARK-1082, Use Curator for ZK interaction in standalone cluster

Repository: spark
Updated Branches:
  refs/heads/master 1f4c7f7ec -> c852201ce


For SPARK-1082, Use Curator for ZK interaction in standalone cluster

Author: Raymond Liu <ra...@intel.com>

Closes #611 from colorant/curator and squashes the following commits:

7556aa1 [Raymond Liu] Address review comments
af92e1f [Raymond Liu] Fix coding style
964f3c2 [Raymond Liu] Ignore NodeExists exception
6df2966 [Raymond Liu] Rewrite zookeeper client code with curator


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c852201c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c852201c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c852201c

Branch: refs/heads/master
Commit: c852201ce95c7c982ff3794c114427eb33e92922
Parents: 1f4c7f7
Author: Raymond Liu <ra...@intel.com>
Authored: Mon Feb 24 23:20:38 2014 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Feb 24 23:20:38 2014 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +-
 .../deploy/master/LeaderElectionAgent.scala     |   1 +
 .../spark/deploy/master/MasterMessages.scala    |   4 -
 .../spark/deploy/master/SparkCuratorUtil.scala  |  53 +++++
 .../deploy/master/SparkZooKeeperSession.scala   | 205 -------------------
 .../master/ZooKeeperLeaderElectionAgent.scala   |  94 +++------
 .../master/ZooKeeperPersistenceEngine.scala     |  30 +--
 pom.xml                                         |   6 +-
 project/SparkBuild.scala                        |   2 +-
 9 files changed, 99 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
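
In short, the commit swaps roughly 300 lines of hand-rolled ZooKeeper session, watcher and retry management for Apache Curator, which handles connection loss and retries inside the client itself. A minimal sketch of the core idea, assuming curator-recipes 2.4.0 as introduced below (the connect string and timeouts are illustrative, mirroring the constants in the new SparkCuratorUtil):

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry

    // The reconnect/retry handling that SparkZooKeeperSession implemented by
    // hand is expressed once, as a retry policy passed at construction time.
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181",                      // ZK connect string
      60000,                                 // session timeout (ms)
      15000,                                 // connection timeout (ms)
      new ExponentialBackoffRetry(5000, 3))  // base sleep 5s, at most 3 retries
    client.start()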


http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 5576b0c..f209704 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -55,8 +55,8 @@
             <artifactId>avro-ipc</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index f25a1ad..a730fe1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
  * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
  */
 private[spark] trait LeaderElectionAgent extends Actor {
+  // TODO: LeaderElectionAgent no longer needs to be an Actor; this needs refactoring.
   val masterActor: ActorRef
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 74a9f8c..db72d8a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -28,10 +28,6 @@ private[master] object MasterMessages {
 
   case object RevokedLeadership
 
-  // Actor System to LeaderElectionAgent
-
-  case object CheckLeader
-
   // Actor System to Master
 
   case object CheckForWorkerTimeOut

http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
new file mode 100644
index 0000000..2d35397
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.zookeeper.KeeperException
+
+
+object SparkCuratorUtil extends Logging {
+
+  val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
+  val ZK_SESSION_TIMEOUT_MILLIS = 60000
+  val RETRY_WAIT_MILLIS = 5000
+  val MAX_RECONNECT_ATTEMPTS = 3
+
+  def newClient(conf: SparkConf): CuratorFramework = {
+    val ZK_URL = conf.get("spark.deploy.zookeeper.url")
+    val zk = CuratorFrameworkFactory.newClient(ZK_URL,
+      ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
+      new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
+    zk.start()
+    zk
+  }
+
+  def mkdir(zk: CuratorFramework, path: String) {
+    if (zk.checkExists().forPath(path) == null) {
+      try {
+        zk.create().creatingParentsIfNeeded().forPath(path)
+      } catch {
+        case nodeExist: KeeperException.NodeExistsException =>
+          // do nothing: the node already exists, which is fine.
+        case e: Exception => throw e
+      }
+    }
+  }
+}

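SparkCuratorUtil above becomes the single entry point for obtaining a started client. A short usage sketch (the path here is illustrative; the conf must carry spark.deploy.zookeeper.url):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.master.SparkCuratorUtil

    val conf = new SparkConf().set("spark.deploy.zookeeper.url", "localhost:2181")
    val zk = SparkCuratorUtil.newClient(conf)           // returns an already-started client
    SparkCuratorUtil.mkdir(zk, "/spark/master_status")  // idempotent: NodeExists is swallowed
    zk.close()                                          // callers own the client lifecycle
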
http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
deleted file mode 100644
index 5775805..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master
-
-import scala.collection.JavaConversions._
-
-import org.apache.zookeeper._
-import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.apache.zookeeper.data.Stat
-
-import org.apache.spark.{Logging, SparkConf}
-
-/**
- * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
- * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be
- * created. If ZooKeeper remains down after several retries, the given
- * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be
- * informed via zkDown().
- *
- * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
- * times or a semantic exception is thrown (e.g., "node already exists").
- */
-private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
-    conf: SparkConf) extends Logging {
-  val ZK_URL = conf.get("spark.deploy.zookeeper.url", "")
-
-  val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
-  val ZK_TIMEOUT_MILLIS = 30000
-  val RETRY_WAIT_MILLIS = 5000
-  val ZK_CHECK_PERIOD_MILLIS = 10000
-  val MAX_RECONNECT_ATTEMPTS = 3
-
-  private var zk: ZooKeeper = _
-
-  private val watcher = new ZooKeeperWatcher()
-  private var reconnectAttempts = 0
-  private var closed = false
-
-  /** Connect to ZooKeeper to start the session. Must be called before anything else. */
-  def connect() {
-    connectToZooKeeper()
-
-    new Thread() {
-      override def run() = sessionMonitorThread()
-    }.start()
-  }
-
-  def sessionMonitorThread(): Unit = {
-    while (!closed) {
-      Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
-      if (zk.getState != ZooKeeper.States.CONNECTED) {
-        reconnectAttempts += 1
-        val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts
-        if (attemptsLeft <= 0) {
-          logError("Could not connect to ZooKeeper: system failure")
-          zkWatcher.zkDown()
-          close()
-        } else {
-          logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...")
-          connectToZooKeeper()
-        }
-      }
-    }
-  }
-
-  def close() {
-    if (!closed && zk != null) { zk.close() }
-    closed = true
-  }
-
-  private def connectToZooKeeper() {
-    if (zk != null) zk.close()
-    zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher)
-  }
-
-  /**
-   * Attempts to maintain a live ZooKeeper exception despite (very) transient failures.
-   * Mainly useful for handling the natural ZooKeeper session expiration.
-   */
-  private class ZooKeeperWatcher extends Watcher {
-    def process(event: WatchedEvent) {
-      if (closed) { return }
-
-      event.getState match {
-        case KeeperState.SyncConnected =>
-          reconnectAttempts = 0
-          zkWatcher.zkSessionCreated()
-        case KeeperState.Expired =>
-          connectToZooKeeper()
-        case KeeperState.Disconnected =>
-          logWarning("ZooKeeper disconnected, will retry...")
-        case s => // Do nothing
-      }
-    }
-  }
-
-  def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = {
-    retry {
-      zk.create(path, bytes, ZK_ACL, createMode)
-    }
-  }
-
-  def exists(path: String, watcher: Watcher = null): Stat = {
-    retry {
-      zk.exists(path, watcher)
-    }
-  }
-
-  def getChildren(path: String, watcher: Watcher = null): List[String] = {
-    retry {
-      zk.getChildren(path, watcher).toList
-    }
-  }
-
-  def getData(path: String): Array[Byte] = {
-    retry {
-      zk.getData(path, false, null)
-    }
-  }
-
-  def delete(path: String, version: Int = -1): Unit = {
-    retry {
-      zk.delete(path, version)
-    }
-  }
-
-  /**
-   * Creates the given directory (non-recursively) if it doesn't exist.
-   * All znodes are created in PERSISTENT mode with no data.
-   */
-  def mkdir(path: String) {
-    if (exists(path) == null) {
-      try {
-        create(path, "".getBytes, CreateMode.PERSISTENT)
-      } catch {
-        case e: Exception =>
-          // If the exception caused the directory not to be created, bubble it up,
-          // otherwise ignore it.
-          if (exists(path) == null) { throw e }
-      }
-    }
-  }
-
-  /**
-   * Recursively creates all directories up to the given one.
-   * All znodes are created in PERSISTENT mode with no data.
-   */
-  def mkdirRecursive(path: String) {
-    var fullDir = ""
-    for (dentry <- path.split("/").tail) {
-      fullDir += "/" + dentry
-      mkdir(fullDir)
-    }
-  }
-
-  /**
-   * Retries the given function up to 3 times. The assumption is that failure is transient,
-   * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist),
-   * in which case the exception will be thrown without retries.
-   *
-   * @param fn Block to execute, possibly multiple times.
-   */
-  def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
-    try {
-      fn
-    } catch {
-      case e: KeeperException.NoNodeException => throw e
-      case e: KeeperException.NodeExistsException => throw e
-      case e: Exception if n > 0 =>
-        logError("ZooKeeper exception, " + n + " more retries...", e)
-        Thread.sleep(RETRY_WAIT_MILLIS)
-        retry(fn, n-1)
-    }
-  }
-}
-
-trait SparkZooKeeperWatcher {
-  /**
-   * Called whenever a ZK session is created --
-   * this will occur when we create our first session as well as each time
-   * the session expires or errors out.
-   */
-  def zkSessionCreated()
-
-  /**
-   * Called if ZK appears to be completely down (i.e., not just a transient error).
-   * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead.
-   */
-  def zkDown()
-}

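Note what the deleted retry helper did: it re-ran an arbitrary block up to MAX_RECONNECT_ATTEMPTS times with a fixed sleep, except for semantic failures (NoNodeException, NodeExistsException), which it rethrew immediately. Under Curator that concern lives in the RetryPolicy handed to the client, so individual calls need no wrapper. A sketch of the equivalence, assuming a started CuratorFramework named zk:

    // Old style: every call hand-wrapped in retry { zk.getData(path, false, null) }.
    // New style: Curator retries retryable KeeperExceptions internally, governed
    // by the ExponentialBackoffRetry policy set at construction.
    val data: Array[Byte] = zk.getData().forPath("/spark/some_node")
    // Semantic errors such as KeeperException.NoNodeException still reach the
    // caller immediately, matching the old helper's behavior.
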
http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 47b8f67..285f9b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -18,105 +18,67 @@
 package org.apache.spark.deploy.master
 
 import akka.actor.ActorRef
-import org.apache.zookeeper._
-import org.apache.zookeeper.Watcher.Event.EventType
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
 
 private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
     masterUrl: String, conf: SparkConf)
-  extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging  {
+  extends LeaderElectionAgent with LeaderLatchListener with Logging  {
 
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
 
-  private val watcher = new ZooKeeperWatcher()
-  private val zk = new SparkZooKeeperSession(this, conf)
+  private var zk: CuratorFramework = _
+  private var leaderLatch: LeaderLatch = _
   private var status = LeadershipStatus.NOT_LEADER
-  private var myLeaderFile: String = _
-  private var leaderUrl: String = _
 
   override def preStart() {
+
     logInfo("Starting ZooKeeper LeaderElection agent")
-    zk.connect()
-  }
+    zk = SparkCuratorUtil.newClient(conf)
+    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
+    leaderLatch.addListener(this)
 
-  override def zkSessionCreated() {
-    synchronized {
-      zk.mkdirRecursive(WORKING_DIR)
-      myLeaderFile =
-        zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
-      self ! CheckLeader
-    }
+    leaderLatch.start()
   }
 
   override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
-    logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason)
-    Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+    logError("LeaderElectionAgent failed...", reason)
     super.preRestart(reason, message)
   }
 
-  override def zkDown() {
-    logError("ZooKeeper down! LeaderElectionAgent shutting down Master.")
-    System.exit(1)
-  }
-
   override def postStop() {
+    leaderLatch.close()
     zk.close()
   }
 
   override def receive = {
-    case CheckLeader => checkLeader()
+    case _ =>
   }
 
-  private class ZooKeeperWatcher extends Watcher {
-    def process(event: WatchedEvent) {
-      if (event.getType == EventType.NodeDeleted) {
-        logInfo("Leader file disappeared, a master is down!")
-        self ! CheckLeader
+  override def isLeader() {
+    synchronized {
+      // could have lost leadership by now.
+      if (!leaderLatch.hasLeadership) {
+        return
       }
-    }
-  }
 
-  /** Uses ZK leader election. Navigates several ZK potholes along the way. */
-  def checkLeader() {
-    val masters = zk.getChildren(WORKING_DIR).toList
-    val leader = masters.sorted.head
-    val leaderFile = WORKING_DIR + "/" + leader
-
-    // Setup a watch for the current leader.
-    zk.exists(leaderFile, watcher)
-
-    try {
-      leaderUrl = new String(zk.getData(leaderFile))
-    } catch {
-      // A NoNodeException may be thrown if old leader died since the start of this method call.
-      // This is fine -- just check again, since we're guaranteed to see the new values.
-      case e: KeeperException.NoNodeException =>
-        logInfo("Leader disappeared while reading it -- finding next leader")
-        checkLeader()
-        return
+      logInfo("We have gained leadership")
+      updateLeadershipStatus(true)
     }
+  }
 
-    // Synchronization used to ensure no interleaving between the creation of a new session and the
-    // checking of a leader, which could cause us to delete our real leader file erroneously.
+  override def notLeader() {
     synchronized {
-      val isLeader = myLeaderFile == leaderFile
-      if (!isLeader && leaderUrl == masterUrl) {
-        // We found a different master file pointing to this process.
-        // This can happen in the following two cases:
-        // (1) The master process was restarted on the same node.
-        // (2) The ZK server died between creating the file and returning the name of the file.
-        //     For this case, we will end up creating a second file, and MUST explicitly delete the
-        //     first one, since our ZK session is still open.
-        // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
-        // leader changes.
-        assert(leaderFile < myLeaderFile)
-        logWarning("Cleaning up old ZK master election file that points to this master.")
-        zk.delete(leaderFile)
-      } else {
-        updateLeadershipStatus(isLeader)
+      // could have gained leadership by now.
+      if (leaderLatch.hasLeadership) {
+        return
       }
+
+      logInfo("We have lost leadership")
+      updateLeadershipStatus(false)
     }
   }
 

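The rewritten agent delegates the entire election to Curator's LeaderLatch recipe: every master creates a latch on the same znode path, starts it, and is told through callbacks when it gains or loses leadership. A self-contained sketch of that lifecycle (connect string and path are illustrative):

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
    import org.apache.curator.retry.ExponentialBackoffRetry

    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(5000, 3))
    client.start()

    val latch = new LeaderLatch(client, "/spark/leader_election")
    latch.addListener(new LeaderLatchListener {
      // Both callbacks re-check hasLeadership, as the agent above does, because
      // leadership can flip again between the event firing and its delivery.
      def isLeader(): Unit = if (latch.hasLeadership) println("gained leadership")
      def notLeader(): Unit = if (!latch.hasLeadership) println("lost leadership")
    })
    latch.start()  // joins the election; close() resigns and removes the ephemeral node
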
http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 48b2fc0..9390062 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,36 +17,28 @@
 
 package org.apache.spark.deploy.master
 
+import scala.collection.JavaConversions._
+
 import akka.serialization.Serialization
-import org.apache.zookeeper._
+import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
 
 class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   extends PersistenceEngine
-  with SparkZooKeeperWatcher
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+  val zk = SparkCuratorUtil.newClient(conf)
 
-  val zk = new SparkZooKeeperSession(this, conf)
-
-  zk.connect()
-
-  override def zkSessionCreated() {
-    zk.mkdirRecursive(WORKING_DIR)
-  }
-
-  override def zkDown() {
-    logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.")
-  }
+  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
 
   override def addApplication(app: ApplicationInfo) {
     serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
   }
 
   override def removeApplication(app: ApplicationInfo) {
-    zk.delete(WORKING_DIR + "/app_" + app.id)
+    zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
   }
 
   override def addDriver(driver: DriverInfo) {
@@ -54,7 +46,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   }
 
   override def removeDriver(driver: DriverInfo) {
-    zk.delete(WORKING_DIR + "/driver_" + driver.id)
+    zk.delete().forPath(WORKING_DIR + "/driver_" + driver.id)
   }
 
   override def addWorker(worker: WorkerInfo) {
@@ -62,7 +54,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   }
 
   override def removeWorker(worker: WorkerInfo) {
-    zk.delete(WORKING_DIR + "/worker_" + worker.id)
+    zk.delete().forPath(WORKING_DIR + "/worker_" + worker.id)
   }
 
   override def close() {
@@ -70,7 +62,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   }
 
   override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
+    val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted
     val appFiles = sortedFiles.filter(_.startsWith("app_"))
     val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
     val driverFiles = sortedFiles.filter(_.startsWith("driver_"))
@@ -83,11 +75,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   private def serializeIntoFile(path: String, value: AnyRef) {
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
-    zk.create(path, serialized, CreateMode.PERSISTENT)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
   }
 
   def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = {
-    val fileData = zk.getData(WORKING_DIR + "/" + filename)
+    val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
     val clazz = m.runtimeClass.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
     serializer.fromBinary(fileData).asInstanceOf[T]

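The persistence engine now goes through Curator's fluent builders, which map one-for-one onto the old helper methods; getChildren() returns a java.util.List, hence the JavaConversions import added at the top of the file. A sketch of the correspondence, assuming a started client zk (paths illustrative):

    import org.apache.zookeeper.CreateMode

    // old: zk.create(path, bytes, CreateMode.PERSISTENT)
    zk.create().withMode(CreateMode.PERSISTENT).forPath("/spark/app_x", "payload".getBytes)
    // old: zk.getData(path)
    val bytes = zk.getData().forPath("/spark/app_x")
    // old: zk.getChildren(path).toList
    val kids = zk.getChildren().forPath("/spark")
    // old: zk.delete(path)
    zk.delete().forPath("/spark/app_x")
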
http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3a53068..4f1e839 100644
--- a/pom.xml
+++ b/pom.xml
@@ -393,9 +393,9 @@
         <scope>test</scope>
       </dependency>
       <dependency>
-        <groupId>org.apache.zookeeper</groupId>
-        <artifactId>zookeeper</artifactId>
-        <version>3.4.5</version>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>2.4.0</version>
         <exclusions>
           <exclusion>
             <groupId>org.jboss.netty</groupId>

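Both builds make the same swap, and the sbt change follows below. One thing to note: curator-recipes depends on curator-framework and curator-client, which pull in org.apache.zookeeper:zookeeper transitively (3.4.5 for the Curator 2.4.x line, assuming the stock POMs), so ZooKeeper itself stays on the classpath and the netty exclusion simply carries over to the new artifact. If a build ever needed to pin that transitive ZooKeeper explicitly, a hypothetical sbt form (not part of this commit) would be:

    // Hypothetical: exclude Curator's transitive ZooKeeper and pin it directly.
    "org.apache.curator"   % "curator-recipes" % "2.4.0" excludeAll(
      excludeNetty, ExclusionRule(organization = "org.apache.zookeeper")),
    "org.apache.zookeeper" % "zookeeper"       % "3.4.5" excludeAll(excludeNetty)
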
http://git-wip-us.apache.org/repos/asf/spark/blob/c852201c/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f0d2e74..220894a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -277,7 +277,7 @@ object SparkBuild extends Build {
         "org.apache.hadoop"        % hadoopClient       % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib, excludeCommonsLogging, excludeSLF4J),
         "org.apache.avro"          % "avro"             % "1.7.4",
         "org.apache.avro"          % "avro-ipc"         % "1.7.4" excludeAll(excludeNetty),
-        "org.apache.zookeeper"     % "zookeeper"        % "3.4.5" excludeAll(excludeNetty),
+        "org.apache.curator"       % "curator-recipes"  % "2.4.0" excludeAll(excludeNetty),
         "com.codahale.metrics"     % "metrics-core"     % "3.0.0",
         "com.codahale.metrics"     % "metrics-jvm"      % "3.0.0",
         "com.codahale.metrics"     % "metrics-json"     % "3.0.0",