You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/03/07 17:32:04 UTC
spark git commit: [SPARK-23591][SQL] Add interpreted execution to EncodeUsingSerializer
Repository: spark
Updated Branches:
refs/heads/master 33c2cb22b -> aff7d81cb
[SPARK-23591][SQL] Add interpreted execution to EncodeUsingSerializer
## What changes were proposed in this pull request?
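The PR adds interpreted execution to EncodeUsingSerializer. It also moves the serializer setup shared by EncodeUsingSerializer and DecodeUsingSerializer into a new SerializerSupport trait (and companion object).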
## How was this patch tested?
Added a unit test to ObjectExpressionsSuite.
Author: Marco Gaido <ma...@gmail.com>
Closes #20751 from mgaido91/SPARK-23591.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aff7d81c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aff7d81c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aff7d81c
Branch: refs/heads/master
Commit: aff7d81cb73133483fc2256ca10e21b4b8101647
Parents: 33c2cb2
Author: Marco Gaido <ma...@gmail.com>
Authored: Wed Mar 7 18:31:59 2018 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Mar 7 18:31:59 2018 +0100
----------------------------------------------------------------------
.../catalyst/expressions/objects/objects.scala | 114 ++++++++++---------
.../expressions/ObjectExpressionsSuite.scala | 16 ++-
2 files changed, 77 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/aff7d81c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 721d589..7bbc3c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -106,6 +106,61 @@ trait InvokeLike extends Expression with NonSQLExpression {
}
/**
+ * Common trait for [[DecodeUsingSerializer]] and [[EncodeUsingSerializer]]
+ */
+trait SerializerSupport {
+ /**
+ * If true, Kryo serialization is used, otherwise the Java one is used
+ */
+ val kryo: Boolean
+
+ /**
+ * The serializer instance to be used for serialization/deserialization in interpreted execution.
+ *
+ * Marked `@transient` because a `SerializerInstance` is not meant to be Java-serialized, while
+ * the expression mixing in this trait may be serialized after this value has been initialized
+ * (e.g. by an interpreted evaluation on the driver). After deserialization the lazy val is
+ * simply re-created on first access.
+ */
+ @transient lazy val serializerInstance: SerializerInstance =
+ SerializerSupport.newSerializer(kryo)
+
+ /**
+ * Adds an immutable state to the generated class containing a reference to the serializer.
+ * The state is registered through `addImmutableStateIfNotExists` under a fixed name per
+ * serializer kind ("kryoSerializer" / "javaSerializer"), so expressions using the same kind
+ * can share a single field in the generated class.
+ * @return a string containing the name of the variable referencing the serializer
+ */
+ def addImmutableSerializerIfNeeded(ctx: CodegenContext): String = {
+ // Named `serializerName` (not `serializerInstance`) to avoid shadowing the member above.
+ val (serializerName, serializerInstanceClass) = {
+ if (kryo) {
+ ("kryoSerializer", classOf[KryoSerializerInstance].getName)
+ } else {
+ ("javaSerializer", classOf[JavaSerializerInstance].getName)
+ }
+ }
+ // Java-side reference to the companion object's factory, i.e.
+ // `...SerializerSupport$.MODULE$.newSerializer` (`$$` is a literal `$` in the interpolator).
+ val newSerializerMethod = s"${classOf[SerializerSupport].getName}$$.MODULE$$.newSerializer"
+ // Code to initialize the serializer
+ ctx.addImmutableStateIfNotExists(serializerInstanceClass, serializerName, v =>
+ s"""
+ |$v = ($serializerInstanceClass) $newSerializerMethod($kryo);
+ """.stripMargin)
+ serializerName
+ }
+}
+
+object SerializerSupport {
+ /**
+ * Creates a new `SerializerInstance`, which is either a `KryoSerializerInstance` (if
+ * `useKryo` is set to `true`) or a `JavaSerializerInstance`.
+ *
+ * The serializer is configured with the `SparkConf` of the current `SparkEnv` when one is
+ * available, and with a freshly created `SparkConf` otherwise. Generated code calls this
+ * method through `SerializerSupport$.MODULE$.newSerializer`.
+ *
+ * @param useKryo whether to build a Kryo-based serializer instead of a Java-based one
+ * @return a new serializer instance
+ */
+ def newSerializer(useKryo: Boolean): SerializerInstance = {
+ // try conf from env, otherwise create a new one
+ val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
+ val s = if (useKryo) {
+ new KryoSerializer(conf)
+ } else {
+ new JavaSerializer(conf)
+ }
+ s.newInstance()
+ }
+}
+
+/**
* Invokes a static function, returning the result. By default, any of the arguments being null
* will result in returning null instead of calling the function.
*
@@ -1154,36 +1209,14 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType)
* @param kryo if true, use Kryo. Otherwise, use Java.
*/
case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
- extends UnaryExpression with NonSQLExpression {
+ extends UnaryExpression with NonSQLExpression with SerializerSupport {
- override def eval(input: InternalRow): Any =
- throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+ // Interpreted-execution path: serializes the child's value with the trait's lazily created
+ // `serializerInstance` and returns the bytes of the resulting buffer's backing array.
+ // NOTE(review): `ByteBuffer.array()` returns the entire backing array, ignoring position/limit;
+ // if a serializer ever returns a buffer narrower than its backing array, trailing bytes would be
+ // included — confirm this matches the code-generated path.
+ override def nullSafeEval(input: Any): Any = {
+ serializerInstance.serialize(input).array()
+ }
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- // Code to initialize the serializer.
- val (serializer, serializerClass, serializerInstanceClass) = {
- if (kryo) {
- ("kryoSerializer",
- classOf[KryoSerializer].getName,
- classOf[KryoSerializerInstance].getName)
- } else {
- ("javaSerializer",
- classOf[JavaSerializer].getName,
- classOf[JavaSerializerInstance].getName)
- }
- }
- // try conf from env, otherwise create a new one
- val env = s"${classOf[SparkEnv].getName}.get()"
- val sparkConf = s"new ${classOf[SparkConf].getName}()"
- ctx.addImmutableStateIfNotExists(serializerInstanceClass, serializer, v =>
- s"""
- |if ($env == null) {
- | $v = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();
- |} else {
- | $v = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance();
- |}
- """.stripMargin)
-
+ val serializer = addImmutableSerializerIfNeeded(ctx)
// Code to serialize.
val input = child.genCode(ctx)
val javaType = CodeGenerator.javaType(dataType)
@@ -1207,33 +1240,10 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
* @param kryo if true, use Kryo. Otherwise, use Java.
*/
case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean)
- extends UnaryExpression with NonSQLExpression {
+ extends UnaryExpression with NonSQLExpression with SerializerSupport {
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- // Code to initialize the serializer.
- val (serializer, serializerClass, serializerInstanceClass) = {
- if (kryo) {
- ("kryoSerializer",
- classOf[KryoSerializer].getName,
- classOf[KryoSerializerInstance].getName)
- } else {
- ("javaSerializer",
- classOf[JavaSerializer].getName,
- classOf[JavaSerializerInstance].getName)
- }
- }
- // try conf from env, otherwise create a new one
- val env = s"${classOf[SparkEnv].getName}.get()"
- val sparkConf = s"new ${classOf[SparkConf].getName}()"
- ctx.addImmutableStateIfNotExists(serializerInstanceClass, serializer, v =>
- s"""
- |if ($env == null) {
- | $v = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();
- |} else {
- | $v = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance();
- |}
- """.stripMargin)
-
+ val serializer = addImmutableSerializerIfNeeded(ctx)
// Code to deserialize.
val input = child.genCode(ctx)
val javaType = CodeGenerator.javaType(dataType)
http://git-wip-us.apache.org/repos/asf/spark/blob/aff7d81c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index cbfbb65..346b132 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -109,4 +110,17 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
InternalRow.fromSeq(Seq(Row(null))),
"The 0th field 'c0' of input row cannot be null.")
}
+
+ test("SPARK-23591: EncodeUsingSerializer should support interpreted execution") {
+ val cls = ObjectType(classOf[java.lang.Integer])
+ val inputObject = BoundReference(0, cls, nullable = true)
+ val conf = new SparkConf()
+ // Exercise both the Kryo and the Java serializer.
+ Seq(true, false).foreach { useKryo =>
+ val serializer = if (useKryo) new KryoSerializer(conf) else new JavaSerializer(conf)
+ // `Integer.valueOf` instead of the deprecated boxing constructor `new Integer(...)`.
+ val expected = serializer.newInstance().serialize(Integer.valueOf(1)).array()
+ val encodeUsingSerializer = EncodeUsingSerializer(inputObject, useKryo)
+ checkEvaluation(encodeUsingSerializer, expected, InternalRow.fromSeq(Seq(1)))
+ // A null input must evaluate to null rather than being serialized.
+ checkEvaluation(encodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org