Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/22 22:11:43 UTC

spark git commit: [SPARK-9735][SQL] Respect the user-specified schema over the inferred partition schema for HadoopFsRelation

Repository: spark
Updated Branches:
  refs/heads/master 3535b91dd -> d4950e6be


[SPARK-9735][SQL] Respect the user-specified schema over the inferred partition schema for HadoopFsRelation

This change enables the unit test `hadoopFsRelationSuite.Partition column type casting`, which previously threw the exception below because the automatically inferred partition schema was given higher priority than the user-specified one. A minimal reproduction sketch follows the stack trace.

```
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
	at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
	at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
```
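
As a rough reproduction of the scenario (mirroring the updated test rather than copying it), the sketch below writes a partitioned table whose partition column is explicitly declared as a string even though its values look numeric. It assumes a 1.5-era `SQLContext` with Hive support and the Parquet data source; the DataFrame and table names are illustrative only.

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StringType
import sqlContext.implicits._

// Partition values look numeric (p1 = 1, 2, 3), but the user declares the
// partition column `ps` as a string.
val df = sqlContext.range(1, 4).selectExpr("id AS a", "CAST(id AS STRING) AS b", "id AS p1")
val input = df.select('a, 'b, 'p1.cast(StringType).as('ps))

input.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .partitionBy("ps")
  .saveAsTable("t")

// Before this patch, refresh()/discoverPartitions() re-inferred `ps` as an
// integer from directories like ps=1, overriding the declared StringType and
// triggering the ClassCastException above. With this patch the user-specified
// partition schema wins.
sqlContext.table("t").collect()
```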

Author: Cheng Hao <ha...@intel.com>

Closes #8026 from chenghao-intel/partition_discovery.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4950e6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4950e6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4950e6b

Branch: refs/heads/master
Commit: d4950e6be48954125eeb1be550c102636521bde3
Parents: 3535b91
Author: Cheng Hao <ha...@intel.com>
Authored: Thu Oct 22 13:11:37 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Oct 22 13:11:37 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   | 29 ++++++++++++--
 .../sql/sources/hadoopFsRelationSuites.scala    | 42 ++++++++++++++------
 2 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4950e6b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 7b030b7..84eef0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql._
 import org.apache.spark.util.SerializableConfiguration
 
@@ -544,11 +544,32 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   private def discoverPartitions(): PartitionSpec = {
-    val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
     // We use leaf dirs containing data files to discover the schema.
     val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
-    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
-      typeInference)
+    userDefinedPartitionColumns match {
+      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+        val spec = PartitioningUtils.parsePartitions(
+          leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false)
+
+        // Without auto inference, every value in the `row` is either null or a string;
+        // we need to cast it into the data type that the user specified.
+        def castPartitionValuesToUserSchema(row: InternalRow) = {
+          InternalRow((0 until row.numFields).map { i =>
+            Cast(
+              Literal.create(row.getString(i), StringType),
+              userProvidedSchema.fields(i).dataType).eval()
+          }: _*)
+        }
+
+        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+          part.copy(values = castPartitionValuesToUserSchema(part.values))
+        })
+
+      case _ =>
+        // The user did not provide a partitioning schema
+        PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled())
+    }
   }
 
   /**

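For reference, the casting performed by `castPartitionValuesToUserSchema` in the hunk above can be exercised directly against Catalyst's expression API. This is only an illustrative sketch of the mechanism (Catalyst is an internal API, and the values shown are made up):

```
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{IntegerType, StringType}

// With type inference disabled, parsePartitions leaves each partition value as
// a string (or null); Cast converts it to the type the user declared.
val raw = Literal.create("123", StringType)
Cast(raw, IntegerType).eval()   // returns 123 as a boxed java.lang.Integer
```
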
http://git-wip-us.apache.org/repos/asf/spark/blob/d4950e6b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 42b9b3d..e3605bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -510,21 +510,39 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  // HadoopFsRelation.discoverPartitions() called by refresh(), which will ignore
-  // the given partition data type.
-  ignore("Partition column type casting") {
+  test("SPARK-9735 Partition column type casting") {
     withTempPath { file =>
-      val input = partitionedTestDF.select('a, 'b, 'p1.cast(StringType).as('ps), 'p2)
-
-      input
-        .write
-        .format(dataSourceName)
-        .mode(SaveMode.Overwrite)
-        .partitionBy("ps", "p2")
-        .saveAsTable("t")
+      val df = (for {
+        i <- 1 to 3
+        p2 <- Seq("foo", "bar")
+      } yield (i, s"val_$i", 1.0d, p2, 123, 123.123f)).toDF("a", "b", "p1", "p2", "p3", "f")
+
+      val input = df.select(
+        'a,
+        'b,
+        'p1.cast(StringType).as('ps1),
+        'p2,
+        'p3.cast(FloatType).as('pf1),
+        'f)
 
       withTempTable("t") {
-        checkAnswer(sqlContext.table("t"), input.collect())
+        input
+          .write
+          .format(dataSourceName)
+          .mode(SaveMode.Overwrite)
+          .partitionBy("ps1", "p2", "pf1", "f")
+          .saveAsTable("t")
+
+        input
+          .write
+          .format(dataSourceName)
+          .mode(SaveMode.Append)
+          .partitionBy("ps1", "p2", "pf1", "f")
+          .saveAsTable("t")
+
+        val realData = input.collect()
+
+        checkAnswer(sqlContext.table("t"), realData ++ realData)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org