You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/03/08 13:18:18 UTC
spark git commit: [SPARK-23592][SQL] Add interpreted execution to DecodeUsingSerializer
Repository: spark
Updated Branches:
refs/heads/master 7013eea11 -> 92e7ecbbb
[SPARK-23592][SQL] Add interpreted execution to DecodeUsingSerializer
## What changes were proposed in this pull request?
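This PR adds interpreted execution to DecodeUsingSerializer by overriding `nullSafeEval`, which wraps the input byte array in a `ByteBuffer` and deserializes it with the expression's serializer instance.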
## How was this patch tested?
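Added a unit test to ObjectExpressionsSuite that evaluates DecodeUsingSerializer with both the Kryo and Java serializers, on a serialized Integer and on a null input.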
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Marco Gaido <ma...@gmail.com>
Closes #20760 from mgaido91/SPARK-23592.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92e7ecbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92e7ecbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92e7ecbb
Branch: refs/heads/master
Commit: 92e7ecbbbd6817378abdbd56541a9c13dcea8659
Parents: 7013eea
Author: Marco Gaido <ma...@gmail.com>
Authored: Thu Mar 8 14:18:14 2018 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Mar 8 14:18:14 2018 +0100
----------------------------------------------------------------------
.../sql/catalyst/expressions/objects/objects.scala | 5 +++++
.../expressions/ObjectExpressionsSuite.scala | 15 +++++++++++++++
2 files changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/92e7ecbb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 7bbc3c7..adf9ddf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1242,6 +1242,11 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean)
extends UnaryExpression with NonSQLExpression with SerializerSupport {
+ override def nullSafeEval(input: Any): Any = {
+ val inputBytes = java.nio.ByteBuffer.wrap(input.asInstanceOf[Array[Byte]])
+ serializerInstance.deserialize(inputBytes)
+ }
+
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val serializer = addImmutableSerializerIfNeeded(ctx)
// Code to deserialize.
http://git-wip-us.apache.org/repos/asf/spark/blob/92e7ecbb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 346b132..ffeec2a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.expressions
+import scala.reflect.ClassTag
+
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.sql.Row
@@ -123,4 +125,17 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(encodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
}
}
+
+ test("SPARK-23592: DecodeUsingSerializer should support interpreted execution") {
+ val cls = classOf[java.lang.Integer]
+ val inputObject = BoundReference(0, ObjectType(classOf[Array[Byte]]), nullable = true)
+ val conf = new SparkConf()
+ Seq(true, false).foreach { useKryo =>
+ val serializer = if (useKryo) new KryoSerializer(conf) else new JavaSerializer(conf)
+ val input = serializer.newInstance().serialize(new Integer(1)).array()
+ val decodeUsingSerializer = DecodeUsingSerializer(inputObject, ClassTag(cls), useKryo)
+ checkEvaluation(decodeUsingSerializer, new Integer(1), InternalRow.fromSeq(Seq(input)))
+ checkEvaluation(decodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org