You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/22 23:05:45 UTC

spark git commit: [SPARK-4377] Fixed serialization issue by switching to akka provided serializer.

Repository: spark
Updated Branches:
  refs/heads/master b5d17ef10 -> 9b2a3c612


[SPARK-4377] Fixed serialization issue by switching to akka provided serializer.

Switching to the akka provided serializer is required: there is no way around this for deserializing ActorRef(s).

Author: Prashant Sharma <pr...@imaginea.com>

Closes #3402 from ScrapCodes/SPARK-4377/troubleDeserializing and squashes the following commits:

77233fd [Prashant Sharma] Style fixes
9b35c6e [Prashant Sharma] Scalastyle fixes
29880da [Prashant Sharma] [SPARK-4377] Fixed serialization issue by switching to akka provided serializer - there is no way around this for deserializing actorRef(s).


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b2a3c61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b2a3c61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b2a3c61

Branch: refs/heads/master
Commit: 9b2a3c6126e4fe8485e506f8a56a26cb72509a5f
Parents: b5d17ef
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Sat Nov 22 14:05:38 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sat Nov 22 14:05:38 2014 -0800

----------------------------------------------------------------------
 .../master/FileSystemPersistenceEngine.scala    | 26 +++++++++++---------
 .../org/apache/spark/deploy/master/Master.scala | 12 ++++++---
 .../deploy/master/RecoveryModeFactory.scala     | 17 +++++++------
 .../master/ZooKeeperPersistenceEngine.scala     | 22 ++++++++---------
 4 files changed, 42 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b2a3c61/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 6ff2aa5..36a2e2c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import akka.serialization.Serialization
 
 import org.apache.spark.Logging
-import org.apache.spark.serializer.Serializer
 
-import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -34,10 +35,9 @@ import scala.reflect.ClassTag
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serializer)
+    val serialization: Serialization)
   extends PersistenceEngine with Logging {
 
-  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
   override def persist(name: String, obj: Object): Unit = {
@@ -56,17 +56,17 @@ private[spark] class FileSystemPersistenceEngine(
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-
-    val out = serializer.serializeStream(new FileOutputStream(file))   
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    val out = new FileOutputStream(file)
     try {
-      out.writeObject(value)
+      out.write(serialized)
     } finally {
       out.close()
     }
-
   }
 
-  def deserializeFromFile[T](file: File): T = {
+  private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
     try {
@@ -74,7 +74,9 @@ private[spark] class FileSystemPersistenceEngine(
     } finally {
       dis.close()
     }
-
-    serializer.deserializeStream(dis).readObject()
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
+    serializer.fromBinary(fileData).asInstanceOf[T]
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9b2a3c61/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 021454e..7b32c50 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -30,6 +30,7 @@ import scala.util.Random
 import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import akka.serialization.Serialization
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -132,15 +133,18 @@ private[spark] class Master(
     val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        val zkFactory =
+          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
         (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        val fsFactory =
+          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
         (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
       case "CUSTOM" =>
         val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
-        val factory = clazz.getConstructor(conf.getClass)
-          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
+          .newInstance(conf, SerializationExtension(context.system))
+          .asInstanceOf[StandaloneRecoveryModeFactory]
         (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
         (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))

http://git-wip-us.apache.org/repos/asf/spark/blob/9b2a3c61/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index d9d36c1..1096eb0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.JavaSerializer
 
 /**
  * ::DeveloperApi::
@@ -29,7 +30,7 @@ import org.apache.spark.serializer.JavaSerializer
  *
  */
 @DeveloperApi
-abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
+abstract class StandaloneRecoveryModeFactory(conf: SparkConf, serializer: Serialization) {
 
   /**
    * PersistenceEngine defines how the persistent data(Information about worker, driver etc..)
@@ -48,21 +49,21 @@ abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
  * LeaderAgent in this case is a no-op. Since leader is forever leader as the actual
  * recovery is made by restoring from filesystem.
  */
-private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) with Logging {
+private[spark] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
 
   def createPersistenceEngine() = {
     logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-    new FileSystemPersistenceEngine(RECOVERY_DIR, new JavaSerializer(conf))
+    new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
   }
 
   def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
 }
 
-private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf)
-  extends StandaloneRecoveryModeFactory(conf) {
-  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(new JavaSerializer(conf), conf)
+private[spark] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
+  extends StandaloneRecoveryModeFactory(conf, serializer) {
+  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(conf, serializer)
 
   def createLeaderElectionAgent(master: LeaderElectable) =
     new ZooKeeperLeaderElectionAgent(master, conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/9b2a3c61/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 96c2139..e11ac03 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,27 +17,24 @@
 
 package org.apache.spark.deploy.master
 
+import akka.serialization.Serialization
+
 import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
 
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.serializer.Serializer
-import java.nio.ByteBuffer
 
-import scala.reflect.ClassTag
 
-
-private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, conf: SparkConf)
+private[spark] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
   extends PersistenceEngine
   with Logging
 {
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
 
-  val serializer = serialization.newInstance()
-
   SparkCuratorUtil.mkdir(zk, WORKING_DIR)
 
 
@@ -59,14 +56,17 @@ private[spark] class ZooKeeperPersistenceEngine(val serialization: Serializer, c
   }
 
   private def serializeIntoFile(path: String, value: AnyRef) {
-    val serialized = serializer.serialize(value)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized.array())
+    val serializer = serialization.findSerializerFor(value)
+    val serialized = serializer.toBinary(value)
+    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized)
   }
 
-  def deserializeFromFile[T](filename: String): Option[T] = {
+  def deserializeFromFile[T](filename: String)(implicit m: ClassTag[T]): Option[T] = {
     val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename)
+    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
+    val serializer = serialization.serializerFor(clazz)
     try {
-      Some(serializer.deserialize(ByteBuffer.wrap(fileData)))
+      Some(serializer.fromBinary(fileData).asInstanceOf[T])
     } catch {
       case e: Exception => {
         logWarning("Exception while reading persisted file, deleting", e)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org