Posted to commits@spark.apache.org by li...@apache.org on 2017/09/09 18:48:39 UTC
[1/2] spark git commit: [SPARK-4131] Support "Writing data into the filesystem from queries"
Repository: spark
Updated Branches:
refs/heads/master e4d8f9a36 -> f76790557
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
new file mode 100644
index 0000000..aa5cae3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+case class TestData(key: Int, value: String)
+
+case class ThreeColumnTable(key: Int, value: String, key1: String)
+
+class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
+ with SQLTestUtils {
+ import spark.implicits._
+
+ override lazy val testData = spark.sparkContext.parallelize(
+ (1 to 100).map(i => TestData(i, i.toString))).toDF()
+
+ before {
+ // Since we are doing tests for DDL statements,
+ // it is better to reset before every test.
+ hiveContext.reset()
+ // Creates a temporary view with testData, which will be used in all tests.
+ testData.createOrReplaceTempView("testData")
+ }
+
+ test("insertInto() HiveTable") {
+ withTable("createAndInsertTest") {
+ sql("CREATE TABLE createAndInsertTest (key int, value string)")
+
+ // Add some data.
+ testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+ // Make sure the table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq
+ )
+
+ // Add more data.
+ testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+ // Make sure the table has been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
+ )
+
+ // Now overwrite.
+ testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
+
+ // Make sure the registered table has also been updated.
+ checkAnswer(
+ sql("SELECT * FROM createAndInsertTest"),
+ testData.collect().toSeq
+ )
+ }
+ }
+
+ test("Double create fails when allowExisting = false") {
+ withTable("doubleCreateAndInsertTest") {
+ sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+
+ intercept[AnalysisException] {
+ sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+ }
+ }
+ }
+
+ test("Double create does not fail when allowExisting = true") {
+ withTable("doubleCreateAndInsertTest") {
+ sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+ sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)")
+ }
+ }
+
+ test("SPARK-4052: scala.collection.Map as value type of MapType") {
+ val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil)
+ val rowRDD = spark.sparkContext.parallelize(
+ (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i"))))
+ val df = spark.createDataFrame(rowRDD, schema)
+ df.createOrReplaceTempView("tableWithMapValue")
+ sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)")
+ sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
+
+ checkAnswer(
+ sql("SELECT * FROM hiveTableWithMapValue"),
+ rowRDD.collect().toSeq
+ )
+
+ sql("DROP TABLE hiveTableWithMapValue")
+ }
+
+ test("SPARK-4203:random partition directory order") {
+ sql("CREATE TABLE tmp_table (key int, value string)")
+ val tmpDir = Utils.createTempDir()
+ // The default value of hive.exec.stagingdir.
+ val stagingDir = ".hive-staging"
+
+ sql(
+ s"""
+ |CREATE TABLE table_with_partition(c1 string)
+ |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string)
+ |location '${tmpDir.toURI.toString}'
+ """.stripMargin)
+ sql(
+ """
+ |INSERT OVERWRITE TABLE table_with_partition
+ |partition (p1='a',p2='b',p3='c',p4='c',p5='1')
+ |SELECT 'blarr' FROM tmp_table
+ """.stripMargin)
+ sql(
+ """
+ |INSERT OVERWRITE TABLE table_with_partition
+ |partition (p1='a',p2='b',p3='c',p4='c',p5='2')
+ |SELECT 'blarr' FROM tmp_table
+ """.stripMargin)
+ sql(
+ """
+ |INSERT OVERWRITE TABLE table_with_partition
+ |partition (p1='a',p2='b',p3='c',p4='c',p5='3')
+ |SELECT 'blarr' FROM tmp_table
+ """.stripMargin)
+ sql(
+ """
+ |INSERT OVERWRITE TABLE table_with_partition
+ |partition (p1='a',p2='b',p3='c',p4='c',p5='4')
+ |SELECT 'blarr' FROM tmp_table
+ """.stripMargin)
+ def listFolders(path: File, acc: List[String]): List[List[String]] = {
+ val dir = path.listFiles()
+ val folders = dir.filter { e => e.isDirectory && !e.getName().startsWith(stagingDir) }.toList
+ if (folders.isEmpty) {
+ List(acc.reverse)
+ } else {
+ folders.flatMap(x => listFolders(x, x.getName :: acc))
+ }
+ }
+ val expected = List(
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil,
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil,
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil,
+ "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
+ )
+ assert(listFolders(tmpDir, List()).sortBy(_.toString()) === expected.sortBy(_.toString))
+ sql("DROP TABLE table_with_partition")
+ sql("DROP TABLE tmp_table")
+ }
+
+ testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName =>
+ val selQuery = s"select a, b, c, d from $tableName"
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |partition (b=2, c=3)
+ |SELECT 1, 4
+ """.stripMargin)
+ checkAnswer(sql(selQuery), Row(1, 2, 3, 4))
+
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |partition (b=2, c=3)
+ |SELECT 5, 6
+ """.stripMargin)
+ checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+
+ val e = intercept[AnalysisException] {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |partition (b=2, c) IF NOT EXISTS
+ |SELECT 7, 8, 3
+ """.stripMargin)
+ }
+ assert(e.getMessage.contains(
+ "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]"))
+
+ // If the partition already exists, the insert will overwrite the data
+ // unless users specify IF NOT EXISTS
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |partition (b=2, c=3) IF NOT EXISTS
+ |SELECT 9, 10
+ """.stripMargin)
+ checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+
+ // ADD PARTITION has the same effect, even if no actual data is inserted.
+ sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |partition (b=21, c=31) IF NOT EXISTS
+ |SELECT 20, 24
+ """.stripMargin)
+ checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+ }
+
+ test("Insert ArrayType.containsNull == false") {
+ val schema = StructType(Seq(
+ StructField("a", ArrayType(StringType, containsNull = false))))
+ val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i"))))
+ val df = spark.createDataFrame(rowRDD, schema)
+ df.createOrReplaceTempView("tableWithArrayValue")
+ sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
+ sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue")
+
+ checkAnswer(
+ sql("SELECT * FROM hiveTableWithArrayValue"),
+ rowRDD.collect().toSeq)
+
+ sql("DROP TABLE hiveTableWithArrayValue")
+ }
+
+ test("Insert MapType.valueContainsNull == false") {
+ val schema = StructType(Seq(
+ StructField("m", MapType(StringType, StringType, valueContainsNull = false))))
+ val rowRDD = spark.sparkContext.parallelize(
+ (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
+ val df = spark.createDataFrame(rowRDD, schema)
+ df.createOrReplaceTempView("tableWithMapValue")
+ sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
+ sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
+
+ checkAnswer(
+ sql("SELECT * FROM hiveTableWithMapValue"),
+ rowRDD.collect().toSeq)
+
+ sql("DROP TABLE hiveTableWithMapValue")
+ }
+
+ test("Insert StructType.fields.exists(_.nullable == false)") {
+ val schema = StructType(Seq(
+ StructField("s", StructType(Seq(StructField("f", StringType, nullable = false))))))
+ val rowRDD = spark.sparkContext.parallelize(
+ (1 to 100).map(i => Row(Row(s"value$i"))))
+ val df = spark.createDataFrame(rowRDD, schema)
+ df.createOrReplaceTempView("tableWithStructValue")
+ sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
+ sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue")
+
+ checkAnswer(
+ sql("SELECT * FROM hiveTableWithStructValue"),
+ rowRDD.collect().toSeq)
+
+ sql("DROP TABLE hiveTableWithStructValue")
+ }
+
+ test("Test partition mode = strict") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
+ withTable("partitioned") {
+ sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+ val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
+ .toDF("id", "data", "part")
+
+ intercept[SparkException] {
+ data.write.insertInto("partitioned")
+ }
+ }
+ }
+ }
+
+ test("Detect table partitioning") {
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ withTable("source", "partitioned") {
+ sql("CREATE TABLE source (id bigint, data string, part string)")
+ val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF()
+
+ data.write.insertInto("source")
+ checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
+
+ sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+ // this will pick up the output partitioning from the table definition
+ spark.table("source").write.insertInto("partitioned")
+
+ checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
+ }
+ }
+ }
+
+ private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
+ test(s"Hive SerDe table - $testName") {
+ val hiveTable = "hive_table"
+
+ withTable(hiveTable) {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ sql(
+ s"""
+ |CREATE TABLE $hiveTable (a INT, d INT)
+ |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE
+ """.stripMargin)
+ f(hiveTable)
+ }
+ }
+ }
+ }
+
+ private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
+ test(s"Data source table - $testName") {
+ val dsTable = "ds_table"
+
+ withTable(dsTable) {
+ sql(
+ s"""
+ |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT)
+ |USING PARQUET PARTITIONED BY (b, c)
+ """.stripMargin)
+ f(dsTable)
+ }
+ }
+ }
+
+ private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
+ testPartitionedHiveSerDeTable(testName)(f)
+ testPartitionedDataSourceTable(testName)(f)
+ }
+
+ testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
+ val cause = intercept[AnalysisException] {
+ Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName)
+ }
+
+ assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
+ }
+
+ testPartitionedTable(
+ "SPARK-16036: better error message when insert into a table with mismatch schema") {
+ tableName =>
+ val e = intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
+ }
+ assert(e.message.contains(
+ "target table has 4 column(s) but the inserted data has 5 column(s)"))
+ }
+
+ testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
+ tableName =>
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+ checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+ sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3")
+ checkAnswer(sql(s"SELECT a, b, c, 4 FROM $tableName"), Row(1, 2, 3, 4))
+ }
+ }
+
+ testPartitionedTable("INSERT INTO a partitioned table (semantic and error handling)") {
+ tableName =>
+ withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4")
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8")
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
+
+ // c is defined twice. Analyzer will complain.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13")
+ }
+
+ // d is not a partitioning column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13, 14")
+ }
+
+ // d is not a partitioning column. The total number of columns is correct.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13")
+ }
+
+ // The data is missing a column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+ }
+
+ // d is not a partitioning column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 14")
+ }
+
+ // The statement is missing a column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14")
+ }
+
+ // The statement is missing a column.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 16")
+ }
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 15")
+
+ // Dynamic partitioning columns need to be after static partitioning columns.
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 20, 18")
+ }
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 19")
+
+ sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 23")
+
+ sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27")
+
+ checkAnswer(
+ sql(s"SELECT a, b, c, d FROM $tableName"),
+ Row(1, 2, 3, 4) ::
+ Row(5, 6, 7, 8) ::
+ Row(9, 10, 11, 12) ::
+ Row(13, 14, 15, 16) ::
+ Row(17, 18, 19, 20) ::
+ Row(21, 22, 23, 24) ::
+ Row(25, 26, 27, 28) :: Nil
+ )
+ }
+ }
+
+ testPartitionedTable("insertInto() should match columns by position and ignore column names") {
+ tableName =>
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ // Columns `df.c` and `df.d` are resolved by position, and thus mapped to partition columns
+ // `b` and `c` of the target table.
+ val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
+ df.write.insertInto(tableName)
+
+ checkAnswer(
+ sql(s"SELECT a, b, c, d FROM $tableName"),
+ Row(1, 3, 4, 2)
+ )
+ }
+ }
+
+ testPartitionedTable("insertInto() should match unnamed columns by position") {
+ tableName =>
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ // Columns `c + 1` and `d + 1` are resolved by position, and thus mapped to partition
+ // columns `b` and `c` of the target table.
+ val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
+ df.select('a + 1, 'b + 1, 'c + 1, 'd + 1).write.insertInto(tableName)
+
+ checkAnswer(
+ sql(s"SELECT a, b, c, d FROM $tableName"),
+ Row(2, 4, 5, 3)
+ )
+ }
+ }
+
+ testPartitionedTable("insertInto() should reject missing columns") {
+ tableName =>
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, b INT)")
+
+ intercept[AnalysisException] {
+ spark.table("t").write.insertInto(tableName)
+ }
+ }
+ }
+
+ testPartitionedTable("insertInto() should reject extra columns") {
+ tableName =>
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
+
+ intercept[AnalysisException] {
+ spark.table("t").write.insertInto(tableName)
+ }
+ }
+ }
+
+ private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
+ test(s"Hive SerDe table - $testName") {
+ val hiveTable = "hive_table"
+
+ withTable(hiveTable) {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ sql(
+ s"""
+ |CREATE TABLE $hiveTable (a INT, d INT)
+ |PARTITIONED BY (b INT, c INT)
+ |CLUSTERED BY(a)
+ |SORTED BY(a, d) INTO 256 BUCKETS
+ |STORED AS TEXTFILE
+ """.stripMargin)
+ f(hiveTable)
+ }
+ }
+ }
+ }
+
+ testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
+ tableName =>
+ withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
+ sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+ checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+ }
+ }
+
+ testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
+ tableName =>
+ withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+ }
+ }
+ withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+ }
+ }
+ withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
+ intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+ }
+ }
+ }
+
+ test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
+ // Set hive.exec.stagingdir under the table directory, with a name that does not start with ".".
+ withSQLConf("hive.exec.stagingdir" -> "./test") {
+ withTable("test_table") {
+ sql("CREATE TABLE test_table (key int)")
+ sql("INSERT OVERWRITE TABLE test_table SELECT 1")
+ checkAnswer(sql("SELECT * FROM test_table"), Row(1))
+ }
+ }
+ }
+
+ test("insert overwrite to dir from hive metastore table") {
+ withTempDir { dir =>
+ val path = dir.toURI.getPath
+
+ sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path}' SELECT * FROM src where key < 10")
+
+ sql(
+ s"""
+ |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+ |STORED AS orc
+ |SELECT * FROM src where key < 10
+ """.stripMargin)
+
+ // Use the ORC data source to verify that the data written to the path is correct.
+ withTempView("orc_source") {
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW orc_source
+ |USING org.apache.spark.sql.hive.orc
+ |OPTIONS (
+ | PATH '${dir.getCanonicalPath}'
+ |)
+ """.stripMargin)
+
+ checkAnswer(
+ sql("select * from orc_source"),
+ sql("select * from src where key < 10"))
+ }
+ }
+ }
+
+ test("insert overwrite to local dir from temp table") {
+ withTempView("test_insert_table") {
+ spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+ withTempDir { dir =>
+ val path = dir.toURI.getPath
+
+ sql(
+ s"""
+ |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |SELECT * FROM test_insert_table
+ """.stripMargin)
+
+ sql(
+ s"""
+ |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+ |STORED AS orc
+ |SELECT * FROM test_insert_table
+ """.stripMargin)
+
+ // Use the ORC data source to verify that the data written to the path is correct.
+ checkAnswer(
+ spark.read.orc(dir.getCanonicalPath),
+ sql("select * from test_insert_table"))
+ }
+ }
+ }
+
+ test("insert overwrite to dir from temp table") {
+ withTempView("test_insert_table") {
+ spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+ withTempDir { dir =>
+ val pathUri = dir.toURI
+
+ sql(
+ s"""
+ |INSERT OVERWRITE DIRECTORY '${pathUri}'
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |SELECT * FROM test_insert_table
+ """.stripMargin)
+
+ sql(
+ s"""
+ |INSERT OVERWRITE DIRECTORY '${pathUri}'
+ |STORED AS orc
+ |SELECT * FROM test_insert_table
+ """.stripMargin)
+
+ // Use the ORC data source to verify that the data written to the path is correct.
+ checkAnswer(
+ spark.read.orc(dir.getCanonicalPath),
+ sql("select * from test_insert_table"))
+ }
+ }
+ }
+
+ test("multi insert overwrite to dir") {
+ withTempView("test_insert_table") {
+ spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+ withTempDir { dir =>
+ val pathUri = dir.toURI
+
+ withTempDir { dir2 =>
+ val pathUri2 = dir2.toURI
+
+ sql(
+ s"""
+ |FROM test_insert_table
+ |INSERT OVERWRITE DIRECTORY '${pathUri}'
+ |STORED AS orc
+ |SELECT id
+ |INSERT OVERWRITE DIRECTORY '${pathUri2}'
+ |STORED AS orc
+ |SELECT *
+ """.stripMargin)
+
+ // Use the ORC data source to verify that the data written to the paths is correct.
+ checkAnswer(
+ spark.read.orc(dir.getCanonicalPath),
+ sql("select id from test_insert_table"))
+
+ checkAnswer(
+ spark.read.orc(dir2.getCanonicalPath),
+ sql("select * from test_insert_table"))
+ }
+ }
+ }
+ }
+
+ test("insert overwrite to dir to illegal path") {
+ withTempView("test_insert_table") {
+ spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+ val e = intercept[IllegalArgumentException] {
+ sql(
+ s"""
+ |INSERT OVERWRITE LOCAL DIRECTORY 'abc://a'
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |SELECT * FROM test_insert_table
+ """.stripMargin)
+ }.getMessage
+
+ assert(e.contains("Wrong FS: abc://a, expected: file:///"))
+ }
+ }
+
+ test("insert overwrite to dir with mixed syntax") {
+ withTempView("test_insert_table") {
+ spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+ val e = intercept[ParseException] {
+ sql(
+ s"""
+ |INSERT OVERWRITE DIRECTORY 'file://tmp'
+ |USING json
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |SELECT * FROM test_insert_table
+ """.stripMargin)
+ }.getMessage
+
+ assert(e.contains("mismatched input 'ROW'"))
+ }
+ }
+
+ test("insert overwrite to dir with multi inserts") {
+ withTempView("test_insert_table") {
+ spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+ val e = intercept[ParseException] {
+ sql(
+ s"""
+ |INSERT OVERWRITE DIRECTORY 'file://tmp2'
+ |USING json
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |SELECT * FROM test_insert_table
+ |INSERT OVERWRITE DIRECTORY 'file://tmp2'
+ |USING json
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ |SELECT * FROM test_insert_table
+ """.stripMargin)
+ }.getMessage
+
+ assert(e.contains("mismatched input 'ROW'"))
+ }
+ }
+}
---------------------------------------------------------------------
[2/2] spark git commit: [SPARK-4131] Support "Writing data into the filesystem from queries"
Posted by li...@apache.org.
[SPARK-4131] Support "Writing data into the filesystem from queries"
## What changes were proposed in this pull request?
This PR implements the SQL feature:
INSERT OVERWRITE [LOCAL] DIRECTORY directory1
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...
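For illustration, a minimal sketch of how the new Hive-format variant might be invoked from a Hive-enabled SparkSession (the output path and the source table `src` are illustrative, not part of this patch):

// Assumes a Hive-enabled SparkSession named `spark` and an existing table `src`.
spark.sql(
  """
    |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/query_output'
    |STORED AS orc
    |SELECT * FROM src WHERE key < 10
  """.stripMargin)

The same statement shape is exercised by the new tests in sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala.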
## How was this patch tested?
Added new unit tests and also pulled the code to fb-spark so that we could test writing to an HDFS directory.
Author: Jane Wang <ja...@fb.com>
Closes #18975 from janewangfb/port_local_directory.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7679055
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7679055
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7679055
Branch: refs/heads/master
Commit: f76790557b063edc3080d5c792167e2f8b7060d1
Parents: e4d8f9a
Author: Jane Wang <ja...@fb.com>
Authored: Sat Sep 9 11:48:34 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Sep 9 11:48:34 2017 -0700
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 8 +-
.../analysis/UnsupportedOperationChecker.scala | 3 +
.../spark/sql/catalyst/parser/AstBuilder.scala | 79 +-
.../plans/logical/basicLogicalOperators.scala | 26 +-
.../spark/sql/execution/SparkSqlParser.scala | 79 +-
.../InsertIntoDataSourceDirCommand.scala | 82 +++
.../spark/sql/execution/command/ddl.scala | 17 +-
.../datasources/DataSourceStrategy.scala | 21 +-
.../sql/execution/command/DDLParserSuite.scala | 52 +-
.../apache/spark/sql/sources/InsertSuite.scala | 60 ++
.../apache/spark/sql/hive/HiveStrategies.scala | 11 +-
.../spark/sql/hive/execution/HiveTmpPath.scala | 203 +++++
.../execution/InsertIntoHiveDirCommand.scala | 131 ++++
.../hive/execution/InsertIntoHiveTable.scala | 213 +-----
.../sql/hive/execution/SaveAsHiveFile.scala | 73 ++
.../sql/hive/InsertIntoHiveTableSuite.scala | 551 --------------
.../org/apache/spark/sql/hive/InsertSuite.scala | 731 +++++++++++++++++++
17 files changed, 1565 insertions(+), 775 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index f741dcf..239e73e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -243,8 +243,10 @@ query
;
insertInto
- : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?
- | INSERT INTO TABLE? tableIdentifier partitionSpec?
+ : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)? #insertOverwriteTable
+ | INSERT INTO TABLE? tableIdentifier partitionSpec? #insertIntoTable
+ | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir
+ | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
;
partitionSpecLocation
@@ -745,6 +747,7 @@ nonReserved
| AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN
| UNBOUNDED | WHEN
| DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP
+ | DIRECTORY
;
SELECT: 'SELECT';
@@ -815,6 +818,7 @@ WITH: 'WITH';
VALUES: 'VALUES';
CREATE: 'CREATE';
TABLE: 'TABLE';
+DIRECTORY: 'DIRECTORY';
VIEW: 'VIEW';
REPLACE: 'REPLACE';
INSERT: 'INSERT';
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 6ab4153..33ba086 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -146,6 +146,9 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
+ case _: InsertIntoDir =>
+ throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
+
// mapGroupsWithState and flatMapGroupsWithState
case m: FlatMapGroupsWithState if m.isStreaming =>
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8a45c52..891f616 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -178,11 +179,64 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
- * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan.
+ * Parameters used for writing a query to a table:
+ * (tableIdentifier, partitionKeys, exists).
+ */
+ type InsertTableParams = (TableIdentifier, Map[String, Option[String]], Boolean)
+
+ /**
+ * Parameters used for writing a query to a directory: (isLocal, CatalogStorageFormat, provider).
+ */
+ type InsertDirParams = (Boolean, CatalogStorageFormat, Option[String])
+
+ /**
+ * Add an
+ * {{{
+ * INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]?
+ * INSERT INTO [TABLE] tableIdentifier [partitionSpec]
+ * INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
+ * INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
+ * }}}
+ * operation to the logical plan
*/
private def withInsertInto(
ctx: InsertIntoContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+ ctx match {
+ case table: InsertIntoTableContext =>
+ val (tableIdent, partitionKeys, exists) = visitInsertIntoTable(table)
+ InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, false, exists)
+ case table: InsertOverwriteTableContext =>
+ val (tableIdent, partitionKeys, exists) = visitInsertOverwriteTable(table)
+ InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, true, exists)
+ case dir: InsertOverwriteDirContext =>
+ val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
+ InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
+ case hiveDir: InsertOverwriteHiveDirContext =>
+ val (isLocal, storage, provider) = visitInsertOverwriteHiveDir(hiveDir)
+ InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
+ case _ =>
+ throw new ParseException("Invalid InsertIntoContext", ctx)
+ }
+ }
+
+ /**
+ * Add an INSERT INTO TABLE operation to the logical plan.
+ */
+ override def visitInsertIntoTable(
+ ctx: InsertIntoTableContext): InsertTableParams = withOrigin(ctx) {
+ val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+ val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
+
+ (tableIdent, partitionKeys, false)
+ }
+
+ /**
+ * Add an INSERT OVERWRITE TABLE operation to the logical plan.
+ */
+ override def visitInsertOverwriteTable(
+ ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) {
+ assert(ctx.OVERWRITE() != null)
val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
@@ -192,12 +246,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
"partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
}
- InsertIntoTable(
- UnresolvedRelation(tableIdent),
- partitionKeys,
- query,
- ctx.OVERWRITE != null,
- ctx.EXISTS != null)
+ (tableIdent, partitionKeys, ctx.EXISTS() != null)
+ }
+
+ /**
+ * Write to a directory, returning an [[InsertIntoDir]] logical plan.
+ */
+ override def visitInsertOverwriteDir(
+ ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
+ throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
+ }
+
+ /**
+ * Write to a directory, returning an [[InsertIntoDir]] logical plan.
+ */
+ override def visitInsertOverwriteHiveDir(
+ ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
+ throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4b3054d..f443cd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -360,6 +360,30 @@ case class InsertIntoTable(
}
/**
+ * Insert the query result into a directory.
+ *
+ * @param isLocal Indicates whether the specified directory is a local directory
+ * @param storage Information about the output file format, row format and serialization
+ * @param provider Specifies what data source to use; only used for data source writes.
+ * @param child The query to be executed
+ * @param overwrite If true, the existing directory will be overwritten
+ *
+ * Note that this plan is unresolved and has to be replaced by the concrete implementations
+ * during analysis.
+ */
+case class InsertIntoDir(
+ isLocal: Boolean,
+ storage: CatalogStorageFormat,
+ provider: Option[String],
+ child: LogicalPlan,
+ overwrite: Boolean = true)
+ extends UnaryNode {
+
+ override def output: Seq[Attribute] = Seq.empty
+ override lazy val resolved: Boolean = false
+}
+
+/**
* A container for holding the view description(CatalogTable), and the output of the view. The
* child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
* if the `viewText` is not defined.
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d3f6ab5..d38919b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, _}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructType
@@ -1512,4 +1512,81 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
query: LogicalPlan): LogicalPlan = {
RepartitionByExpression(expressions, query, conf.numShufflePartitions)
}
+
+ /**
+ * Return the parameters for [[InsertIntoDir]] logical plan.
+ *
+ * Expected format:
+ * {{{
+ * INSERT OVERWRITE DIRECTORY
+ * [path]
+ * [OPTIONS table_property_list]
+ * select_statement;
+ * }}}
+ */
+ override def visitInsertOverwriteDir(
+ ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
+ if (ctx.LOCAL != null) {
+ throw new ParseException(
+ "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
+ }
+
+ val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+ var storage = DataSource.buildStorageFormatFromOptions(options)
+
+ val path = Option(ctx.path).map(string).getOrElse("")
+
+ if (!(path.isEmpty ^ storage.locationUri.isEmpty)) {
+ throw new ParseException(
+ "Directory path and 'path' in OPTIONS should be specified one, but not both", ctx)
+ }
+
+ if (!path.isEmpty) {
+ val customLocation = Some(CatalogUtils.stringToURI(path))
+ storage = storage.copy(locationUri = customLocation)
+ }
+
+ val provider = ctx.tableProvider.qualifiedName.getText
+
+ (false, storage, Some(provider))
+ }
+
+ /**
+ * Return the parameters for [[InsertIntoDir]] logical plan.
+ *
+ * Expected format:
+ * {{{
+ * INSERT OVERWRITE [LOCAL] DIRECTORY
+ * path
+ * [ROW FORMAT row_format]
+ * [STORED AS file_format]
+ * select_statement;
+ * }}}
+ */
+ override def visitInsertOverwriteHiveDir(
+ ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
+ validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
+ val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+ .getOrElse(CatalogStorageFormat.empty)
+ val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+ .getOrElse(CatalogStorageFormat.empty)
+
+ val path = string(ctx.path)
+ // The path field is required
+ if (path.isEmpty) {
+ operationNotAllowed("INSERT OVERWRITE DIRECTORY must be accompanied by path", ctx)
+ }
+
+ val defaultStorage = HiveSerDe.getDefaultStorage(conf)
+
+ val storage = CatalogStorageFormat(
+ locationUri = Some(CatalogUtils.stringToURI(path)),
+ inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+ outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+ serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+ compressed = false,
+ properties = rowStorage.properties ++ fileStorage.properties)
+
+ (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
+ }
}
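For illustration, the data source form handled by visitInsertOverwriteDir above corresponds to statements like the following minimal sketch (the path and options are illustrative, and a SparkSession named `spark` is assumed):

// Writes the query result to a directory using the json data source.
spark.sql(
  """
    |INSERT OVERWRITE DIRECTORY '/tmp/json_output'
    |USING json
    |OPTIONS (a 1, b 0.1, c TRUE)
    |SELECT 1 AS a, 'c' AS b
  """.stripMargin)

This mirrors the form covered by the new tests in sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala.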
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
new file mode 100644
index 0000000..633de4c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources._
+
+/**
+ * A command used to write the result of a query to a directory.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * INSERT OVERWRITE DIRECTORY (path=STRING)?
+ * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ * SELECT ...
+ * }}}
+ *
+ * @param storage storage format used to describe how the query result is stored.
+ * @param provider the data source type to be used
+ * @param query the logical plan representing the data to write
+ * @param overwrite whether to overwrite the existing directory
+ */
+case class InsertIntoDataSourceDirCommand(
+ storage: CatalogStorageFormat,
+ provider: String,
+ query: LogicalPlan,
+ overwrite: Boolean) extends RunnableCommand {
+
+ override def children: Seq[LogicalPlan] = Seq(query)
+
+ override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+ assert(children.length == 1)
+ assert(storage.locationUri.nonEmpty, "Directory path is required")
+ assert(provider.nonEmpty, "Data source is required")
+
+ // Create the relation based on the input logical plan: `query`.
+ val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+
+ val dataSource = DataSource(
+ sparkSession,
+ className = provider,
+ options = storage.properties ++ pathOption,
+ catalogTable = None)
+
+ val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+ if (!isFileFormat) {
+ throw new SparkException(
+ "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
+ }
+
+ val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
+ try {
+ sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
+ dataSource.writeAndRead(saveMode, query)
+ } catch {
+ case ex: AnalysisException =>
+ logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
+ throw ex
+ }
+
+ Seq.empty[Row]
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7611e1c..b06f4cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.HiveSerDe
@@ -869,4 +870,18 @@ object DDLUtils {
}
}
}
+
+ /**
+ * Throws an exception if outputPath tries to overwrite an input path.
+ */
+ def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
+ val inputPaths = query.collect {
+ case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
+ }.flatten
+
+ if (inputPaths.contains(outputPath)) {
+ throw new AnalysisException(
+ "Cannot overwrite a path that is also being read from.")
+ }
+ }
}
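The extracted verifyNotReadPath helper preserves the existing guard against overwriting a path that the query is still reading from. A hedged sketch of the behavior it enforces (the table name is illustrative and a SparkSession named `spark` is assumed):

// Overwriting a data source table with a query that reads from the same table
// is expected to fail analysis with:
//   "Cannot overwrite a path that is also being read from."
spark.range(5).write.saveAsTable("t")
spark.sql("INSERT OVERWRITE TABLE t SELECT * FROM t")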
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5d6223d..018f24e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,8 +17,11 @@
package org.apache.spark.sql.execution.datasources
+import java.util.Locale
import java.util.concurrent.Callable
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -29,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
@@ -142,6 +145,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
parts, query, overwrite, false) if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)
+ case InsertIntoDir(_, storage, provider, query, overwrite)
+ if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER =>
+
+ val outputPath = new Path(storage.locationUri.get)
+ if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath)
+
+ InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
+
case i @ InsertIntoTable(
l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
@@ -178,15 +189,9 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
}
val outputPath = t.location.rootPaths.head
- val inputPaths = actualQuery.collect {
- case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
- }.flatten
+ if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
- if (overwrite && inputPaths.contains(outputPath)) {
- throw new AnalysisException(
- "Cannot overwrite a path that is also being read from.")
- }
val partitionSchema = actualQuery.resolve(
t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 4ee3821..fa5172c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
import org.apache.spark.sql.catalyst.expressions.JsonTuple
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation}
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -524,6 +525,55 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
assert(e.message.contains("you can only specify one of them."))
}
+ test("insert overwrite directory") {
+ val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as a"
+ parser.parsePlan(v1) match {
+ case InsertIntoDir(_, storage, provider, query, overwrite) =>
+ assert(storage.locationUri.isDefined && storage.locationUri.get.toString == "/tmp/file")
+ case other =>
+ fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" +
+ " from query," + s" got ${other.getClass.getName}: $v1")
+ }
+
+ val v2 = "INSERT OVERWRITE DIRECTORY USING parquet SELECT 1 as a"
+ val e2 = intercept[ParseException] {
+ parser.parsePlan(v2)
+ }
+ assert(e2.message.contains(
+ "Directory path and 'path' in OPTIONS should be specified one, but not both"))
+
+ val v3 =
+ """
+ | INSERT OVERWRITE DIRECTORY USING json
+ | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE)
+ | SELECT 1 as a
+ """.stripMargin
+ parser.parsePlan(v3) match {
+ case InsertIntoDir(_, storage, provider, query, overwrite) =>
+ assert(storage.locationUri.isDefined && provider == Some("json"))
+ assert(storage.properties.get("a") == Some("1"))
+ assert(storage.properties.get("b") == Some("0.1"))
+ assert(storage.properties.get("c") == Some("true"))
+ assert(!storage.properties.contains("abc"))
+ assert(!storage.properties.contains("path"))
+ case other =>
+ fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" +
+ " from query," + s"got ${other.getClass.getName}: $v1")
+ }
+
+ val v4 =
+ """
+ | INSERT OVERWRITE DIRECTORY '/tmp/file' USING json
+ | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE)
+ | SELECT 1 as a
+ """.stripMargin
+ val e4 = intercept[ParseException] {
+ parser.parsePlan(v4)
+ }
+ assert(e4.message.contains(
+ "Directory path and 'path' in OPTIONS should be specified one, but not both"))
+ }
+
// ALTER TABLE table_name RENAME TO new_table_name;
// ALTER VIEW view_name RENAME TO new_view_name;
test("alter table/view: rename table/view") {
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 41abff2..875b745 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.sources
import java.io.File
+import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
@@ -366,4 +367,63 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
Row(Array(1, 2), Array("a", "b")))
}
}
+
+ test("insert overwrite directory") {
+ withTempDir { dir =>
+ val path = dir.toURI.getPath
+
+ val v1 =
+ s"""
+ | INSERT OVERWRITE DIRECTORY '${path}'
+ | USING json
+ | OPTIONS (a 1, b 0.1, c TRUE)
+ | SELECT 1 as a, 'c' as b
+ """.stripMargin
+
+ spark.sql(v1)
+
+ checkAnswer(
+ spark.read.json(dir.getCanonicalPath),
+ sql("SELECT 1 as a, 'c' as b"))
+ }
+ }
+
+ test("insert overwrite directory with path in options") {
+ withTempDir { dir =>
+ val path = dir.toURI.getPath
+
+ val v1 =
+ s"""
+ | INSERT OVERWRITE DIRECTORY
+ | USING json
+ | OPTIONS ('path' '${path}')
+ | SELECT 1 as a, 'c' as b
+ """.stripMargin
+
+ spark.sql(v1)
+
+ checkAnswer(
+ spark.read.json(dir.getCanonicalPath),
+ sql("SELECT 1 as a, 'c' as b"))
+ }
+ }
+
+ test("insert overwrite directory to data source not providing FileFormat") {
+ withTempDir { dir =>
+ val path = dir.toURI.getPath
+
+ val v1 =
+ s"""
+ | INSERT OVERWRITE DIRECTORY '${path}'
+ | USING JDBC
+ | OPTIONS (a 1, b 0.1, c TRUE)
+ | SELECT 1 as a, 'c' as b
+ """.stripMargin
+ val e = intercept[SparkException] {
+ spark.sql(v1)
+ }.getMessage
+
+ assert(e.contains("Only Data Sources providing FileFormat are supported"))
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 47203a8..caf554d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
+ ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -157,6 +158,14 @@ object HiveAnalysis extends Rule[LogicalPlan] {
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataSchemaFieldNames(tableDesc)
CreateHiveTableAsSelectCommand(tableDesc, query, mode)
+
+ case InsertIntoDir(isLocal, storage, provider, child, overwrite)
+ if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER =>
+
+ val outputPath = new Path(storage.locationUri.get)
+ if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
+
+ InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
new file mode 100644
index 0000000..15ca1df
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.{File, IOException}
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.HiveVersion
+
+// Base trait for getting a temporary location for writing data
+private[hive] trait HiveTmpPath extends Logging {
+
+ var createdTempDir: Option[Path] = None
+
+ def getExternalTmpPath(
+ sparkSession: SparkSession,
+ hadoopConf: Configuration,
+ path: Path): Path = {
+ import org.apache.spark.sql.hive.client.hive._
+
+ // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+ // a common scratch directory. After the writing is finished, Hive will simply empty the table
+ // directory and move the staging directory to it.
+ // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+ // moving staging directory to table directory, Hive will still empty the table directory, but
+ // will exclude the staging directory there.
+ // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+ // staging directory under the table directory for Hive prior to 1.1, the staging directory will
+ // be removed by Hive when Hive is trying to empty the table directory.
+ val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+ val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
+
+ // Ensure all the supported versions are considered here.
+ assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+ allSupportedHiveVersions)
+
+ val externalCatalog = sparkSession.sharedState.externalCatalog
+ val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+ val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+ val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
+ if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+ oldVersionExternalTempPath(path, hadoopConf, scratchDir)
+ } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
+ newVersionExternalTempPath(path, hadoopConf, stagingDir)
+ } else {
+ throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+ }
+ }
+
+ def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
+ // Attempt to delete the staging directory and the files it contains. If this fails, the files
+ // are expected to be dropped at the normal termination of the VM since deleteOnExit is used.
+ try {
+ createdTempDir.foreach { path =>
+ val fs = path.getFileSystem(hadoopConf)
+ if (fs.delete(path, true)) {
+ // If we successfully delete the staging directory, remove it from FileSystem's cache.
+ fs.cancelDeleteOnExit(path)
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+ logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+ }
+ }
+
+ // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+ private def oldVersionExternalTempPath(
+ path: Path,
+ hadoopConf: Configuration,
+ scratchDir: String): Path = {
+ val extURI: URI = path.toUri
+ val scratchPath = new Path(scratchDir, executionId)
+ var dirPath = new Path(
+ extURI.getScheme,
+ extURI.getAuthority,
+ scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+ try {
+ val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+ dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+ if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+ }
+ createdTempDir = Some(dirPath)
+ fs.deleteOnExit(dirPath)
+ } catch {
+ case e: IOException =>
+ throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+ }
+ dirPath
+ }
+
+ // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+ private def newVersionExternalTempPath(
+ path: Path,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
+ val extURI: URI = path.toUri
+ if (extURI.getScheme == "viewfs") {
+ getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
+ } else {
+ new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
+ }
+ }
+
+ private def getExtTmpPathRelTo(
+ path: Path,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
+ new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
+ }
+
+ private def getExternalScratchDir(
+ extURI: URI,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
+ getStagingDir(
+ new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+ hadoopConf,
+ stagingDir)
+ }
+
+ private def getStagingDir(
+ inputPath: Path,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
+ val inputPathUri: URI = inputPath.toUri
+ val inputPathName: String = inputPathUri.getPath
+ val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+ var stagingPathName: String =
+ if (inputPathName.indexOf(stagingDir) == -1) {
+ new Path(inputPathName, stagingDir).toString
+ } else {
+ inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+ }
+
+ // SPARK-20594: This is a work-around for a Hive bug: the staging directory needs to
+ // avoid being deleted when users set hive.exec.stagingdir under the table
+ // directory.
+ if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
+ !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
+ logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
+ "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
+ "directory.")
+ stagingPathName = new Path(inputPathName, ".hive-staging").toString
+ }
+
+ val dir: Path =
+ fs.makeQualified(
+ new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+ logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+ try {
+ if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+ }
+ createdTempDir = Some(dir)
+ fs.deleteOnExit(dir)
+ } catch {
+ case e: IOException =>
+ throw new RuntimeException(
+ "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+ }
+ dir
+ }
+
+ private def executionId: String = {
+ val rand: Random = new Random
+ val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+ "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+ }
+}
+
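The trait above is meant to be mixed into write commands. As a rough usage sketch (hypothetical class, not part of this patch), a command would stage its output next to the final location and always clean the staging directory up afterwards:

    package org.apache.spark.sql.hive.execution

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    import org.apache.spark.sql.SparkSession

    // Hypothetical illustration of the intended call pattern for HiveTmpPath.
    private[hive] class StagedHiveWriter extends HiveTmpPath {

      def writeStaged(
          spark: SparkSession,
          hadoopConf: Configuration,
          finalPath: Path)(doWrite: Path => Unit): Unit = {
        // Obtain a version-appropriate staging directory "next to" finalPath.
        val tmpPath = getExternalTmpPath(spark, hadoopConf, finalPath)
        try {
          doWrite(tmpPath) // write the job output under the staging directory
          // ... move the staged files into finalPath here ...
        } finally {
          deleteExternalTmpPath(hadoopConf) // best-effort cleanup of the staging directory
        }
      }
    }
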
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
new file mode 100644
index 0000000..2110038
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.language.existentials
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.mapred._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.client.HiveClientImpl
+
+/**
+ * Command for writing the results of `query` to the file system.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * INSERT OVERWRITE [LOCAL] DIRECTORY
+ * path
+ * [ROW FORMAT row_format]
+ * [STORED AS file_format]
+ * SELECT ...
+ * }}}
+ *
+ * @param isLocal whether the path specified in `storage` is a local directory
+ * @param storage storage format used to describe how the query result is stored.
+ * @param query the logical plan representing the data to write
+ * @param overwrite whether to overwrite the existing directory
+ */
+case class InsertIntoHiveDirCommand(
+ isLocal: Boolean,
+ storage: CatalogStorageFormat,
+ query: LogicalPlan,
+ overwrite: Boolean) extends SaveAsHiveFile with HiveTmpPath {
+
+ override def children: Seq[LogicalPlan] = query :: Nil
+
+ override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+ assert(children.length == 1)
+ assert(storage.locationUri.nonEmpty)
+
+ val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
+ identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
+ tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
+ storage = storage,
+ schema = query.schema
+ ))
+ hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
+ storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
+
+ val tableDesc = new TableDesc(
+ hiveTable.getInputFormatClass,
+ hiveTable.getOutputFormatClass,
+ hiveTable.getMetadata
+ )
+
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ val jobConf = new JobConf(hadoopConf)
+
+ val targetPath = new Path(storage.locationUri.get)
+ val writeToPath =
+ if (isLocal) {
+ val localFileSystem = FileSystem.getLocal(jobConf)
+ localFileSystem.makeQualified(targetPath)
+ } else {
+ val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
+ val dfs = qualifiedPath.getFileSystem(jobConf)
+ if (!dfs.exists(qualifiedPath)) {
+ dfs.mkdirs(qualifiedPath.getParent)
+ }
+ qualifiedPath
+ }
+
+ val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath)
+ val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
+ tmpPath.toString, tableDesc, false)
+
+ try {
+ saveAsHiveFile(
+ sparkSession = sparkSession,
+ plan = children.head,
+ hadoopConf = hadoopConf,
+ fileSinkConf = fileSinkConf,
+ outputLocation = tmpPath.toString)
+
+ val fs = writeToPath.getFileSystem(hadoopConf)
+ if (overwrite && fs.exists(writeToPath)) {
+ fs.listStatus(writeToPath).foreach { existFile =>
+ if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
+ }
+ }
+
+ fs.listStatus(tmpPath).foreach {
+ tmpFile => fs.rename(tmpFile.getPath, writeToPath)
+ }
+ } catch {
+ case e: Throwable =>
+ throw new SparkException(
+ "Failed inserting overwrite directory " + storage.locationUri.get, e)
+ } finally {
+ deleteExternalTmpPath(hadoopConf)
+ }
+
+ Seq.empty[Row]
+ }
+}
+
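A short usage sketch of the syntax documented in the scaladoc above (the directory paths and the `people` view are made up for illustration, and a Hive-enabled SparkSession is assumed):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.range(5).selectExpr("id", "CAST(id AS STRING) AS name").createOrReplaceTempView("people")

    // ROW FORMAT / STORED AS variant, writing to the default (e.g. HDFS) file system.
    spark.sql(
      """
        |INSERT OVERWRITE DIRECTORY '/tmp/people_out'
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        |STORED AS TEXTFILE
        |SELECT id, name FROM people
      """.stripMargin)

    // LOCAL variant, writing to the local file system instead.
    spark.sql(
      """
        |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/people_local_out'
        |STORED AS ORC
        |SELECT id, name FROM people
      """.stripMargin)
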
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 46610f8..5bdc97a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,32 +17,22 @@
package org.apache.spark.sql.hive.execution
-import java.io.{File, IOException}
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Random}
-
import scala.util.control.NonFatal
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.ErrorMsg
-import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.SparkException
-import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand}
-import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
+import org.apache.spark.sql.hive.client.HiveClientImpl
/**
@@ -80,152 +70,10 @@ case class InsertIntoHiveTable(
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
- ifPartitionNotExists: Boolean) extends DataWritingCommand {
+ ifPartitionNotExists: Boolean) extends SaveAsHiveFile with HiveTmpPath {
override def children: Seq[LogicalPlan] = query :: Nil
- var createdTempDir: Option[Path] = None
-
- private def executionId: String = {
- val rand: Random = new Random
- val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
- "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
- }
-
- private def getStagingDir(
- inputPath: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- val inputPathUri: URI = inputPath.toUri
- val inputPathName: String = inputPathUri.getPath
- val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
- var stagingPathName: String =
- if (inputPathName.indexOf(stagingDir) == -1) {
- new Path(inputPathName, stagingDir).toString
- } else {
- inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
- }
-
- // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
- // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
- // under the table directory.
- if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
- !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
- logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
- "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
- "directory.")
- stagingPathName = new Path(inputPathName, ".hive-staging").toString
- }
-
- val dir: Path =
- fs.makeQualified(
- new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
- logDebug("Created staging dir = " + dir + " for path = " + inputPath)
- try {
- if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
- throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
- }
- createdTempDir = Some(dir)
- fs.deleteOnExit(dir)
- } catch {
- case e: IOException =>
- throw new RuntimeException(
- "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
- }
- dir
- }
-
- private def getExternalScratchDir(
- extURI: URI,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- getStagingDir(
- new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
- hadoopConf,
- stagingDir)
- }
-
- def getExternalTmpPath(
- path: Path,
- hiveVersion: HiveVersion,
- hadoopConf: Configuration,
- stagingDir: String,
- scratchDir: String): Path = {
- import org.apache.spark.sql.hive.client.hive._
-
- // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
- // a common scratch directory. After the writing is finished, Hive will simply empty the table
- // directory and move the staging directory to it.
- // After Hive 1.1, Hive will create the staging directory under the table directory, and when
- // moving staging directory to table directory, Hive will still empty the table directory, but
- // will exclude the staging directory there.
- // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
- // staging directory under the table director for Hive prior to 1.1, the staging directory will
- // be removed by Hive when Hive is trying to empty the table directory.
- val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
-
- // Ensure all the supported versions are considered here.
- assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
- allSupportedHiveVersions)
-
- if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
- oldVersionExternalTempPath(path, hadoopConf, scratchDir)
- } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
- newVersionExternalTempPath(path, hadoopConf, stagingDir)
- } else {
- throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
- }
- }
-
- // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
- def oldVersionExternalTempPath(
- path: Path,
- hadoopConf: Configuration,
- scratchDir: String): Path = {
- val extURI: URI = path.toUri
- val scratchPath = new Path(scratchDir, executionId)
- var dirPath = new Path(
- extURI.getScheme,
- extURI.getAuthority,
- scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
- try {
- val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
- dirPath = new Path(fs.makeQualified(dirPath).toString())
-
- if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
- throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
- }
- createdTempDir = Some(dirPath)
- fs.deleteOnExit(dirPath)
- } catch {
- case e: IOException =>
- throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
- }
- dirPath
- }
-
- // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
- def newVersionExternalTempPath(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- val extURI: URI = path.toUri
- if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
- } else {
- new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
- }
- }
-
- def getExtTmpPathRelTo(
- path: Path,
- hadoopConf: Configuration,
- stagingDir: String): Path = {
- new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
- }
-
/**
* Inserts all the rows in the table into Hive. Row objects are properly serialized with the
* `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -234,12 +82,8 @@ case class InsertIntoHiveTable(
override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
assert(children.length == 1)
- val sessionState = sparkSession.sessionState
val externalCatalog = sparkSession.sharedState.externalCatalog
- val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
- val hadoopConf = sessionState.newHadoopConf()
- val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
- val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
val hiveQlTable = HiveClientImpl.toHiveTable(table)
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
@@ -254,23 +98,8 @@ case class InsertIntoHiveTable(
hiveQlTable.getMetadata
)
val tableLocation = hiveQlTable.getDataLocation
- val tmpLocation =
- getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
+ val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
-
- if (isCompressed) {
- // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
- // "mapreduce.output.fileoutputformat.compress.codec", and
- // "mapreduce.output.fileoutputformat.compress.type"
- // have no impact on ORC because it uses table properties to store compression information.
- hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
- fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(hadoopConf
- .get("mapreduce.output.fileoutputformat.compress.codec"))
- fileSinkConf.setCompressType(hadoopConf
- .get("mapreduce.output.fileoutputformat.compress.type"))
- }
val numDynamicPartitions = partition.values.count(_.isEmpty)
val numStaticPartitions = partition.values.count(_.nonEmpty)
@@ -332,11 +161,6 @@ case class InsertIntoHiveTable(
case _ => // do nothing since table has no bucketing
}
- val committer = FileCommitProtocol.instantiate(
- sparkSession.sessionState.conf.fileCommitProtocolClass,
- jobId = java.util.UUID.randomUUID().toString,
- outputPath = tmpLocation.toString)
-
val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
@@ -344,17 +168,13 @@ case class InsertIntoHiveTable(
}.asInstanceOf[Attribute]
}
- FileFormatWriter.write(
+ saveAsHiveFile(
sparkSession = sparkSession,
plan = children.head,
- fileFormat = new HiveFileFormat(fileSinkConf),
- committer = committer,
- outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
hadoopConf = hadoopConf,
- partitionColumns = partitionAttributes,
- bucketSpec = None,
- statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
- options = Map.empty)
+ fileSinkConf = fileSinkConf,
+ outputLocation = tmpLocation.toString,
+ partitionAttributes = partitionAttributes)
if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
@@ -422,18 +242,7 @@ case class InsertIntoHiveTable(
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
- try {
- createdTempDir.foreach { path =>
- val fs = path.getFileSystem(hadoopConf)
- if (fs.delete(path, true)) {
- // If we successfully delete the staging directory, remove it from FileSystem's cache.
- fs.cancelDeleteOnExit(path)
- }
- }
- } catch {
- case NonFatal(e) =>
- logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
- }
+ deleteExternalTmpPath(hadoopConf)
// un-cache this table.
sparkSession.catalog.uncacheTable(table.identifier.quotedString)
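
As a hedged illustration of the path that now goes through the shared helpers (getExternalTmpPath / saveAsHiveFile / deleteExternalTmpPath), an ordinary partitioned insert; the table name and partition value are hypothetical and a Hive-enabled session is assumed:

    // Assumes `spark` is a Hive-enabled SparkSession, e.g. built with enableHiveSupport().
    spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql(
      "CREATE TABLE IF NOT EXISTS events (id BIGINT) PARTITIONED BY (day STRING) STORED AS PARQUET")

    // Planned as InsertIntoHiveTable: the rows are first written to the external temporary path
    // and then loaded into the table's partitions; the staging directory is removed at the end.
    spark.sql(
      """
        |INSERT OVERWRITE TABLE events PARTITION (day)
        |SELECT id, '2017-09-09' AS day FROM range(3)
      """.stripMargin)
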
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
new file mode 100644
index 0000000..7de9b42
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+
+// Base trait from which all Hive insert statement physical execution commands extend.
+private[hive] trait SaveAsHiveFile extends DataWritingCommand {
+
+ protected def saveAsHiveFile(
+ sparkSession: SparkSession,
+ plan: SparkPlan,
+ hadoopConf: Configuration,
+ fileSinkConf: FileSinkDesc,
+ outputLocation: String,
+ partitionAttributes: Seq[Attribute] = Nil): Unit = {
+
+ val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+ if (isCompressed) {
+ // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
+ // "mapreduce.output.fileoutputformat.compress.codec", and
+ // "mapreduce.output.fileoutputformat.compress.type"
+ // have no impact on ORC because it uses table properties to store compression information.
+ hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+ fileSinkConf.setCompressed(true)
+ fileSinkConf.setCompressCodec(hadoopConf
+ .get("mapreduce.output.fileoutputformat.compress.codec"))
+ fileSinkConf.setCompressType(hadoopConf
+ .get("mapreduce.output.fileoutputformat.compress.type"))
+ }
+
+ val committer = FileCommitProtocol.instantiate(
+ sparkSession.sessionState.conf.fileCommitProtocolClass,
+ jobId = java.util.UUID.randomUUID().toString,
+ outputPath = outputLocation)
+
+ FileFormatWriter.write(
+ sparkSession = sparkSession,
+ plan = plan,
+ fileFormat = new HiveFileFormat(fileSinkConf),
+ committer = committer,
+ outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty),
+ hadoopConf = hadoopConf,
+ partitionColumns = partitionAttributes,
+ bucketSpec = None,
+ statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
+ options = Map.empty)
+ }
+}
+
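A small sketch of how the compression settings consulted by saveAsHiveFile above could be supplied (illustrative only; the codec choice and table name are arbitrary, and a Hive-enabled session is assumed):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // sessionState.newHadoopConf() is derived from the SparkContext's Hadoop configuration,
    // so these values are visible to saveAsHiveFile; note that ORC ignores them and keeps
    // compression in table properties, as the comment above points out.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("hive.exec.compress.output", "true")
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec",
      "org.apache.hadoop.io.compress.GzipCodec")
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

    spark.sql("CREATE TABLE IF NOT EXISTS compressed_out (id BIGINT) STORED AS TEXTFILE")
    spark.sql("INSERT OVERWRITE TABLE compressed_out SELECT id FROM range(10)")
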
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
deleted file mode 100644
index e93c654..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ /dev/null
@@ -1,551 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.{QueryTest, _}
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-case class TestData(key: Int, value: String)
-
-case class ThreeCloumntable(key: Int, value: String, key1: String)
-
-class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
- with SQLTestUtils {
- import spark.implicits._
-
- override lazy val testData = spark.sparkContext.parallelize(
- (1 to 100).map(i => TestData(i, i.toString))).toDF()
-
- before {
- // Since every we are doing tests for DDL statements,
- // it is better to reset before every test.
- hiveContext.reset()
- // Creates a temporary view with testData, which will be used in all tests.
- testData.createOrReplaceTempView("testData")
- }
-
- test("insertInto() HiveTable") {
- withTable("createAndInsertTest") {
- sql("CREATE TABLE createAndInsertTest (key int, value string)")
-
- // Add some data.
- testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
-
- // Make sure the table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq
- )
-
- // Add more data.
- testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
-
- // Make sure the table has been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertTest"),
- testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
- )
-
- // Now overwrite.
- testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
-
- // Make sure the registered table has also been updated.
- checkAnswer(
- sql("SELECT * FROM createAndInsertTest"),
- testData.collect().toSeq
- )
- }
- }
-
- test("Double create fails when allowExisting = false") {
- withTable("doubleCreateAndInsertTest") {
- sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-
- intercept[AnalysisException] {
- sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
- }
- }
- }
-
- test("Double create does not fail when allowExisting = true") {
- withTable("doubleCreateAndInsertTest") {
- sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
- sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)")
- }
- }
-
- test("SPARK-4052: scala.collection.Map as value type of MapType") {
- val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil)
- val rowRDD = spark.sparkContext.parallelize(
- (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i"))))
- val df = spark.createDataFrame(rowRDD, schema)
- df.createOrReplaceTempView("tableWithMapValue")
- sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)")
- sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
-
- checkAnswer(
- sql("SELECT * FROM hiveTableWithMapValue"),
- rowRDD.collect().toSeq
- )
-
- sql("DROP TABLE hiveTableWithMapValue")
- }
-
- test("SPARK-4203:random partition directory order") {
- sql("CREATE TABLE tmp_table (key int, value string)")
- val tmpDir = Utils.createTempDir()
- // The default value of hive.exec.stagingdir.
- val stagingDir = ".hive-staging"
-
- sql(
- s"""
- |CREATE TABLE table_with_partition(c1 string)
- |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string)
- |location '${tmpDir.toURI.toString}'
- """.stripMargin)
- sql(
- """
- |INSERT OVERWRITE TABLE table_with_partition
- |partition (p1='a',p2='b',p3='c',p4='c',p5='1')
- |SELECT 'blarr' FROM tmp_table
- """.stripMargin)
- sql(
- """
- |INSERT OVERWRITE TABLE table_with_partition
- |partition (p1='a',p2='b',p3='c',p4='c',p5='2')
- |SELECT 'blarr' FROM tmp_table
- """.stripMargin)
- sql(
- """
- |INSERT OVERWRITE TABLE table_with_partition
- |partition (p1='a',p2='b',p3='c',p4='c',p5='3')
- |SELECT 'blarr' FROM tmp_table
- """.stripMargin)
- sql(
- """
- |INSERT OVERWRITE TABLE table_with_partition
- |partition (p1='a',p2='b',p3='c',p4='c',p5='4')
- |SELECT 'blarr' FROM tmp_table
- """.stripMargin)
- def listFolders(path: File, acc: List[String]): List[List[String]] = {
- val dir = path.listFiles()
- val folders = dir.filter { e => e.isDirectory && !e.getName().startsWith(stagingDir) }.toList
- if (folders.isEmpty) {
- List(acc.reverse)
- } else {
- folders.flatMap(x => listFolders(x, x.getName :: acc))
- }
- }
- val expected = List(
- "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil,
- "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil,
- "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil,
- "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
- )
- assert(listFolders(tmpDir, List()).sortBy(_.toString()) === expected.sortBy(_.toString))
- sql("DROP TABLE table_with_partition")
- sql("DROP TABLE tmp_table")
- }
-
- testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName =>
- val selQuery = s"select a, b, c, d from $tableName"
- sql(
- s"""
- |INSERT OVERWRITE TABLE $tableName
- |partition (b=2, c=3)
- |SELECT 1, 4
- """.stripMargin)
- checkAnswer(sql(selQuery), Row(1, 2, 3, 4))
-
- sql(
- s"""
- |INSERT OVERWRITE TABLE $tableName
- |partition (b=2, c=3)
- |SELECT 5, 6
- """.stripMargin)
- checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
-
- val e = intercept[AnalysisException] {
- sql(
- s"""
- |INSERT OVERWRITE TABLE $tableName
- |partition (b=2, c) IF NOT EXISTS
- |SELECT 7, 8, 3
- """.stripMargin)
- }
- assert(e.getMessage.contains(
- "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]"))
-
- // If the partition already exists, the insert will overwrite the data
- // unless users specify IF NOT EXISTS
- sql(
- s"""
- |INSERT OVERWRITE TABLE $tableName
- |partition (b=2, c=3) IF NOT EXISTS
- |SELECT 9, 10
- """.stripMargin)
- checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
-
- // ADD PARTITION has the same effect, even if no actual data is inserted.
- sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
- sql(
- s"""
- |INSERT OVERWRITE TABLE $tableName
- |partition (b=21, c=31) IF NOT EXISTS
- |SELECT 20, 24
- """.stripMargin)
- checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
- }
-
- test("Insert ArrayType.containsNull == false") {
- val schema = StructType(Seq(
- StructField("a", ArrayType(StringType, containsNull = false))))
- val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i"))))
- val df = spark.createDataFrame(rowRDD, schema)
- df.createOrReplaceTempView("tableWithArrayValue")
- sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
- sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue")
-
- checkAnswer(
- sql("SELECT * FROM hiveTableWithArrayValue"),
- rowRDD.collect().toSeq)
-
- sql("DROP TABLE hiveTableWithArrayValue")
- }
-
- test("Insert MapType.valueContainsNull == false") {
- val schema = StructType(Seq(
- StructField("m", MapType(StringType, StringType, valueContainsNull = false))))
- val rowRDD = spark.sparkContext.parallelize(
- (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
- val df = spark.createDataFrame(rowRDD, schema)
- df.createOrReplaceTempView("tableWithMapValue")
- sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
- sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
-
- checkAnswer(
- sql("SELECT * FROM hiveTableWithMapValue"),
- rowRDD.collect().toSeq)
-
- sql("DROP TABLE hiveTableWithMapValue")
- }
-
- test("Insert StructType.fields.exists(_.nullable == false)") {
- val schema = StructType(Seq(
- StructField("s", StructType(Seq(StructField("f", StringType, nullable = false))))))
- val rowRDD = spark.sparkContext.parallelize(
- (1 to 100).map(i => Row(Row(s"value$i"))))
- val df = spark.createDataFrame(rowRDD, schema)
- df.createOrReplaceTempView("tableWithStructValue")
- sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
- sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue")
-
- checkAnswer(
- sql("SELECT * FROM hiveTableWithStructValue"),
- rowRDD.collect().toSeq)
-
- sql("DROP TABLE hiveTableWithStructValue")
- }
-
- test("Test partition mode = strict") {
- withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
- withTable("partitioned") {
- sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
- val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
- .toDF("id", "data", "part")
-
- intercept[SparkException] {
- data.write.insertInto("partitioned")
- }
- }
- }
- }
-
- test("Detect table partitioning") {
- withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
- withTable("source", "partitioned") {
- sql("CREATE TABLE source (id bigint, data string, part string)")
- val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF()
-
- data.write.insertInto("source")
- checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
-
- sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
- // this will pick up the output partitioning from the table definition
- spark.table("source").write.insertInto("partitioned")
-
- checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
- }
- }
- }
-
- private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
- test(s"Hive SerDe table - $testName") {
- val hiveTable = "hive_table"
-
- withTable(hiveTable) {
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- sql(
- s"""
- |CREATE TABLE $hiveTable (a INT, d INT)
- |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE
- """.stripMargin)
- f(hiveTable)
- }
- }
- }
- }
-
- private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
- test(s"Data source table - $testName") {
- val dsTable = "ds_table"
-
- withTable(dsTable) {
- sql(
- s"""
- |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT)
- |USING PARQUET PARTITIONED BY (b, c)
- """.stripMargin)
- f(dsTable)
- }
- }
- }
-
- private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
- testPartitionedHiveSerDeTable(testName)(f)
- testPartitionedDataSourceTable(testName)(f)
- }
-
- testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
- val cause = intercept[AnalysisException] {
- Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName)
- }
-
- assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
- }
-
- testPartitionedTable(
- "SPARK-16036: better error message when insert into a table with mismatch schema") {
- tableName =>
- val e = intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
- }
- assert(e.message.contains(
- "target table has 4 column(s) but the inserted data has 5 column(s)"))
- }
-
- testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
- tableName =>
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
- checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
- sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3")
- checkAnswer(sql(s"SELECT a, b, c, 4 FROM $tableName"), Row(1, 2, 3, 4))
- }
- }
-
- testPartitionedTable("INSERT INTO a partitioned table (semantic and error handling)") {
- tableName =>
- withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4")
-
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8")
-
- sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
-
- // c is defined twice. Analyzer will complain.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13")
- }
-
- // d is not a partitioning column.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13, 14")
- }
-
- // d is not a partitioning column. The total number of columns is correct.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13")
- }
-
- // The data is missing a column.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
- }
-
- // d is not a partitioning column.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 14")
- }
-
- // The statement is missing a column.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14")
- }
-
- // The statement is missing a column.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 16")
- }
-
- sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 15")
-
- // Dynamic partitioning columns need to be after static partitioning columns.
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 20, 18")
- }
-
- sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 19")
-
- sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 23")
-
- sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27")
-
- checkAnswer(
- sql(s"SELECT a, b, c, d FROM $tableName"),
- Row(1, 2, 3, 4) ::
- Row(5, 6, 7, 8) ::
- Row(9, 10, 11, 12) ::
- Row(13, 14, 15, 16) ::
- Row(17, 18, 19, 20) ::
- Row(21, 22, 23, 24) ::
- Row(25, 26, 27, 28) :: Nil
- )
- }
- }
-
- testPartitionedTable("insertInto() should match columns by position and ignore column names") {
- tableName =>
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- // Columns `df.c` and `df.d` are resolved by position, and thus mapped to partition columns
- // `b` and `c` of the target table.
- val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
- df.write.insertInto(tableName)
-
- checkAnswer(
- sql(s"SELECT a, b, c, d FROM $tableName"),
- Row(1, 3, 4, 2)
- )
- }
- }
-
- testPartitionedTable("insertInto() should match unnamed columns by position") {
- tableName =>
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- // Columns `c + 1` and `d + 1` are resolved by position, and thus mapped to partition
- // columns `b` and `c` of the target table.
- val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
- df.select('a + 1, 'b + 1, 'c + 1, 'd + 1).write.insertInto(tableName)
-
- checkAnswer(
- sql(s"SELECT a, b, c, d FROM $tableName"),
- Row(2, 4, 5, 3)
- )
- }
- }
-
- testPartitionedTable("insertInto() should reject missing columns") {
- tableName =>
- withTable("t") {
- sql("CREATE TABLE t (a INT, b INT)")
-
- intercept[AnalysisException] {
- spark.table("t").write.insertInto(tableName)
- }
- }
- }
-
- testPartitionedTable("insertInto() should reject extra columns") {
- tableName =>
- withTable("t") {
- sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
-
- intercept[AnalysisException] {
- spark.table("t").write.insertInto(tableName)
- }
- }
- }
-
- private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
- test(s"Hive SerDe table - $testName") {
- val hiveTable = "hive_table"
-
- withTable(hiveTable) {
- withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
- sql(
- s"""
- |CREATE TABLE $hiveTable (a INT, d INT)
- |PARTITIONED BY (b INT, c INT)
- |CLUSTERED BY(a)
- |SORTED BY(a, d) INTO 256 BUCKETS
- |STORED AS TEXTFILE
- """.stripMargin)
- f(hiveTable)
- }
- }
- }
- }
-
- testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
- tableName =>
- withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
- checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
- }
- }
-
- testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
- tableName =>
- withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
- }
- withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
- }
- withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
- }
- }
-
- test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
- // Set hive.exec.stagingdir under the table directory without start with ".".
- withSQLConf("hive.exec.stagingdir" -> "./test") {
- withTable("test_table") {
- sql("CREATE TABLE test_table (key int)")
- sql("INSERT OVERWRITE TABLE test_table SELECT 1")
- checkAnswer(sql("SELECT * FROM test_table"), Row(1))
- }
- }
- }
-}