You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/22 22:11:43 UTC
spark git commit: [SPARK-9735][SQL] Respect the user-specified schema rather than the inferred
partition schema for HadoopFsRelation
Repository: spark
Updated Branches:
refs/heads/master 3535b91dd -> d4950e6be
[SPARK-9735][SQL] Respect the user-specified schema rather than the inferred partition schema for HadoopFsRelation
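This change enables the unit test `hadoopFsRelationSuite.Partition column type casting`. Previously that test threw an exception like the one below. The cause was that the automatically inferred partition schema took priority over the user-specified one, so partition values were typed according to the inferred schema rather than the declared one.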
```
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
07:44:01.344 ERROR org.apache.spark.executor.Executor: Exception in task 14.0 in stage 3.0 (TID 206)
java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220)
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
```
Author: Cheng Hao <ha...@intel.com>
Closes #8026 from chenghao-intel/partition_discovery.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4950e6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4950e6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4950e6b
Branch: refs/heads/master
Commit: d4950e6be48954125eeb1be550c102636521bde3
Parents: 3535b91
Author: Cheng Hao <ha...@intel.com>
Authored: Thu Oct 22 13:11:37 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Oct 22 13:11:37 2015 -0700
----------------------------------------------------------------------
.../apache/spark/sql/sources/interfaces.scala | 29 ++++++++++++--
.../sql/sources/hadoopFsRelationSuites.scala | 42 ++++++++++++++------
2 files changed, 55 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d4950e6b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 7b030b7..84eef0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql._
import org.apache.spark.util.SerializableConfiguration
@@ -544,11 +544,32 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
private def discoverPartitions(): PartitionSpec = {
- val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
// We use leaf dirs containing data files to discover the schema.
val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
- PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference)
+ userDefinedPartitionColumns match {
+ case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+ val spec = PartitioningUtils.parsePartitions(
+ leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false)
+
+ // Without auto inference, all of value in the `row` should be null or in StringType,
+ // we need to cast into the data type that user specified.
+ def castPartitionValuesToUserSchema(row: InternalRow) = {
+ InternalRow((0 until row.numFields).map { i =>
+ Cast(
+ Literal.create(row.getString(i), StringType),
+ userProvidedSchema.fields(i).dataType).eval()
+ }: _*)
+ }
+
+ PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+ part.copy(values = castPartitionValuesToUserSchema(part.values))
+ })
+
+ case _ =>
+ // user did not provide a partitioning schema
+ PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled())
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/d4950e6b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 42b9b3d..e3605bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -510,21 +510,39 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
}
- // HadoopFsRelation.discoverPartitions() called by refresh(), which will ignore
- // the given partition data type.
- ignore("Partition column type casting") {
+ test("SPARK-9735 Partition column type casting") {
withTempPath { file =>
- val input = partitionedTestDF.select('a, 'b, 'p1.cast(StringType).as('ps), 'p2)
-
- input
- .write
- .format(dataSourceName)
- .mode(SaveMode.Overwrite)
- .partitionBy("ps", "p2")
- .saveAsTable("t")
+ val df = (for {
+ i <- 1 to 3
+ p2 <- Seq("foo", "bar")
+ } yield (i, s"val_$i", 1.0d, p2, 123, 123.123f)).toDF("a", "b", "p1", "p2", "p3", "f")
+
+ val input = df.select(
+ 'a,
+ 'b,
+ 'p1.cast(StringType).as('ps1),
+ 'p2,
+ 'p3.cast(FloatType).as('pf1),
+ 'f)
withTempTable("t") {
- checkAnswer(sqlContext.table("t"), input.collect())
+ input
+ .write
+ .format(dataSourceName)
+ .mode(SaveMode.Overwrite)
+ .partitionBy("ps1", "p2", "pf1", "f")
+ .saveAsTable("t")
+
+ input
+ .write
+ .format(dataSourceName)
+ .mode(SaveMode.Append)
+ .partitionBy("ps1", "p2", "pf1", "f")
+ .saveAsTable("t")
+
+ val realData = input.collect()
+
+ checkAnswer(sqlContext.table("t"), realData ++ realData)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org