Posted to commits@spark.apache.org by we...@apache.org on 2016/09/06 02:57:33 UTC
spark git commit: [SPARK-17358][SQL] Cached table(parquet/orc) should be shared between beelines
Repository: spark
Updated Branches:
refs/heads/master afb3d5d30 -> 64e826f91
[SPARK-17358][SQL] Cached table(parquet/orc) should be shared between beelines
## What changes were proposed in this pull request?
A cached table (parquet/orc) couldn't be shared between beelines, because the `sameResult` check used by `CacheManager` always returned false when comparing two `HadoopFsRelation`s created in different beelines: each relation carried its own `sparkSession`, so case-class equality never matched. The fix moves `sparkSession` into a curried (second) parameter list of `HadoopFsRelation`, which excludes it from the generated `equals`/`hashCode` and therefore from the `sameResult` comparison.
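For background, Scala case classes derive `equals` and `hashCode` only from their first parameter list, so a value moved into a curried second list no longer affects equality. A minimal standalone sketch of the effect, using a hypothetical `Relation` class rather than Spark's actual `HadoopFsRelation`:
```
// Case-class equality covers only the first parameter list, so a
// session-specific value in the curried second list no longer makes
// otherwise-identical relations compare unequal.
case class Relation(path: String, format: String)(val session: String)

object CurriedEqualityDemo extends App {
  val r1 = Relation("/data/src_pqt", "parquet")("beeline-1")
  val r2 = Relation("/data/src_pqt", "parquet")("beeline-2")

  println(r1 == r2)   // true: the second parameter list is ignored
  println(r1.session) // "beeline-1": still accessible, declared as a val
}
```
Note that `copy` is generated with the same curried shape, which is why call sites in the diff below pass `(r.sparkSession)` after `r.copy(...)`.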
## How was this patch tested?
Beeline1
```
1: jdbc:hive2://localhost:10000> CACHE TABLE src_pqt;
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (5.143 seconds)
1: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| plan |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#49, value#50]
+- InMemoryRelation [key#49, value#50], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
+- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string> |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```
Beeline2 (a separate connection; the `InMemoryTableScan` in the plan below shows it reuses the cache populated by Beeline1)
```
0: jdbc:hive2://localhost:10000> EXPLAIN SELECT * FROM src_pqt;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| plan |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
| == Physical Plan ==
InMemoryTableScan [key#68, value#69]
+- InMemoryRelation [key#68, value#69], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `src_pqt`
+- *FileScan parquet default.src_pqt[key#0,value#1] Batched: true, Format: ParquetFormat, InputPaths: hdfs://199.0.0.1:9000/qiyadong/src_pqt, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:string> |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--+
```
Author: Yadong Qi <qi...@gmail.com>
Closes #14913 from watermen/SPARK-17358.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64e826f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64e826f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64e826f9
Branch: refs/heads/master
Commit: 64e826f91eabb1a22d3d163d71fbb7b6d2185f25
Parents: afb3d5d
Author: Yadong Qi <qi...@gmail.com>
Authored: Tue Sep 6 10:57:21 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Sep 6 10:57:21 2016 +0800
----------------------------------------------------------------------
.../apache/spark/sql/execution/datasources/DataSource.scala | 6 ++----
.../spark/sql/execution/datasources/fileSourceInterfaces.scala | 4 ++--
.../sql/execution/datasources/FileSourceStrategySuite.scala | 3 ++-
.../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 3 +--
4 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5968db8..9c99a80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -351,13 +351,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
-          options)
+          options)(sparkSession)
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
@@ -409,13 +408,12 @@ case class DataSource(
         }
 
         HadoopFsRelation(
-          sparkSession,
           fileCatalog,
           partitionSchema = fileCatalog.partitionSpec().partitionColumns,
           dataSchema = dataSchema.asNullable,
           bucketSpec = bucketSpec,
           format,
-          caseInsensitiveOptions)
+          caseInsensitiveOptions)(sparkSession)
 
       case _ =>
         throw new AnalysisException(
http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e03a232..7e40c35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -134,13 +134,13 @@ abstract class OutputWriter {
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    sparkSession: SparkSession,
     location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    options: Map[String, String]) extends BaseRelation with FileRelation {
+    options: Map[String, String])(val sparkSession: SparkSession)
+  extends BaseRelation with FileRelation {
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 09fd750..45411fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -508,7 +508,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       val bucketed = df.queryExecution.analyzed transform {
         case l @ LogicalRelation(r: HadoopFsRelation, _, _) =>
           l.copy(relation =
-            r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil))))
+            r.copy(bucketSpec =
+              Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession))
       }
       Dataset.ofRows(spark, bucketed)
     } else {
http://git-wip-us.apache.org/repos/asf/spark/blob/64e826f9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d31a8d6..c48d4ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -249,13 +249,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           }
 
           val relation = HadoopFsRelation(
-            sparkSession = sparkSession,
             location = fileCatalog,
             partitionSchema = partitionSchema,
             dataSchema = inferredSchema,
             bucketSpec = bucketSpec,
             fileFormat = defaultSource,
-            options = options)
+            options = options)(sparkSession = sparkSession)
 
           val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable))
           cachedDataSourceTables.put(tableIdentifier, created)