Posted to commits@spark.apache.org by we...@apache.org on 2018/04/27 03:00:48 UTC
spark git commit: [SPARK-23355][SQL] convertMetastore should not ignore table properties
Repository: spark
Updated Branches:
refs/heads/master 9ee9fcf52 -> 8aa1d7b0e
[SPARK-23355][SQL] convertMetastore should not ignore table properties
## What changes were proposed in this pull request?
Previously, SPARK-22158 fixed this for the `USING hive` syntax. This PR aims to fix it for the `STORED AS` syntax as well. Although the test case covers only the ORC part, the patch handles both `convertMetastoreOrc` and `convertMetastoreParquet`.
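As a minimal sketch of the fixed behavior (assuming a Hive-enabled `SparkSession` named `spark`; the table name is illustrative), table properties declared via `STORED AS` plus `TBLPROPERTIES` should now reach the converted data source relation:

```scala
// With convertMetastoreOrc enabled, the ORC-related table properties below
// should now be honored instead of being silently dropped on conversion.
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql(
  """CREATE TABLE t(id INT) STORED AS ORC
    |TBLPROPERTIES (orc.compress 'ZLIB', orc.compress.size '1001')
  """.stripMargin)
spark.sql("INSERT INTO t SELECT 1") // files written with ZLIB, not the session default
```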
## How was this patch tested?
Pass the newly added test cases.
Author: Dongjoon Hyun <do...@apache.org>
Closes #20522 from dongjoon-hyun/SPARK-22158-2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa1d7b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa1d7b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa1d7b0
Branch: refs/heads/master
Commit: 8aa1d7b0ede5115297541d29eab4ce5f4fe905cb
Parents: 9ee9fcf
Author: Dongjoon Hyun <do...@apache.org>
Authored: Fri Apr 27 11:00:41 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 27 11:00:41 2018 +0800
----------------------------------------------------------------------
.../apache/spark/sql/hive/HiveStrategies.scala | 17 +++-
.../spark/sql/hive/CompressionCodecSuite.scala | 7 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 81 ++++++++++++++++++++
3 files changed, 97 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8aa1d7b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 8df05cb..a0c197b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -186,15 +186,28 @@ case class RelationConversions(
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}
+ // Return true for Apache ORC and Hive ORC-related configuration names.
+ // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
+ private def isOrcProperty(key: String) =
+ key.startsWith("orc.") || key.contains(".orc.")
+
+ private def isParquetProperty(key: String) =
+ key.startsWith("parquet.") || key.contains(".parquet.")
+
private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+ // Consider both table and storage properties. For properties present on both sides,
+ // storage properties supersede table properties.
if (serde.contains("parquet")) {
- val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
+ val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
+ relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
- val options = relation.tableMeta.storage.properties
+ val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
+ relation.tableMeta.storage.properties
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
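The precedence described in the comment above follows from Scala's `Map ++` semantics: on duplicate keys, the right-hand operand wins. A self-contained sketch (the property values are illustrative, not from the patch):

```scala
// The right-hand side of ++ wins on duplicate keys, so storage properties
// supersede table properties, mirroring the options built in convert() above.
def isOrcProperty(key: String): Boolean =
  key.startsWith("orc.") || key.contains(".orc.")

val tableProps   = Map("orc.compress" -> "ZLIB", "orc.compress.size" -> "1001")
val storageProps = Map("orc.compress" -> "SNAPPY")

val options = tableProps.filterKeys(isOrcProperty).toMap ++ storageProps
// options == Map("orc.compress" -> "SNAPPY", "orc.compress.size" -> "1001")
```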
http://git-wip-us.apache.org/repos/asf/spark/blob/8aa1d7b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index d10a6f2..4550d35 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -268,12 +268,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
compressionCodecs = compressCodecs,
tableCompressionCodecs = compressCodecs) {
case (tableCodec, sessionCodec, realCodec, tableSize) =>
- // For non-partitioned table and when convertMetastore is true, Expect session-level
- // take effect, and in other cases expect table-level take effect
- // TODO: It should always be table-level taking effect when the bug(SPARK-22926)
- // is fixed
- val expectCodec =
- if (convertMetastore && !isPartitioned) sessionCodec else tableCodec.get
+ val expectCodec = tableCodec.get
assert(expectCodec == realCodec)
assert(checkTableSize(
format, expectCodec, isPartitioned, convertMetastore, usingCTAS, tableSize))
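The test simplification above reflects the behavioral change: with the bug tracked in SPARK-22926 resolved, the table-level codec is expected to take effect regardless of `convertMetastore` and partitioning. A hedged illustration (the table name and codec choices are made up):

```scala
// After this patch, the table-level codec (SNAPPY) should win over the
// session-level Parquet default (gzip), even with convertMetastoreParquet on.
spark.sql("SET spark.sql.parquet.compression.codec=gzip")
spark.sql(
  """CREATE TABLE codec_t(id INT) STORED AS PARQUET
    |TBLPROPERTIES (parquet.compression 'SNAPPY')
  """.stripMargin)
spark.sql("INSERT INTO codec_t SELECT 1") // data files are SNAPPY-compressed
```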
http://git-wip-us.apache.org/repos/asf/spark/blob/8aa1d7b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c85db78..daac6af 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METAS
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
@@ -2144,6 +2145,86 @@ class HiveDDLSuite
}
}
+ private def getReader(path: String): org.apache.orc.Reader = {
+ val conf = spark.sessionState.newHadoopConf()
+ val files = org.apache.spark.sql.execution.datasources.orc.OrcUtils.listOrcFiles(path, conf)
+ assert(files.length == 1)
+ val file = files.head
+ val fs = file.getFileSystem(conf)
+ val readerOptions = org.apache.orc.OrcFile.readerOptions(conf).filesystem(fs)
+ org.apache.orc.OrcFile.createReader(file, readerOptions)
+ }
+
+ test("SPARK-23355 convertMetastoreOrc should not ignore table properties - STORED AS") {
+ Seq("native", "hive").foreach { orcImpl =>
+ withSQLConf(ORC_IMPLEMENTATION.key -> orcImpl, CONVERT_METASTORE_ORC.key -> "true") {
+ withTable("t") {
+ withTempPath { path =>
+ sql(
+ s"""
+ |CREATE TABLE t(id int) STORED AS ORC
+ |TBLPROPERTIES (
+ | orc.compress 'ZLIB',
+ | orc.compress.size '1001',
+ | orc.row.index.stride '2002',
+ | hive.exec.orc.default.block.size '3003',
+ | hive.exec.orc.compression.strategy 'COMPRESSION')
+ |LOCATION '${path.toURI}'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.isHiveTable(table))
+ assert(table.storage.serde.get.contains("orc"))
+ val properties = table.properties
+ assert(properties.get("orc.compress") == Some("ZLIB"))
+ assert(properties.get("orc.compress.size") == Some("1001"))
+ assert(properties.get("orc.row.index.stride") == Some("2002"))
+ assert(properties.get("hive.exec.orc.default.block.size") == Some("3003"))
+ assert(properties.get("hive.exec.orc.compression.strategy") == Some("COMPRESSION"))
+ assert(spark.table("t").collect().isEmpty)
+
+ sql("INSERT INTO t SELECT 1")
+ checkAnswer(spark.table("t"), Row(1))
+ val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+
+ val reader = getReader(maybeFile.head.getCanonicalPath)
+ assert(reader.getCompressionKind.name === "ZLIB")
+ assert(reader.getCompressionSize == 1001)
+ assert(reader.getRowIndexStride == 2002)
+ }
+ }
+ }
+ }
+ }
+
+ test("SPARK-23355 convertMetastoreParquet should not ignore table properties - STORED AS") {
+ withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
+ withTable("t") {
+ withTempPath { path =>
+ sql(
+ s"""
+ |CREATE TABLE t(id int) STORED AS PARQUET
+ |TBLPROPERTIES (
+ | parquet.compression 'GZIP'
+ |)
+ |LOCATION '${path.toURI}'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.isHiveTable(table))
+ assert(table.storage.serde.get.contains("parquet"))
+ val properties = table.properties
+ assert(properties.get("parquet.compression") == Some("GZIP"))
+ assert(spark.table("t").collect().isEmpty)
+
+ sql("INSERT INTO t SELECT 1")
+ checkAnswer(spark.table("t"), Row(1))
+ val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+
+ assertCompression(maybeFile, "parquet", "GZIP")
+ }
+ }
+ }
+ }
+
test("load command for non local invalid path validation") {
withTable("tbl") {
sql("CREATE TABLE tbl(i INT, j STRING)")