Posted to commits@spark.apache.org by hv...@apache.org on 2018/03/08 13:18:18 UTC

spark git commit: [SPARK-23592][SQL] Add interpreted execution to DecodeUsingSerializer

Repository: spark
Updated Branches:
  refs/heads/master 7013eea11 -> 92e7ecbbb


[SPARK-23592][SQL] Add interpreted execution to DecodeUsingSerializer

## What changes were proposed in this pull request?

This PR adds an interpreted execution path (a `nullSafeEval` implementation) to `DecodeUsingSerializer`, which previously supported only code-generated evaluation.
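
For context, the interpreted path amounts to a plain serializer round-trip over a `ByteBuffer`. A minimal, self-contained sketch using Spark's `JavaSerializer` directly, outside the expression machinery (the object name is made up for illustration):

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Hypothetical standalone example, not part of this patch.
object SerializerRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // SerializerSupport hands the expression an equivalent instance at runtime.
    val instance = new JavaSerializer(new SparkConf()).newInstance()

    // Encode: serialize a value and keep the raw bytes
    // (the same shape of output EncodeUsingSerializer produces).
    val bytes: Array[Byte] = instance.serialize(Integer.valueOf(1)).array()

    // Decode: wrap the bytes and deserialize, mirroring the new nullSafeEval below.
    val decoded = instance.deserialize[Integer](ByteBuffer.wrap(bytes))
    assert(decoded.intValue == 1)
  }
}
```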

## How was this patch tested?
Added a unit test covering both the Kryo and Java serializers.
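
As a rough illustration of what the new test exercises (a sketch only; the real test below also checks null input and both serializers via `checkEvaluation`), the expression can now be evaluated directly instead of relying on generated code:

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.catalyst.expressions.objects.DecodeUsingSerializer
import org.apache.spark.sql.types.ObjectType

// Hypothetical snippet for illustration, not part of this patch.
object InterpretedDecodeSketch {
  def main(args: Array[String]): Unit = {
    // Serialize an Integer so we have bytes to feed the expression.
    val bytes = new JavaSerializer(new SparkConf()).newInstance()
      .serialize(Integer.valueOf(1)).array()

    // Bind the expression to column 0, which holds the serialized bytes.
    val decode = DecodeUsingSerializer(
      BoundReference(0, ObjectType(classOf[Array[Byte]]), nullable = true),
      ClassTag(classOf[java.lang.Integer]),
      kryo = false)

    // eval now works in interpreted mode via the new nullSafeEval.
    assert(decode.eval(InternalRow.fromSeq(Seq(bytes))) == Integer.valueOf(1))
  }
}
```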

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Marco Gaido <ma...@gmail.com>

Closes #20760 from mgaido91/SPARK-23592.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92e7ecbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92e7ecbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92e7ecbb

Branch: refs/heads/master
Commit: 92e7ecbbbd6817378abdbd56541a9c13dcea8659
Parents: 7013eea
Author: Marco Gaido <ma...@gmail.com>
Authored: Thu Mar 8 14:18:14 2018 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Mar 8 14:18:14 2018 +0100

----------------------------------------------------------------------
 .../sql/catalyst/expressions/objects/objects.scala   |  5 +++++
 .../expressions/ObjectExpressionsSuite.scala         | 15 +++++++++++++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92e7ecbb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 7bbc3c7..adf9ddf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1242,6 +1242,11 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
 case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean)
   extends UnaryExpression with NonSQLExpression with SerializerSupport {
 
+  override def nullSafeEval(input: Any): Any = {
+    val inputBytes = java.nio.ByteBuffer.wrap(input.asInstanceOf[Array[Byte]])
+    serializerInstance.deserialize(inputBytes)
+  }
+
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val serializer = addImmutableSerializerIfNeeded(ctx)
     // Code to deserialize.

http://git-wip-us.apache.org/repos/asf/spark/blob/92e7ecbb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 346b132..ffeec2a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.sql.Row
@@ -123,4 +125,17 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       checkEvaluation(encodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
     }
   }
+
+  test("SPARK-23592: DecodeUsingSerializer should support interpreted execution") {
+    val cls = classOf[java.lang.Integer]
+    val inputObject = BoundReference(0, ObjectType(classOf[Array[Byte]]), nullable = true)
+    val conf = new SparkConf()
+    Seq(true, false).foreach { useKryo =>
+      val serializer = if (useKryo) new KryoSerializer(conf) else new JavaSerializer(conf)
+      val input = serializer.newInstance().serialize(new Integer(1)).array()
+      val decodeUsingSerializer = DecodeUsingSerializer(inputObject, ClassTag(cls), useKryo)
+      checkEvaluation(decodeUsingSerializer, new Integer(1), InternalRow.fromSeq(Seq(input)))
+      checkEvaluation(decodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
+    }
+  }
 }

