You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/12/05 18:50:30 UTC
spark git commit: Revert "[SPARK-18284][SQL] Make ExpressionEncoder.serializer.nullable precise"
Repository: spark
Updated Branches:
refs/heads/branch-2.1 afd2321b6 -> 30c074308
Revert "[SPARK-18284][SQL] Make ExpressionEncoder.serializer.nullable precise"
This reverts commit fce1be6cc81b1fe3991a4df91128f4fcd14ff615 from branch-2.1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30c07430
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30c07430
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30c07430
Branch: refs/heads/branch-2.1
Commit: 30c074308f723f95823b43fbc54e2e4742d52840
Parents: afd2321
Author: Reynold Xin <rx...@databricks.com>
Authored: Mon Dec 5 10:49:22 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Dec 5 10:49:22 2016 -0800
----------------------------------------------------------------------
.../spark/sql/catalyst/JavaTypeInference.scala | 4 +-
.../spark/sql/catalyst/ScalaReflection.scala | 7 +--
.../catalyst/encoders/ExpressionEncoder.scala | 7 ++-
.../expressions/ReferenceToExpressions.scala | 2 +-
.../catalyst/expressions/objects/objects.scala | 24 ++++-----
.../encoders/ExpressionEncoderSuite.scala | 19 +------
.../org/apache/spark/sql/DatasetSuite.scala | 52 +-------------------
.../sql/streaming/FileStreamSinkSuite.scala | 2 +-
8 files changed, 21 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 7e8e4da..04f0cfc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -396,14 +396,12 @@ object JavaTypeInference {
case _ if mapType.isAssignableFrom(typeToken) =>
val (keyType, valueType) = mapKeyValueType(typeToken)
-
ExternalMapToCatalyst(
inputObject,
ObjectType(keyType.getRawType),
serializerFor(_, keyType),
ObjectType(valueType.getRawType),
- serializerFor(_, valueType),
- valueNullable = true
+ serializerFor(_, valueType)
)
case other =>
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 6e20096..0aa21b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -498,8 +498,7 @@ object ScalaReflection extends ScalaReflection {
dataTypeFor(keyType),
serializerFor(_, keyType, keyPath),
dataTypeFor(valueType),
- serializerFor(_, valueType, valuePath),
- valueNullable = !valueType.typeSymbol.asClass.isPrimitive)
+ serializerFor(_, valueType, valuePath))
case t if t <:< localTypeOf[String] =>
StaticInvoke(
@@ -591,9 +590,7 @@ object ScalaReflection extends ScalaReflection {
"cannot be used as field name\n" + walkedTypePath.mkString("\n"))
}
- val fieldValue = Invoke(
- AssertNotNull(inputObject, walkedTypePath), fieldName, dataTypeFor(fieldType),
- returnNullable = !fieldType.typeSymbol.asClass.isPrimitive)
+ val fieldValue = Invoke(inputObject, fieldName, dataTypeFor(fieldType))
val clsName = getClassNameFromType(fieldType)
val newPath = s"""- field (class: "$clsName", name: "$fieldName")""" +: walkedTypePath
expressions.Literal(fieldName) :: serializerFor(fieldValue, fieldType, newPath) :: Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 3757ecc..9c4818d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -60,7 +60,7 @@ object ExpressionEncoder {
val cls = mirror.runtimeClass(tpe)
val flat = !ScalaReflection.definedByConstructorParams(tpe)
- val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = !cls.isPrimitive)
+ val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = true)
val nullSafeInput = if (flat) {
inputObject
} else {
@@ -71,7 +71,10 @@ object ExpressionEncoder {
val serializer = ScalaReflection.serializerFor[T](nullSafeInput)
val deserializer = ScalaReflection.deserializerFor[T]
- val schema = serializer.dataType
+ val schema = ScalaReflection.schemaFor[T] match {
+ case ScalaReflection.Schema(s: StructType, _) => s
+ case ScalaReflection.Schema(dt, nullable) => new StructType().add("value", dt, nullable)
+ }
new ExpressionEncoder[T](
schema,
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
index 2ca77e8..6c75a7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ReferenceToExpressions.scala
@@ -74,7 +74,7 @@ case class ReferenceToExpressions(result: Expression, children: Seq[Expression])
ctx.addMutableState("boolean", classChildVarIsNull, "")
val classChildVar =
- LambdaVariable(classChildVarName, classChildVarIsNull, child.dataType, child.nullable)
+ LambdaVariable(classChildVarName, classChildVarIsNull, child.dataType)
val initCode = s"${classChildVar.value} = ${childGen.value};\n" +
s"${classChildVar.isNull} = ${childGen.isNull};"
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index a8aa1e7..e517ec1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -171,18 +171,15 @@ case class StaticInvoke(
* @param arguments An optional list of expressions, whos evaluation will be passed to the function.
* @param propagateNull When true, and any of the arguments is null, null will be returned instead
* of calling the function.
- * @param returnNullable When false, indicating the invoked method will always return
- * non-null value.
*/
case class Invoke(
targetObject: Expression,
functionName: String,
dataType: DataType,
arguments: Seq[Expression] = Nil,
- propagateNull: Boolean = true,
- returnNullable : Boolean = true) extends InvokeLike {
+ propagateNull: Boolean = true) extends InvokeLike {
- override def nullable: Boolean = targetObject.nullable || needNullCheck || returnNullable
+ override def nullable: Boolean = true
override def children: Seq[Expression] = targetObject +: arguments
override def eval(input: InternalRow): Any =
@@ -408,15 +405,13 @@ case class WrapOption(child: Expression, optType: DataType)
* A place holder for the loop variable used in [[MapObjects]]. This should never be constructed
* manually, but will instead be passed into the provided lambda function.
*/
-case class LambdaVariable(
- value: String,
- isNull: String,
- dataType: DataType,
- nullable: Boolean = true) extends LeafExpression
+case class LambdaVariable(value: String, isNull: String, dataType: DataType) extends LeafExpression
with Unevaluable with NonSQLExpression {
+ override def nullable: Boolean = true
+
override def genCode(ctx: CodegenContext): ExprCode = {
- ExprCode(code = "", value = value, isNull = if (nullable) isNull else "false")
+ ExprCode(code = "", value = value, isNull = isNull)
}
}
@@ -597,8 +592,7 @@ object ExternalMapToCatalyst {
keyType: DataType,
keyConverter: Expression => Expression,
valueType: DataType,
- valueConverter: Expression => Expression,
- valueNullable: Boolean): ExternalMapToCatalyst = {
+ valueConverter: Expression => Expression): ExternalMapToCatalyst = {
val id = curId.getAndIncrement()
val keyName = "ExternalMapToCatalyst_key" + id
val valueName = "ExternalMapToCatalyst_value" + id
@@ -607,11 +601,11 @@ object ExternalMapToCatalyst {
ExternalMapToCatalyst(
keyName,
keyType,
- keyConverter(LambdaVariable(keyName, "false", keyType, false)),
+ keyConverter(LambdaVariable(keyName, "false", keyType)),
valueName,
valueIsNull,
valueType,
- valueConverter(LambdaVariable(valueName, valueIsNull, valueType, valueNullable)),
+ valueConverter(LambdaVariable(valueName, valueIsNull, valueType)),
inputMap
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 080f11b..4d896c2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -24,7 +24,7 @@ import java.util.Arrays
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.sql.{Encoder, Encoders}
+import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.{OptionalData, PrimitiveData}
import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -300,11 +300,6 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
encodeDecodeTest(
ReferenceValueClass(ReferenceValueClass.Container(1)), "reference value class")
- encodeDecodeTest(Option(31), "option of int")
- encodeDecodeTest(Option.empty[Int], "empty option of int")
- encodeDecodeTest(Option("abc"), "option of string")
- encodeDecodeTest(Option.empty[String], "empty option of string")
-
productTest(("UDT", new ExamplePoint(0.1, 0.2)))
test("nullable of encoder schema") {
@@ -343,18 +338,6 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
}
}
- test("nullable of encoder serializer") {
- def checkNullable[T: Encoder](nullable: Boolean): Unit = {
- assert(encoderFor[T].serializer.forall(_.nullable === nullable))
- }
-
- // test for flat encoders
- checkNullable[Int](false)
- checkNullable[Option[Int]](true)
- checkNullable[java.lang.Integer](true)
- checkNullable[String](true)
- }
-
test("null check for map key") {
val encoder = ExpressionEncoder[Map[String, Int]]()
val e = intercept[RuntimeException](encoder.toRow(Map(("a", 1), (null, 2))))
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index d31c766..1174d73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -28,10 +28,7 @@ import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
-
-case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
-case class TestDataPoint2(x: Int, s: String)
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
class DatasetSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -972,53 +969,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
assert(dataset.collect() sameElements Array(resultValue, resultValue))
}
- test("SPARK-18284: Serializer should have correct nullable value") {
- val df1 = Seq(1, 2, 3, 4).toDF
- assert(df1.schema(0).nullable == false)
- val df2 = Seq(Integer.valueOf(1), Integer.valueOf(2)).toDF
- assert(df2.schema(0).nullable == true)
-
- val df3 = Seq(Seq(1, 2), Seq(3, 4)).toDF
- assert(df3.schema(0).nullable == true)
- assert(df3.schema(0).dataType.asInstanceOf[ArrayType].containsNull == false)
- val df4 = Seq(Seq("a", "b"), Seq("c", "d")).toDF
- assert(df4.schema(0).nullable == true)
- assert(df4.schema(0).dataType.asInstanceOf[ArrayType].containsNull == true)
-
- val df5 = Seq((0, 1.0), (2, 2.0)).toDF("id", "v")
- assert(df5.schema(0).nullable == false)
- assert(df5.schema(1).nullable == false)
- val df6 = Seq((0, 1.0, "a"), (2, 2.0, "b")).toDF("id", "v1", "v2")
- assert(df6.schema(0).nullable == false)
- assert(df6.schema(1).nullable == false)
- assert(df6.schema(2).nullable == true)
-
- val df7 = (Tuple1(Array(1, 2, 3)) :: Nil).toDF("a")
- assert(df7.schema(0).nullable == true)
- assert(df7.schema(0).dataType.asInstanceOf[ArrayType].containsNull == false)
-
- val df8 = (Tuple1(Array((null: Integer), (null: Integer))) :: Nil).toDF("a")
- assert(df8.schema(0).nullable == true)
- assert(df8.schema(0).dataType.asInstanceOf[ArrayType].containsNull == true)
-
- val df9 = (Tuple1(Map(2 -> 3)) :: Nil).toDF("m")
- assert(df9.schema(0).nullable == true)
- assert(df9.schema(0).dataType.asInstanceOf[MapType].valueContainsNull == false)
-
- val df10 = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("m")
- assert(df10.schema(0).nullable == true)
- assert(df10.schema(0).dataType.asInstanceOf[MapType].valueContainsNull == true)
-
- val df11 = Seq(TestDataPoint(1, 2.2, "a", null),
- TestDataPoint(3, 4.4, "null", (TestDataPoint2(33, "b")))).toDF
- assert(df11.schema(0).nullable == false)
- assert(df11.schema(1).nullable == false)
- assert(df11.schema(2).nullable == true)
- assert(df11.schema(3).nullable == true)
- assert(df11.schema(3).dataType.asInstanceOf[StructType].fields(0).nullable == false)
- assert(df11.schema(3).dataType.asInstanceOf[StructType].fields(1).nullable == true)
- }
-
Seq(true, false).foreach { eager =>
def testCheckpointing(testName: String)(f: => Unit): Unit = {
test(s"Dataset.checkpoint() - $testName (eager = $eager)") {
http://git-wip-us.apache.org/repos/asf/spark/blob/30c07430/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 54efae3..09613ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -86,7 +86,7 @@ class FileStreamSinkSuite extends StreamTest {
val outputDf = spark.read.parquet(outputDir)
val expectedSchema = new StructType()
- .add(StructField("value", IntegerType, nullable = false))
+ .add(StructField("value", IntegerType))
.add(StructField("id", IntegerType))
assert(outputDf.schema === expectedSchema)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org