You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/25 20:42:55 UTC
spark git commit: [SPARK-9285] [SQL] Fixes Row/InternalRow conversion for HadoopFsRelation
Repository: spark
Updated Branches:
refs/heads/master c980e20cf -> e2ec018e3
[SPARK-9285] [SQL] Fixes Row/InternalRow conversion for HadoopFsRelation
This is a follow-up to #7626. It fixes `Row`/`InternalRow` conversion for data sources that extend `HadoopFsRelation` and whose `needConversion` is `true`.
Author: Cheng Lian <li...@databricks.com>
Closes #7649 from liancheng/spark-9285-conversion-fix and squashes the following commits:
036a50c [Cheng Lian] Addresses PR comment
f6d7c6a [Cheng Lian] Fixes Row/InternalRow conversion for HadoopFsRelation
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2ec018e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2ec018e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2ec018e
Branch: refs/heads/master
Commit: e2ec018e37cb699077b5fa2bd662f2055cb42296
Parents: c980e20
Author: Cheng Lian <li...@databricks.com>
Authored: Sat Jul 25 11:42:49 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Jul 25 11:42:49 2015 -0700
----------------------------------------------------------------------
.../apache/spark/sql/sources/interfaces.scala | 23 +++++++++++++++++---
.../SimpleTextHadoopFsRelationSuite.scala | 5 -----
2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e2ec018e/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 119bac7..7126145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.RDDConversions
@@ -593,6 +593,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
*
* @since 1.4.0
*/
+ // TODO Tries to eliminate the extra Catalyst-to-Scala conversion when `needConversion` is true
+ //
+ // PR #7626 separated `Row` and `InternalRow` completely. One of the consequences is that we can
+ // no longer treat an `InternalRow` containing Catalyst values as a `Row`. Thus we have to
+ // introduce another row value conversion for data sources whose `needConversion` is true.
def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
// Yeah, to workaround serialization...
val dataSchema = this.dataSchema
@@ -611,14 +616,26 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
} else {
rdd.asInstanceOf[RDD[InternalRow]]
}
+
converted.mapPartitions { rows =>
val buildProjection = if (codegenEnabled) {
GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
} else {
() => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
}
- val mutableProjection = buildProjection()
- rows.map(r => mutableProjection(r))
+
+ val projectedRows = {
+ val mutableProjection = buildProjection()
+ rows.map(r => mutableProjection(r))
+ }
+
+ if (needConversion) {
+ val requiredSchema = StructType(requiredColumns.map(dataSchema(_)))
+ val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema)
+ projectedRows.map(toScala(_).asInstanceOf[Row])
+ } else {
+ projectedRows
+ }
}.asInstanceOf[RDD[Row]]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e2ec018e/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index d761909..e8975e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
-/*
-This is commented out due a bug in the data source API (SPARK-9291).
-
-
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
@@ -54,4 +50,3 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}
}
-*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org