You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/03 03:14:40 UTC

git commit: [SQL] SPARK-1364 Improve datatype and test coverage for ScalaReflection schema inference.

Repository: spark
Updated Branches:
  refs/heads/master 9c65fa76f -> 47ebea546


[SQL] SPARK-1364 Improve datatype and test coverage for ScalaReflection schema inference.

Author: Michael Armbrust <mi...@databricks.com>

Closes #293 from marmbrus/reflectTypes and squashes the following commits:

f54e8e8 [Michael Armbrust] Improve datatype and test coverage for ScalaReflection schema inference.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47ebea54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47ebea54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47ebea54

Branch: refs/heads/master
Commit: 47ebea5468df2e4f94ef493c5403fcdcda8c5eb2
Parents: 9c65fa7
Author: Michael Armbrust <mi...@databricks.com>
Authored: Wed Apr 2 18:14:31 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Apr 2 18:14:31 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/ScalaReflection.scala    | 10 ++++
 .../sql/ScalaReflectionRelationSuite.scala      | 56 ++++++++++++++++++++
 2 files changed, 66 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47ebea54/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 976dda8..5aaa63b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -43,15 +43,25 @@ object ScalaReflection {
       val params = t.member("<init>": TermName).asMethod.paramss
       StructType(
         params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true)))
+    // Need to decide if we actually need a special type here.
+    case t if t <:< typeOf[Array[Byte]] => BinaryType
+    case t if t <:< typeOf[Array[_]] =>
+      sys.error(s"Only Array[Byte] supported now, use Seq instead of $t")
     case t if t <:< typeOf[Seq[_]] =>
       val TypeRef(_, _, Seq(elementType)) = t
       ArrayType(schemaFor(elementType))
+    case t if t <:< typeOf[Map[_,_]] =>
+      val TypeRef(_, _, Seq(keyType, valueType)) = t
+      MapType(schemaFor(keyType), schemaFor(valueType))
     case t if t <:< typeOf[String] => StringType
     case t if t <:< definitions.IntTpe => IntegerType
     case t if t <:< definitions.LongTpe => LongType
+    case t if t <:< definitions.FloatTpe => FloatType
     case t if t <:< definitions.DoubleTpe => DoubleType
     case t if t <:< definitions.ShortTpe => ShortType
     case t if t <:< definitions.ByteTpe => ByteType
+    case t if t <:< definitions.BooleanTpe => BooleanType
+    case t if t <:< typeOf[BigDecimal] => DecimalType
   }
 
   implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/47ebea54/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
new file mode 100644
index 0000000..70033a0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.test.TestSQLContext._
+
+case class ReflectData(
+    stringField: String,
+    intField: Int,
+    longField: Long,
+    floatField: Float,
+    doubleField: Double,
+    shortField: Short,
+    byteField: Byte,
+    booleanField: Boolean,
+    decimalField: BigDecimal,
+    seqInt: Seq[Int])
+
+case class ReflectBinary(data: Array[Byte])
+
+class ScalaReflectionRelationSuite extends FunSuite {
+  test("query case class RDD") {
+    val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
+                           BigDecimal(1), Seq(1,2,3))
+    val rdd = sparkContext.parallelize(data :: Nil)
+    rdd.registerAsTable("reflectData")
+
+    assert(sql("SELECT * FROM reflectData").collect().head === data.productIterator.toSeq)
+  }
+
+  // Equality is broken for Arrays, so we test that separately.
+  test("query binary data") {
+    val rdd = sparkContext.parallelize(ReflectBinary(Array[Byte](1)) :: Nil)
+    rdd.registerAsTable("reflectBinary")
+
+    val result = sql("SELECT data FROM reflectBinary").collect().head(0).asInstanceOf[Array[Byte]]
+    assert(result.toSeq === Seq[Byte](1))
+  }
+}
\ No newline at end of file