You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/04/27 03:00:48 UTC

spark git commit: [SPARK-23355][SQL] convertMetastore should not ignore table properties

Repository: spark
Updated Branches:
  refs/heads/master 9ee9fcf52 -> 8aa1d7b0e


[SPARK-23355][SQL] convertMetastore should not ignore table properties

## What changes were proposed in this pull request?

Previously, SPARK-22158 fixed this problem for tables created with the `USING hive` syntax. This PR fixes it for the `STORED AS` syntax. The patch handles both `convertMetastoreOrc` and `convertMetastoreParquet`, and the added test cases cover both ORC and Parquet tables.

## How was this patch tested?

Passes the newly added test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #20522 from dongjoon-hyun/SPARK-22158-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa1d7b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa1d7b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa1d7b0

Branch: refs/heads/master
Commit: 8aa1d7b0ede5115297541d29eab4ce5f4fe905cb
Parents: 9ee9fcf
Author: Dongjoon Hyun <do...@apache.org>
Authored: Fri Apr 27 11:00:41 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 27 11:00:41 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/HiveStrategies.scala  | 17 +++-
 .../spark/sql/hive/CompressionCodecSuite.scala  |  7 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 81 ++++++++++++++++++++
 3 files changed, 97 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8aa1d7b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 8df05cb..a0c197b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -186,15 +186,28 @@ case class RelationConversions(
       serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  // Return true for Apache ORC and Hive ORC-related configuration names.
+  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
+  private def isOrcProperty(key: String) =
+    key.startsWith("orc.") || key.contains(".orc.")
+
+  private def isParquetProperty(key: String) =
+    key.startsWith("parquet.") || key.contains(".parquet.")
+
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+    // Merge format-specific table properties with storage properties. For a key present in
+    // both, the storage property supersedes the table property.
     if (serde.contains("parquet")) {
-      val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
+      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
+        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
-      val options = relation.tableMeta.storage.properties
+      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
+        relation.tableMeta.storage.properties
       if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
         sessionCatalog.metastoreCatalog.convertToLogicalRelation(
           relation,

http://git-wip-us.apache.org/repos/asf/spark/blob/8aa1d7b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index d10a6f2..4550d35 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -268,12 +268,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
             compressionCodecs = compressCodecs,
             tableCompressionCodecs = compressCodecs) {
             case (tableCodec, sessionCodec, realCodec, tableSize) =>
-              // For non-partitioned table and when convertMetastore is true, Expect session-level
-              // take effect, and in other cases expect table-level take effect
-              // TODO: It should always be table-level taking effect when the bug(SPARK-22926)
-              // is fixed
-              val expectCodec =
-                if (convertMetastore && !isPartitioned) sessionCodec else tableCodec.get
+              val expectCodec = tableCodec.get
               assert(expectCodec == realCodec)
               assert(checkTableSize(
                 format, expectCodec, isPartitioned, convertMetastore, usingCTAS, tableSize))

http://git-wip-us.apache.org/repos/asf/spark/blob/8aa1d7b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c85db78..daac6af 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METAS
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -2144,6 +2145,86 @@ class HiveDDLSuite
     }
   }
 
+  private def getReader(path: String): org.apache.orc.Reader = {
+    val conf = spark.sessionState.newHadoopConf()
+    val files = org.apache.spark.sql.execution.datasources.orc.OrcUtils.listOrcFiles(path, conf)
+    assert(files.length == 1)
+    val file = files.head
+    val fs = file.getFileSystem(conf)
+    val readerOptions = org.apache.orc.OrcFile.readerOptions(conf).filesystem(fs)
+    org.apache.orc.OrcFile.createReader(file, readerOptions)
+  }
+
+  test("SPARK-23355 convertMetastoreOrc should not ignore table properties - STORED AS") {
+    Seq("native", "hive").foreach { orcImpl =>
+      withSQLConf(ORC_IMPLEMENTATION.key -> orcImpl, CONVERT_METASTORE_ORC.key -> "true") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) STORED AS ORC
+                |TBLPROPERTIES (
+                |  orc.compress 'ZLIB',
+                |  orc.compress.size '1001',
+                |  orc.row.index.stride '2002',
+                |  hive.exec.orc.default.block.size '3003',
+                |  hive.exec.orc.compression.strategy 'COMPRESSION')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(DDLUtils.isHiveTable(table))
+            assert(table.storage.serde.get.contains("orc"))
+            val properties = table.properties
+            assert(properties.get("orc.compress") == Some("ZLIB"))
+            assert(properties.get("orc.compress.size") == Some("1001"))
+            assert(properties.get("orc.row.index.stride") == Some("2002"))
+            assert(properties.get("hive.exec.orc.default.block.size") == Some("3003"))
+            assert(properties.get("hive.exec.orc.compression.strategy") == Some("COMPRESSION"))
+            assert(spark.table("t").collect().isEmpty)
+
+            sql("INSERT INTO t SELECT 1")
+            checkAnswer(spark.table("t"), Row(1))
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+
+            val reader = getReader(maybeFile.head.getCanonicalPath)
+            assert(reader.getCompressionKind.name === "ZLIB")
+            assert(reader.getCompressionSize == 1001)
+            assert(reader.getRowIndexStride == 2002)
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-23355 convertMetastoreParquet should not ignore table properties - STORED AS") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "true") {
+      withTable("t") {
+        withTempPath { path =>
+          sql(
+            s"""
+               |CREATE TABLE t(id int) STORED AS PARQUET
+               |TBLPROPERTIES (
+               |  parquet.compression 'GZIP'
+               |)
+               |LOCATION '${path.toURI}'
+            """.stripMargin)
+          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+          assert(DDLUtils.isHiveTable(table))
+          assert(table.storage.serde.get.contains("parquet"))
+          val properties = table.properties
+          assert(properties.get("parquet.compression") == Some("GZIP"))
+          assert(spark.table("t").collect().isEmpty)
+
+          sql("INSERT INTO t SELECT 1")
+          checkAnswer(spark.table("t"), Row(1))
+          val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+
+          assertCompression(maybeFile, "parquet", "GZIP")
+        }
+      }
+    }
+  }
+
   test("load command for non local invalid path validation") {
     withTable("tbl") {
       sql("CREATE TABLE tbl(i INT, j STRING)")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org