You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/02/09 22:57:33 UTC

git commit: Merge pull request #551 from qqsun8819/json-protocol.

Updated Branches:
  refs/heads/master 94ccf869a -> afc8f3cb9


Merge pull request #551 from qqsun8819/json-protocol.

[SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself

This is a PR for SPARK-1038. Two major changes:
1. Add some fields to JsonProtocol that are new and important to standalone-related data structures.
2. Use Diff in liftweb.json to verify the stringified JSON output, so that a change such as someone modifying a field's type from T to Option[T] is detected.

Author: qqsun8819 <ji...@alibaba-inc.com>

Closes #551 and squashes the following commits:

fdf0b4e [qqsun8819] [SPARK-1038] 1. Change code style to be more readable, according to rxin's review 2. Change the hard-coded submitdate string to a Date object's toString for better compatibility
095a26f [qqsun8819] [SPARK-1038] Modify according to the review of pwendel: use hard-coded JSON strings for JSON data validation. Each test uses its own JSON string
0524e41 [qqsun8819] Merge remote-tracking branch 'upstream/master' into json-protocol
d203d5c [qqsun8819] [SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/afc8f3cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/afc8f3cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/afc8f3cb

Branch: refs/heads/master
Commit: afc8f3cb9a7afe3249500a7d135b4a54bb3e58c4
Parents: 94ccf86
Author: qqsun8819 <ji...@alibaba-inc.com>
Authored: Sun Feb 9 13:57:29 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Feb 9 13:57:29 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/JsonProtocol.scala  | 20 +++-
 .../apache/spark/deploy/JsonProtocolSuite.scala | 97 +++++++++++++++++++-
 2 files changed, 109 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/afc8f3cb/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index e607b8c..33e6937 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import net.liftweb.json.JsonDSL._
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo, DriverInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner
 
 
@@ -32,9 +32,12 @@ private[spark] object JsonProtocol {
    ("webuiaddress" -> obj.webUiAddress) ~
    ("cores" -> obj.cores) ~
    ("coresused" -> obj.coresUsed) ~
+   ("coresfree" -> obj.coresFree) ~
    ("memory" -> obj.memory) ~
    ("memoryused" -> obj.memoryUsed) ~
-   ("state" -> obj.state.toString)
+   ("memoryfree" -> obj.memoryFree) ~
+   ("state" -> obj.state.toString) ~
+   ("lastheartbeat" -> obj.lastHeartbeat)
  }
 
   def writeApplicationInfo(obj: ApplicationInfo) = {
@@ -54,7 +57,9 @@ private[spark] object JsonProtocol {
     ("name" -> obj.name) ~
     ("cores" -> obj.maxCores) ~
     ("memoryperslave" -> obj.memoryPerSlave) ~
-    ("user" -> obj.user)
+    ("user" -> obj.user) ~
+    ("sparkhome" -> obj.sparkHome) ~
+    ("command" -> obj.command.toString)
   }
 
   def writeExecutorRunner(obj: ExecutorRunner) = {
@@ -64,6 +69,14 @@ private[spark] object JsonProtocol {
     ("appdesc" -> writeApplicationDescription(obj.appDesc))
   }
 
+  def writeDriverInfo(obj: DriverInfo) = {
+    ("id" -> obj.id) ~
+    ("starttime" -> obj.startTime.toString) ~
+    ("state" -> obj.state.toString) ~
+    ("cores" -> obj.desc.cores) ~
+    ("memory" -> obj.desc.mem)
+  }
+
   def writeMasterState(obj: MasterStateResponse) = {
     ("url" -> obj.uri) ~
     ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
@@ -73,6 +86,7 @@ private[spark] object JsonProtocol {
     ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
     ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
     ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
+    ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
     ("status" -> obj.status.toString)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/afc8f3cb/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 693b1ab..6445db0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -20,8 +20,9 @@ package org.apache.spark.deploy
 import java.io.File
 import java.util.Date
 
+import net.liftweb.json.Diff
 import net.liftweb.json.{JsonAST, JsonParser}
-import net.liftweb.json.JsonAST.JValue
+import net.liftweb.json.JsonAST.{JNothing, JValue}
 import org.scalatest.FunSuite
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
@@ -29,24 +30,35 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryStat
 import org.apache.spark.deploy.worker.{ExecutorRunner, DriverRunner}
 
 class JsonProtocolSuite extends FunSuite {
+
   test("writeApplicationInfo") {
     val output = JsonProtocol.writeApplicationInfo(createAppInfo())
     assertValidJson(output)
+    assertValidDataInJson(output, JsonParser.parse(JsonConstants.appInfoJsonStr))
   }
 
   test("writeWorkerInfo") {
     val output = JsonProtocol.writeWorkerInfo(createWorkerInfo())
     assertValidJson(output)
+    assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerInfoJsonStr))
   }
 
   test("writeApplicationDescription") {
     val output = JsonProtocol.writeApplicationDescription(createAppDesc())
     assertValidJson(output)
+    assertValidDataInJson(output, JsonParser.parse(JsonConstants.appDescJsonStr))
   }
 
   test("writeExecutorRunner") {
     val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
     assertValidJson(output)
+    assertValidDataInJson(output, JsonParser.parse(JsonConstants.executorRunnerJsonStr))
+  }
+
+  test("writeDriverInfo") {
+    val output = JsonProtocol.writeDriverInfo(createDriverInfo())
+    assertValidJson(output)
+    assertValidDataInJson(output, JsonParser.parse(JsonConstants.driverInfoJsonStr))
   }
 
   test("writeMasterState") {
@@ -59,6 +71,7 @@ class JsonProtocolSuite extends FunSuite {
       activeDrivers, completedDrivers, RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)
+    assertValidDataInJson(output, JsonParser.parse(JsonConstants.masterStateJsonStr))
   }
 
   test("writeWorkerState") {
@@ -70,6 +83,7 @@ class JsonProtocolSuite extends FunSuite {
       finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
     val output = JsonProtocol.writeWorkerState(stateResponse)
     assertValidJson(output)
+    assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerStateJsonStr))
   }
 
   def createAppDesc(): ApplicationDescription = {
@@ -78,8 +92,10 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   def createAppInfo() : ApplicationInfo = {
-    new ApplicationInfo(
-      3, "id", createAppDesc(), new Date(123456789), null, "appUriStr", Int.MaxValue)
+    val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
+      "id", createAppDesc(), JsonConstants.submitDate, null, "appUriStr", Int.MaxValue)
+    appInfo.endTime = JsonConstants.currTimeInMillis
+    appInfo
   }
 
   def createDriverCommand() = new Command(
@@ -90,10 +106,13 @@ class JsonProtocolSuite extends FunSuite {
   def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
     false, createDriverCommand())
 
-  def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3", createDriverDesc(), new Date())
+  def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
+    createDriverDesc(), new Date())
 
   def createWorkerInfo(): WorkerInfo = {
-    new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+    workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
+    workerInfo
   }
   def createExecutorRunner(): ExecutorRunner = {
     new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
@@ -111,4 +130,72 @@ class JsonProtocolSuite extends FunSuite {
       case e: JsonParser.ParseException => fail("Invalid Json detected", e)
     }
   }
+
+  def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
+    val Diff(c, a, d) = validateJson diff expectedJson
+    assert(c === JNothing, "Json changed")
+    assert(a === JNothing, "Json added")
+    assert(d === JNothing, "Json deleted")
+  }
+}
+
+object JsonConstants {
+  val currTimeInMillis = System.currentTimeMillis()
+  val appInfoStartTime = 3
+  val submitDate = new Date(123456789)
+  val appInfoJsonStr =
+    """
+      |{"starttime":3,"id":"id","name":"name","appuiurl":"appUriStr",
+      |"cores":4,"user":"%s",
+      |"memoryperslave":1234,"submitdate":"%s",
+      |"state":"WAITING","duration":%d}
+    """.format(System.getProperty("user.name", "<unknown>"),
+        submitDate.toString, (currTimeInMillis - appInfoStartTime)).stripMargin
+
+  val workerInfoJsonStr =
+    """
+      |{"id":"id","host":"host","port":8080,
+      |"webuiaddress":"http://publicAddress:80",
+      |"cores":4,"coresused":0,"coresfree":4,
+      |"memory":1234,"memoryused":0,"memoryfree":1234,
+      |"state":"ALIVE","lastheartbeat":%d}
+    """.format(currTimeInMillis).stripMargin
+
+  val appDescJsonStr =
+    """
+      |{"name":"name","cores":4,"memoryperslave":1234,
+      |"user":"%s","sparkhome":"sparkHome",
+      |"command":"Command(mainClass,List(arg1, arg2),Map())"}
+    """.format(System.getProperty("user.name", "<unknown>")).stripMargin
+
+  val executorRunnerJsonStr =
+    """
+      |{"id":123,"memory":1234,"appid":"appId",
+      |"appdesc":%s}
+    """.format(appDescJsonStr).stripMargin
+
+  val driverInfoJsonStr =
+    """
+      |{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100}
+    """.stripMargin
+
+  val masterStateJsonStr =
+    """
+      |{"url":"spark://host:8080",
+      |"workers":[%s,%s],
+      |"cores":8,"coresused":0,"memory":2468,"memoryused":0,
+      |"activeapps":[%s],"completedapps":[],
+      |"activedrivers":[%s],
+      |"status":"ALIVE"}
+    """.format(workerInfoJsonStr, workerInfoJsonStr,
+        appInfoJsonStr, driverInfoJsonStr).stripMargin
+
+  val workerStateJsonStr =
+    """
+      |{"id":"workerId","masterurl":"masterUrl",
+      |"masterwebuiurl":"masterWebUiUrl",
+      |"cores":4,"coresused":4,"memory":1234,"memoryused":1234,
+      |"executors":[],
+      |"finishedexecutors":[%s,%s]}
+    """.format(executorRunnerJsonStr, executorRunnerJsonStr).stripMargin
 }