You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/03 18:42:59 UTC
spark git commit: [SPARK-22158][SQL][BRANCH-2.2] convertMetastore
should not ignore table property
Repository: spark
Updated Branches:
refs/heads/branch-2.2 b9adddb6a -> 3c30be53b
[SPARK-22158][SQL][BRANCH-2.2] convertMetastore should not ignore table property
## What changes were proposed in this pull request?
From the beginning, **convertMetastoreOrc** ignores table properties and uses an empty map instead. This PR fixes that. **convertMetastoreParquet** also ignores them.
```scala
val options = Map[String, String]()
```
- [SPARK-14070: HiveMetastoreCatalog.scala](https://github.com/apache/spark/pull/11891/files#diff-ee66e11b56c21364760a5ed2b783f863R650)
- [Master branch: HiveStrategies.scala](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L197)
## How was this patch tested?
Pass the Jenkins with an updated test suite.
Author: Dongjoon Hyun <do...@apache.org>
Closes #19417 from dongjoon-hyun/SPARK-22158-BRANCH-2.2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c30be53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c30be53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c30be53
Branch: refs/heads/branch-2.2
Commit: 3c30be53bae5754929c675b50ea932e164d2ff9f
Parents: b9adddb
Author: Dongjoon Hyun <do...@apache.org>
Authored: Tue Oct 3 11:42:55 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Oct 3 11:42:55 2017 -0700
----------------------------------------------------------------------
.../apache/spark/sql/hive/HiveStrategies.scala | 4 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 54 +++++++++++++++++---
2 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3c30be53/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 53e500e..85a88df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -193,12 +193,12 @@ case class RelationConversions(
private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
if (serde.contains("parquet")) {
- val options = Map(ParquetOptions.MERGE_SCHEMA ->
+ val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
- val options = Map[String, String]()
+ val options = relation.tableMeta.storage.properties
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3c30be53/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6b19b5c..c1c8281 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -21,6 +21,8 @@ import java.io.File
import java.net.URI
import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.ParquetFileReader
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -1438,12 +1441,8 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
// Check if this is compressed as ZLIB.
- val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
- assert(maybeOrcFile.isDefined)
- val orcFilePath = maybeOrcFile.get.toPath.toString
- val expectedCompressionKind =
- OrcFileOperator.getFileReader(orcFilePath).get.getCompression
- assert("ZLIB" === expectedCompressionKind.name())
+ val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
+ assertCompression(maybeOrcFile, "orc", "ZLIB")
sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
@@ -1941,4 +1940,47 @@ class HiveDDLSuite
}
}
}
+
+ private def assertCompression(maybeFile: Option[File], format: String, compression: String) = {
+ assert(maybeFile.isDefined)
+
+ val actualCompression = format match {
+ case "orc" =>
+ OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name
+
+ case "parquet" =>
+ val footer = ParquetFileReader.readFooter(
+ sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER)
+ footer.getBlocks.get(0).getColumns.get(0).getCodec.toString
+ }
+
+ assert(compression === actualCompression)
+ }
+
+ Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) =>
+ test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
+ withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
+ withTable("t") {
+ withTempPath { path =>
+ sql(
+ s"""
+ |CREATE TABLE t(id int) USING hive
+ |OPTIONS(fileFormat '$fileFormat', compression '$compression')
+ |LOCATION '${path.toURI}'
+ """.stripMargin)
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.isHiveTable(table))
+ assert(table.storage.serde.get.contains(fileFormat))
+ assert(table.storage.properties.get("compression") == Some(compression))
+ assert(spark.table("t").collect().isEmpty)
+
+ sql("INSERT INTO t SELECT 1")
+ checkAnswer(spark.table("t"), Row(1))
+ val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+ assertCompression(maybeFile, fileFormat, compression)
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org