You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/16 01:15:15 UTC
spark git commit: [SPARK-5277][SQL] - SparkSqlSerializer doesn't
always register user specified KryoRegistrators
Repository: spark
Updated Branches:
refs/heads/master d5f1b9650 -> 8a53de16f
[SPARK-5277][SQL] - SparkSqlSerializer doesn't always register user specified KryoRegistrators
[SPARK-5277][SQL] - SparkSqlSerializer doesn't always register user specified KryoRegistrators
There were a few places where new SparkSqlSerializer instances were created with new, empty SparkConfs resulting in user specified registrators sometimes not getting initialized.
The fix is to try and pull a conf from the SparkEnv, and construct a new conf (that loads defaults) if one cannot be found.
The changes touched:
1) SparkSqlSerializer's resource pool (this appears to fix the issue in the comment)
2) execution.Exchange (for all of the partitioners)
3) execution.Limit (for the HashPartitioner)
A few tests were added to ColumnTypeSuite, ensuring that a custom registrator and serde is initialized and used when in-memory columns are written.
Author: Max Seiden <ma...@platfora.com>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <mi...@databricks.com>
Closes #5237 from mhseiden/sql_udt_kryo and squashes the following commits:
3175c2f [Max Seiden] [SPARK-5277][SQL] - address code review comments
e5011fb [Max Seiden] [SPARK-5277][SQL] - SparkSqlSerializer does not register user specified KryoRegistrators
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a53de16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a53de16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a53de16
Branch: refs/heads/master
Commit: 8a53de16fc8208358b76d0f3d45538e0304bcc8e
Parents: d5f1b96
Author: Max Seiden <ma...@platfora.com>
Authored: Wed Apr 15 16:15:11 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Apr 15 16:15:11 2015 -0700
----------------------------------------------------------------------
.../apache/spark/sql/execution/Exchange.scala | 9 +--
.../sql/execution/SparkSqlSerializer.scala | 7 +--
.../spark/sql/execution/basicOperators.scala | 2 +-
.../spark/sql/columnar/ColumnTypeSuite.scala | 62 +++++++++++++++++++-
4 files changed, 68 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8a53de16/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 518fc9e..69a620e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -78,6 +78,8 @@ case class Exchange(
}
override def execute(): RDD[Row] = attachTree(this , "execute") {
+ lazy val sparkConf = child.sqlContext.sparkContext.getConf
+
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
@@ -109,7 +111,7 @@ case class Exchange(
} else {
new ShuffledRDD[Row, Row, Row](rdd, part)
}
- shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+ shuffled.setSerializer(new SparkSqlSerializer(sparkConf))
shuffled.map(_._2)
case RangePartitioning(sortingExpressions, numPartitions) =>
@@ -132,8 +134,7 @@ case class Exchange(
} else {
new ShuffledRDD[Row, Null, Null](rdd, part)
}
- shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
-
+ shuffled.setSerializer(new SparkSqlSerializer(sparkConf))
shuffled.map(_._1)
case SinglePartition =>
@@ -151,7 +152,7 @@ case class Exchange(
}
val partitioner = new HashPartitioner(1)
val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
- shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+ shuffled.setSerializer(new SparkSqlSerializer(sparkConf))
shuffled.map(_._2)
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
http://git-wip-us.apache.org/repos/asf/spark/blob/8a53de16/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 914f387..eea15af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -65,12 +65,9 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
private[execution] class KryoResourcePool(size: Int)
extends ResourcePool[SerializerInstance](size) {
- val ser: KryoSerializer = {
+ val ser: SparkSqlSerializer = {
val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
- // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
- // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
- // related error.
- new KryoSerializer(sparkConf)
+ new SparkSqlSerializer(sparkConf)
}
def newInstance(): SerializerInstance = ser.newInstance()
http://git-wip-us.apache.org/repos/asf/spark/blob/8a53de16/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 308dae2..d286fe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -121,7 +121,7 @@ case class Limit(limit: Int, child: SparkPlan)
}
val part = new HashPartitioner(1)
val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)
- shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+ shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf))
shuffled.mapPartitions(_.take(limit).map(_._2))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8a53de16/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index c86ef33..b48bed1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -20,9 +20,12 @@ package org.apache.spark.sql.columnar
import java.nio.ByteBuffer
import java.sql.Timestamp
+import com.esotericsoftware.kryo.{Serializer, Kryo}
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.spark.serializer.KryoRegistrator
import org.scalatest.FunSuite
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.columnar.ColumnarTestUtils._
import org.apache.spark.sql.execution.SparkSqlSerializer
@@ -73,7 +76,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
checkActualSize(BINARY, binary, 4 + 4)
val generic = Map(1 -> "a")
- checkActualSize(GENERIC, SparkSqlSerializer.serialize(generic), 4 + 11)
+ checkActualSize(GENERIC, SparkSqlSerializer.serialize(generic), 4 + 8)
}
testNativeColumnType[BooleanType.type](
@@ -158,6 +161,41 @@ class ColumnTypeSuite extends FunSuite with Logging {
}
}
+ test("CUSTOM") {
+ val conf = new SparkConf()
+ conf.set("spark.kryo.registrator", "org.apache.spark.sql.columnar.Registrator")
+ val serializer = new SparkSqlSerializer(conf).newInstance()
+
+ val buffer = ByteBuffer.allocate(512)
+ val obj = CustomClass(Int.MaxValue,Long.MaxValue)
+ val serializedObj = serializer.serialize(obj).array()
+
+ GENERIC.append(serializer.serialize(obj).array(), buffer)
+ buffer.rewind()
+
+ val length = buffer.getInt
+ assert(length === serializedObj.length)
+ assert(13 == length) // id (1) + int (4) + long (8)
+
+ val genericSerializedObj = SparkSqlSerializer.serialize(obj)
+ assert(length != genericSerializedObj.length)
+ assert(length < genericSerializedObj.length)
+
+ assertResult(obj, "Custom deserialized object didn't equal the original object") {
+ val bytes = new Array[Byte](length)
+ buffer.get(bytes, 0, length)
+ serializer.deserialize(ByteBuffer.wrap(bytes))
+ }
+
+ buffer.rewind()
+ buffer.putInt(serializedObj.length).put(serializedObj)
+
+ assertResult(obj, "Custom deserialized object didn't equal the original object") {
+ buffer.rewind()
+ serializer.deserialize(ByteBuffer.wrap(GENERIC.extract(buffer)))
+ }
+ }
+
def testNativeColumnType[T <: NativeType](
columnType: NativeColumnType[T],
putter: (ByteBuffer, T#JvmType) => Unit,
@@ -229,3 +267,23 @@ class ColumnTypeSuite extends FunSuite with Logging {
}
}
}
+
+private[columnar] final case class CustomClass(a: Int, b: Long)
+
+private[columnar] object CustomerSerializer extends Serializer[CustomClass] {
+ override def write(kryo: Kryo, output: Output, t: CustomClass) {
+ output.writeInt(t.a)
+ output.writeLong(t.b)
+ }
+ override def read(kryo: Kryo, input: Input, aClass: Class[CustomClass]): CustomClass = {
+ val a = input.readInt()
+ val b = input.readLong()
+ CustomClass(a,b)
+ }
+}
+
+private[columnar] final class Registrator extends KryoRegistrator {
+ override def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[CustomClass], CustomerSerializer)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org