You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/03/07 17:32:04 UTC

spark git commit: [SPARK-23591][SQL] Add interpreted execution to EncodeUsingSerializer

Repository: spark
Updated Branches:
  refs/heads/master 33c2cb22b -> aff7d81cb


[SPARK-23591][SQL] Add interpreted execution to EncodeUsingSerializer

## What changes were proposed in this pull request?

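The PR adds interpreted (non-codegen) execution to `EncodeUsingSerializer`, whose `eval` previously threw `UnsupportedOperationException`. It also moves the serializer setup shared with `DecodeUsingSerializer` into a new `SerializerSupport` trait and companion object.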

## How was this patch tested?

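Added a unit test to `ObjectExpressionsSuite` covering both Kryo and Java serialization, including a null input.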

Author: Marco Gaido <ma...@gmail.com>

Closes #20751 from mgaido91/SPARK-23591.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aff7d81c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aff7d81c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aff7d81c

Branch: refs/heads/master
Commit: aff7d81cb73133483fc2256ca10e21b4b8101647
Parents: 33c2cb2
Author: Marco Gaido <ma...@gmail.com>
Authored: Wed Mar 7 18:31:59 2018 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Mar 7 18:31:59 2018 +0100

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala  | 114 ++++++++++---------
 .../expressions/ObjectExpressionsSuite.scala    |  16 ++-
 2 files changed, 77 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aff7d81c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 721d589..7bbc3c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -106,6 +106,61 @@ trait InvokeLike extends Expression with NonSQLExpression {
 }
 
 /**
+ * Serializer setup shared by [[DecodeUsingSerializer]] and [[EncodeUsingSerializer]].
+ */
+trait SerializerSupport {
+  /**
+   * If true, Kryo serialization is used; otherwise Java serialization is used.
+   */
+  val kryo: Boolean
+
+  /**
+   * Serializer used by the interpreted (non-codegen) path; created lazily on first access.
+   */
+  lazy val serializerInstance: SerializerInstance = SerializerSupport.newSerializer(kryo)
+
+  /**
+   * Adds an immutable state to the generated class holding a reference to the serializer.
+   * @return the variable name, which is fixed per kind: `kryoSerializer` or `javaSerializer`
+   */
+  def addImmutableSerializerIfNeeded(ctx: CodegenContext): String = {
+    val (serializerInstance, serializerInstanceClass) = {  // NOTE: shadows the lazy val
+      if (kryo) {
+        ("kryoSerializer",
+          classOf[KryoSerializerInstance].getName)
+      } else {
+        ("javaSerializer",
+          classOf[JavaSerializerInstance].getName)
+      }
+    }
+    val newSerializerMethod = s"${classOf[SerializerSupport].getName}$$.MODULE$$.newSerializer"
+    // Init the state via the companion's newSerializer (`$$` = literal `$`), cast to the class
+    ctx.addImmutableStateIfNotExists(serializerInstanceClass, serializerInstance, v =>
+      s"""
+         |$v = ($serializerInstanceClass) $newSerializerMethod($kryo);
+       """.stripMargin)
+    serializerInstance
+  }
+}
+
+object SerializerSupport {
+  /**
+   * Creates a new `SerializerInstance`: a `KryoSerializerInstance` if `useKryo` is `true`,
+   * otherwise a `JavaSerializerInstance`. Also called from code generated by the trait.
+   */
+  def newSerializer(useKryo: Boolean): SerializerInstance = {
+    // Use SparkEnv's conf, or a fresh SparkConf when SparkEnv.get is null.
+    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
+    val s = if (useKryo) {
+      new KryoSerializer(conf)
+    } else {
+      new JavaSerializer(conf)
+    }
+    s.newInstance()
+  }
+}
+
+/**
  * Invokes a static function, returning the result.  By default, any of the arguments being null
  * will result in returning null instead of calling the function.
  *
@@ -1154,36 +1209,14 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType)
  * @param kryo if true, use Kryo. Otherwise, use Java.
  */
 case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
-  extends UnaryExpression with NonSQLExpression {
+  extends UnaryExpression with NonSQLExpression with SerializerSupport {
 
-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+  override def nullSafeEval(input: Any): Any = {
+    serializerInstance.serialize(input).array()
+  }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    // Code to initialize the serializer.
-    val (serializer, serializerClass, serializerInstanceClass) = {
-      if (kryo) {
-        ("kryoSerializer",
-          classOf[KryoSerializer].getName,
-          classOf[KryoSerializerInstance].getName)
-      } else {
-        ("javaSerializer",
-          classOf[JavaSerializer].getName,
-          classOf[JavaSerializerInstance].getName)
-      }
-    }
-    // try conf from env, otherwise create a new one
-    val env = s"${classOf[SparkEnv].getName}.get()"
-    val sparkConf = s"new ${classOf[SparkConf].getName}()"
-    ctx.addImmutableStateIfNotExists(serializerInstanceClass, serializer, v =>
-      s"""
-         |if ($env == null) {
-         |  $v = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();
-         |} else {
-         |  $v = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance();
-         |}
-       """.stripMargin)
-
+    val serializer = addImmutableSerializerIfNeeded(ctx)
     // Code to serialize.
     val input = child.genCode(ctx)
     val javaType = CodeGenerator.javaType(dataType)
@@ -1207,33 +1240,10 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
  * @param kryo if true, use Kryo. Otherwise, use Java.
  */
 case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean)
-  extends UnaryExpression with NonSQLExpression {
+  extends UnaryExpression with NonSQLExpression with SerializerSupport {
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    // Code to initialize the serializer.
-    val (serializer, serializerClass, serializerInstanceClass) = {
-      if (kryo) {
-        ("kryoSerializer",
-          classOf[KryoSerializer].getName,
-          classOf[KryoSerializerInstance].getName)
-      } else {
-        ("javaSerializer",
-          classOf[JavaSerializer].getName,
-          classOf[JavaSerializerInstance].getName)
-      }
-    }
-    // try conf from env, otherwise create a new one
-    val env = s"${classOf[SparkEnv].getName}.get()"
-    val sparkConf = s"new ${classOf[SparkConf].getName}()"
-    ctx.addImmutableStateIfNotExists(serializerInstanceClass, serializer, v =>
-      s"""
-         |if ($env == null) {
-         |  $v = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();
-         |} else {
-         |  $v = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance();
-         |}
-       """.stripMargin)
-
+    val serializer = addImmutableSerializerIfNeeded(ctx)
     // Code to deserialize.
     val input = child.genCode(ctx)
     val javaType = CodeGenerator.javaType(dataType)

http://git-wip-us.apache.org/repos/asf/spark/blob/aff7d81c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index cbfbb65..346b132 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -109,4 +110,17 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       InternalRow.fromSeq(Seq(Row(null))),
       "The 0th field 'c0' of input row cannot be null.")
   }
+
+  test("SPARK-23591: EncodeUsingSerializer should support interpreted execution") {
+    val cls = ObjectType(classOf[java.lang.Integer])
+    val inputObject = BoundReference(0, cls, nullable = true)  // nullable: see null case
+    val conf = new SparkConf()
+    Seq(true, false).foreach { useKryo => // cover both Kryo and Java serialization
+      val serializer = if (useKryo) new KryoSerializer(conf) else new JavaSerializer(conf)
+      val expected = serializer.newInstance().serialize(new Integer(1)).array()
+      val encodeUsingSerializer = EncodeUsingSerializer(inputObject, useKryo)
+      checkEvaluation(encodeUsingSerializer, expected, InternalRow.fromSeq(Seq(1)))
+      checkEvaluation(encodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org