Posted to commits@spark.apache.org by rx...@apache.org on 2015/10/15 19:36:59 UTC

spark git commit: [SPARK-11047] Internal accumulators miss the internal flag when replaying events in the history server

Repository: spark
Updated Branches:
  refs/heads/master 523adc24a -> d45a0d3ca


[SPARK-11047] Internal accumulators miss the internal flag when replaying events in the history server

Internal accumulators don't write the internal flag to the event log, so on the history server web UI no accumulator is marked as internal. This causes an incorrect peak execution memory value and an unwanted accumulator table to be displayed on the stage page.
To fix it, I write the "internal" property of AccumulableInfo to the event log.
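
As a minimal illustration of the intended round trip (this uses Spark-internal, private[spark] API, so it only compiles from code placed under the org.apache.spark package; the accumulator name and values below are made up):

    import org.apache.spark.scheduler.AccumulableInfo
    import org.apache.spark.util.JsonProtocol

    // An internal accumulator, e.g. the one backing peak execution memory.
    val info = AccumulableInfo(1L, "peakExecutionMemory", Some("1024"), "1024", internal = true)

    // With this patch the JSON carries "Internal": true ...
    val json = JsonProtocol.accumulableInfoToJson(info)

    // ... so replaying the event log preserves the flag.
    val restored = JsonProtocol.accumulableInfoFromJson(json)
    assert(restored.internal)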

Author: Carson Wang <ca...@intel.com>

Closes #9061 from carsonwang/accumulableBug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d45a0d3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d45a0d3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d45a0d3c

Branch: refs/heads/master
Commit: d45a0d3ca23df86cf0a95508ccc3b4b98f1b611c
Parents: 523adc2
Author: Carson Wang <ca...@intel.com>
Authored: Thu Oct 15 10:36:54 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Oct 15 10:36:54 2015 -0700

----------------------------------------------------------------------
 .../spark/scheduler/AccumulableInfo.scala       |  9 ++
 .../org/apache/spark/util/JsonProtocol.scala    |  6 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 96 ++++++++++++++------
 3 files changed, 79 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d45a0d3c/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index b6bff64..146cfb9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -46,6 +46,15 @@ class AccumulableInfo private[spark] (
 }
 
 object AccumulableInfo {
+  def apply(
+      id: Long,
+      name: String,
+      update: Option[String],
+      value: String,
+      internal: Boolean): AccumulableInfo = {
+    new AccumulableInfo(id, name, update, value, internal)
+  }
+
   def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
     new AccumulableInfo(id, name, update, value, internal = false)
   }
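
A short usage note on the two overloads (hypothetical call sites, for illustration only): omitting the new parameter preserves the old behavior, so existing callers are unaffected.

    // Existing 4-arg callers keep compiling and default to internal = false.
    val userAcc = AccumulableInfo(2L, "myCounter", Some("10"), "10")

    // Event-log code can now pass the flag explicitly.
    val internalAcc = AccumulableInfo(3L, "peakExecutionMemory", None, "2048", internal = true)
    assert(!userAcc.internal && internalAcc.internal)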

http://git-wip-us.apache.org/repos/asf/spark/blob/d45a0d3c/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 40729fa..a06dc6f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -282,7 +282,8 @@ private[spark] object JsonProtocol {
     ("ID" -> accumulableInfo.id) ~
     ("Name" -> accumulableInfo.name) ~
     ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~
-    ("Value" -> accumulableInfo.value)
+    ("Value" -> accumulableInfo.value) ~
+    ("Internal" -> accumulableInfo.internal)
   }
 
   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -696,7 +697,8 @@ private[spark] object JsonProtocol {
     val name = (json \ "Name").extract[String]
     val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
     val value = (json \ "Value").extract[String]
-    AccumulableInfo(id, name, update, value)
+    val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
+    AccumulableInfo(id, name, update, value, internal)
   }
 
   def taskMetricsFromJson(json: JValue): TaskMetrics = {
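
The backward-compatible read relies on json4s' extractOpt: a log written before this change has no "Internal" field, so the lookup yields JNothing, extractOpt returns None, and the flag falls back to false. A standalone sketch of that behavior (plain json4s, outside Spark):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    implicit val formats: Formats = DefaultFormats

    // An accumulator entry as written by Spark 1.5.1 and earlier (no "Internal" field).
    val oldJson = parse("""{"ID": 1, "Name": "Accumulable1", "Update": "delta1", "Value": "val1"}""")

    val internal = (oldJson \ "Internal").extractOpt[Boolean].getOrElse(false)
    assert(!internal)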

http://git-wip-us.apache.org/repos/asf/spark/blob/d45a0d3c/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a24bf29..f957292 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -364,6 +364,15 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
   }
 
+  test("AccumulableInfo backward compatibility") {
+    // "Internal" property of AccumulableInfo were added after 1.5.1.
+    val accumulableInfo = makeAccumulableInfo(1)
+    val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
+      .removeField({ _._1 == "Internal" })
+    val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
+    assert(false === oldInfo.internal)
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -723,15 +732,15 @@ class JsonProtocolSuite extends SparkFunSuite {
     val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
       speculative)
     val (acc1, acc2, acc3) =
-      (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3))
+      (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))
     taskInfo.accumulables += acc1
     taskInfo.accumulables += acc2
     taskInfo.accumulables += acc3
     taskInfo
   }
 
-  private def makeAccumulableInfo(id: Int): AccumulableInfo =
-    AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id)
+  private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo =
+    AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal)
 
   /**
    * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
@@ -812,13 +821,15 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
-      |        "Value": "val2"
+      |        "Value": "val2",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
-      |        "Value": "val1"
+      |        "Value": "val1",
+      |        "Internal": false
       |      }
       |    ]
       |  },
@@ -866,13 +877,15 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
-      |        "Value": "val2"
+      |        "Value": "val2",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
-      |        "Value": "val1"
+      |        "Value": "val1",
+      |        "Internal": false
       |      }
       |    ]
       |  }
@@ -902,19 +915,22 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
-      |        "Value": "val1"
+      |        "Value": "val1",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
-      |        "Value": "val2"
+      |        "Value": "val2",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
-      |        "Value": "val3"
+      |        "Value": "val3",
+      |        "Internal": true
       |      }
       |    ]
       |  }
@@ -942,19 +958,22 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
-      |        "Value": "val1"
+      |        "Value": "val1",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
-      |        "Value": "val2"
+      |        "Value": "val2",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
-      |        "Value": "val3"
+      |        "Value": "val3",
+      |        "Internal": true
       |      }
       |    ]
       |  }
@@ -988,19 +1007,22 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
-      |        "Value": "val1"
+      |        "Value": "val1",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
-      |        "Value": "val2"
+      |        "Value": "val2",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
-      |        "Value": "val3"
+      |        "Value": "val3",
+      |        "Internal": true
       |      }
       |    ]
       |  },
@@ -1074,19 +1096,22 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
-      |        "Value": "val1"
+      |        "Value": "val1",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
-      |        "Value": "val2"
+      |        "Value": "val2",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
-      |        "Value": "val3"
+      |        "Value": "val3",
+      |        "Internal": true
       |      }
       |    ]
       |  },
@@ -1157,19 +1182,22 @@ class JsonProtocolSuite extends SparkFunSuite {
       |        "ID": 1,
       |        "Name": "Accumulable1",
       |        "Update": "delta1",
-      |        "Value": "val1"
+      |        "Value": "val1",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 2,
       |        "Name": "Accumulable2",
       |        "Update": "delta2",
-      |        "Value": "val2"
+      |        "Value": "val2",
+      |        "Internal": false
       |      },
       |      {
       |        "ID": 3,
       |        "Name": "Accumulable3",
       |        "Update": "delta3",
-      |        "Value": "val3"
+      |        "Value": "val3",
+      |        "Internal": true
       |      }
       |    ]
       |  },
@@ -1251,13 +1279,15 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "ID": 2,
       |          "Name": " Accumulable 2",
       |          "Update": "delta2",
-      |          "Value": "val2"
+      |          "Value": "val2",
+      |          "Internal": false
       |        },
       |        {
       |          "ID": 1,
       |          "Name": " Accumulable 1",
       |          "Update": "delta1",
-      |          "Value": "val1"
+      |          "Value": "val1",
+      |          "Internal": false
       |        }
       |      ]
       |    },
@@ -1309,13 +1339,15 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "ID": 2,
       |          "Name": " Accumulable 2",
       |          "Update": "delta2",
-      |          "Value": "val2"
+      |          "Value": "val2",
+      |          "Internal": false
       |        },
       |        {
       |          "ID": 1,
       |          "Name": " Accumulable 1",
       |          "Update": "delta1",
-      |          "Value": "val1"
+      |          "Value": "val1",
+      |          "Internal": false
       |        }
       |      ]
       |    },
@@ -1384,13 +1416,15 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "ID": 2,
       |          "Name": " Accumulable 2",
       |          "Update": "delta2",
-      |          "Value": "val2"
+      |          "Value": "val2",
+      |          "Internal": false
       |        },
       |        {
       |          "ID": 1,
       |          "Name": " Accumulable 1",
       |          "Update": "delta1",
-      |          "Value": "val1"
+      |          "Value": "val1",
+      |          "Internal": false
       |        }
       |      ]
       |    },
@@ -1476,13 +1510,15 @@ class JsonProtocolSuite extends SparkFunSuite {
       |          "ID": 2,
       |          "Name": " Accumulable 2",
       |          "Update": "delta2",
-      |          "Value": "val2"
+      |          "Value": "val2",
+      |          "Internal": false
       |        },
       |        {
       |          "ID": 1,
       |          "Name": " Accumulable 1",
       |          "Update": "delta1",
-      |          "Value": "val1"
+      |          "Value": "val1",
+      |          "Internal": false
       |        }
       |      ]
       |    }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org