You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/18 02:45:18 UTC
spark git commit: [SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event
Repository: spark
Updated Branches:
refs/heads/master ce13c2672 -> d9dd979d1
[SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event
## What changes were proposed in this pull request?
This patch fixes a `ClassCastException: java.lang.Integer cannot be cast to java.lang.Long` error which could occur in the HistoryServer while trying to process a deserialized `SparkListenerDriverAccumUpdates` event.
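The problem stems from how `jackson-module-scala` handles primitive type parameters (see https://github.com/FasterXML/jackson-module-scala/wiki/FAQ#deserializing-optionint-and-other-primitive-challenges for more details). Because of type erasure, Jackson cannot tell that the elements of `accumUpdates: Seq[(Long, Long)]` should be `Long`s. Small numbers are therefore deserialized as boxed `java.lang.Integer` values, producing `(Int, Int)` tuples instead of the expected `(Long, Long)` tuples. The `ClassCastException` is thrown when our code later unboxes those values as `Long`.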
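This patch works around the issue by annotating the `accumUpdates` field with `@JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])`. The custom Jackson `Converter` deserializes each tuple as `(Object, Object)` and then widens each element (`java.lang.Integer` or `java.lang.Long`) to a `Long`.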
## How was this patch tested?
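A new regression test in `SQLListenerSuite`. It round-trips a `SparkListenerDriverAccumUpdates` event through `JsonProtocol`, and it also deserializes JSON whose values (e.g. 4294967294) only fit in a `Long`.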
Author: Josh Rosen <jo...@databricks.com>
Closes #15922 from JoshRosen/SPARK-18462.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9dd979d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9dd979d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9dd979d
Branch: refs/heads/master
Commit: d9dd979d170f44383a9a87f892f2486ddb3cca7d
Parents: ce13c26
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Nov 17 18:45:15 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Nov 17 18:45:15 2016 -0800
----------------------------------------------------------------------
.../spark/sql/execution/ui/SQLListener.scala | 39 ++++++++++++++++-
.../sql/execution/ui/SQLListenerSuite.scala | 44 +++++++++++++++++++-
2 files changed, 80 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d9dd979d/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 60f1343..5daf215 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui
import scala.collection.mutable
+import com.fasterxml.jackson.databind.JavaType
+import com.fasterxml.jackson.databind.`type`.TypeFactory
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.databind.util.Converter
+
import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
@@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
+case class SparkListenerDriverAccumUpdates(
+ executionId: Long,
+ @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])
+ accumUpdates: Seq[(Long, Long)])
extends SparkListenerEvent
+/**
+ * Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple.
+ *
+ * This is necessary due to limitations in how Jackson's scala module deserializes primitives;
+ * see the "Deserializing Option[Int] and other primitive challenges" section in
+ * https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and
+ * SPARK-18462 for the specific problem that motivated this conversion.
+ */
+private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] {
+
+ override def convert(in: (Object, Object)): (Long, Long) = {
+ def toLong(a: Object): Long = a match {
+ case i: java.lang.Integer => i.intValue()
+ case l: java.lang.Long => l.longValue()
+ }
+ (toLong(in._1), toLong(in._2))
+ }
+
+ override def getInputType(typeFactory: TypeFactory): JavaType = {
+ val objectType = typeFactory.uncheckedSimpleType(classOf[Object])
+ typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType))
+ }
+
+ override def getOutputType(typeFactory: TypeFactory): JavaType = {
+ val longType = typeFactory.uncheckedSimpleType(classOf[Long])
+ typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
+ }
+}
+
class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
http://git-wip-us.apache.org/repos/asf/spark/blob/d9dd979d/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 948a155..8aea112 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui
import java.util.Properties
+import org.json4s.jackson.JsonMethods._
import org.mockito.Mockito.mock
import org.apache.spark._
@@ -35,10 +36,10 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator}
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
-class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
+class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
import testImplicits._
import org.apache.spark.AccumulatorSuite.makeInfo
@@ -416,6 +417,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue)
}
+ test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
+ val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
+ val json = JsonProtocol.sparkEventToJson(event)
+ assertValidDataInJson(json,
+ parse("""
+ |{
+ | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+ | "executionId": 1,
+ | "accumUpdates": [[2,3]]
+ |}
+ """.stripMargin))
+ JsonProtocol.sparkEventFromJson(json) match {
+ case SparkListenerDriverAccumUpdates(executionId, accums) =>
+ assert(executionId == 1L)
+ accums.foreach { case (a, b) =>
+ assert(a == 2L)
+ assert(b == 3L)
+ }
+ }
+
+ // Test a case where the numbers in the JSON can only fit in longs:
+ val longJson = parse(
+ """
+ |{
+ | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+ | "executionId": 4294967294,
+ | "accumUpdates": [[4294967294,3]]
+ |}
+ """.stripMargin)
+ JsonProtocol.sparkEventFromJson(longJson) match {
+ case SparkListenerDriverAccumUpdates(executionId, accums) =>
+ assert(executionId == 4294967294L)
+ accums.foreach { case (a, b) =>
+ assert(a == 4294967294L)
+ assert(b == 3L)
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org