You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/09 18:48:39 UTC

[1/2] spark git commit: [SPARK-4131] Support "Writing data into the filesystem from queries"

Repository: spark
Updated Branches:
  refs/heads/master e4d8f9a36 -> f76790557


http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
new file mode 100644
index 0000000..aa5cae3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+case class TestData(key: Int, value: String)
+
+case class ThreeCloumntable(key: Int, value: String, key1: String)
+
+class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
+    with SQLTestUtils {
+  import spark.implicits._
+
+  override lazy val testData = spark.sparkContext.parallelize(
+    (1 to 100).map(i => TestData(i, i.toString))).toDF()
+
+  before {
+    // Since every we are doing tests for DDL statements,
+    // it is better to reset before every test.
+    hiveContext.reset()
+    // Creates a temporary view with testData, which will be used in all tests.
+    testData.createOrReplaceTempView("testData")
+  }
+
+  test("insertInto() HiveTable") {
+    withTable("createAndInsertTest") {
+      sql("CREATE TABLE createAndInsertTest (key int, value string)")
+
+      // Add some data.
+      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+      // Make sure the table has also been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.collect().toSeq
+      )
+
+      // Add more data.
+      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+      // Make sure the table has been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
+      )
+
+      // Now overwrite.
+      testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
+
+      // Make sure the registered table has also been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.collect().toSeq
+      )
+    }
+  }
+
+  test("Double create fails when allowExisting = false") {
+    withTable("doubleCreateAndInsertTest") {
+      sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+
+      intercept[AnalysisException] {
+        sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+      }
+    }
+  }
+
+  test("Double create does not fail when allowExisting = true") {
+    withTable("doubleCreateAndInsertTest") {
+      sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+      sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)")
+    }
+  }
+
+  test("SPARK-4052: scala.collection.Map as value type of MapType") {
+    val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil)
+    val rowRDD = spark.sparkContext.parallelize(
+      (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithMapValue")
+    sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithMapValue"),
+      rowRDD.collect().toSeq
+    )
+
+    sql("DROP TABLE hiveTableWithMapValue")
+  }
+
+  test("SPARK-4203:random partition directory order") {
+    sql("CREATE TABLE tmp_table (key int, value string)")
+    val tmpDir = Utils.createTempDir()
+    // The default value of hive.exec.stagingdir.
+    val stagingDir = ".hive-staging"
+
+    sql(
+      s"""
+         |CREATE TABLE table_with_partition(c1 string)
+         |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string)
+         |location '${tmpDir.toURI.toString}'
+        """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='1')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='2')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='3')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='4')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    def listFolders(path: File, acc: List[String]): List[List[String]] = {
+      val dir = path.listFiles()
+      val folders = dir.filter { e => e.isDirectory && !e.getName().startsWith(stagingDir) }.toList
+      if (folders.isEmpty) {
+        List(acc.reverse)
+      } else {
+        folders.flatMap(x => listFolders(x, x.getName :: acc))
+      }
+    }
+    val expected = List(
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
+    )
+    assert(listFolders(tmpDir, List()).sortBy(_.toString()) === expected.sortBy(_.toString))
+    sql("DROP TABLE table_with_partition")
+    sql("DROP TABLE tmp_table")
+  }
+
+  testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName =>
+    val selQuery = s"select a, b, c, d from $tableName"
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 1, 4
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(1, 2, 3, 4))
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 5, 6
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+
+    val e = intercept[AnalysisException] {
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $tableName
+           |partition (b=2, c) IF NOT EXISTS
+           |SELECT 7, 8, 3
+          """.stripMargin)
+    }
+    assert(e.getMessage.contains(
+      "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]"))
+
+    // If the partition already exists, the insert will overwrite the data
+    // unless users specify IF NOT EXISTS
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3) IF NOT EXISTS
+         |SELECT 9, 10
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+
+    // ADD PARTITION has the same effect, even if no actual data is inserted.
+    sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=21, c=31) IF NOT EXISTS
+         |SELECT 20, 24
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+  }
+
+  test("Insert ArrayType.containsNull == false") {
+    val schema = StructType(Seq(
+      StructField("a", ArrayType(StringType, containsNull = false))))
+    val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithArrayValue")
+    sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithArrayValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithArrayValue")
+  }
+
+  test("Insert MapType.valueContainsNull == false") {
+    val schema = StructType(Seq(
+      StructField("m", MapType(StringType, StringType, valueContainsNull = false))))
+    val rowRDD = spark.sparkContext.parallelize(
+      (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithMapValue")
+    sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithMapValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithMapValue")
+  }
+
+  test("Insert StructType.fields.exists(_.nullable == false)") {
+    val schema = StructType(Seq(
+      StructField("s", StructType(Seq(StructField("f", StringType, nullable = false))))))
+    val rowRDD = spark.sparkContext.parallelize(
+      (1 to 100).map(i => Row(Row(s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithStructValue")
+    sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithStructValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithStructValue")
+  }
+
+  test("Test partition mode = strict") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
+      withTable("partitioned") {
+        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
+          .toDF("id", "data", "part")
+
+        intercept[SparkException] {
+          data.write.insertInto("partitioned")
+        }
+      }
+    }
+  }
+
+  test("Detect table partitioning") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("source", "partitioned") {
+        sql("CREATE TABLE source (id bigint, data string, part string)")
+        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF()
+
+        data.write.insertInto("source")
+        checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
+
+        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+        // this will pick up the output partitioning from the table definition
+        spark.table("source").write.insertInto("partitioned")
+
+        checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
+      }
+    }
+  }
+
+  private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(
+            s"""
+              |CREATE TABLE $hiveTable (a INT, d INT)
+              |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE
+            """.stripMargin)
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Data source table - $testName") {
+      val dsTable = "ds_table"
+
+      withTable(dsTable) {
+        sql(
+          s"""
+             |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT)
+             |USING PARQUET PARTITIONED BY (b, c)
+           """.stripMargin)
+        f(dsTable)
+      }
+    }
+  }
+
+  private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
+    testPartitionedHiveSerDeTable(testName)(f)
+    testPartitionedDataSourceTable(testName)(f)
+  }
+
+  testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
+    val cause = intercept[AnalysisException] {
+      Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName)
+    }
+
+    assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
+  }
+
+  testPartitionedTable(
+    "SPARK-16036: better error message when insert into a table with mismatch schema") {
+    tableName =>
+      val e = intercept[AnalysisException] {
+        sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
+      }
+      assert(e.message.contains(
+        "target table has 4 column(s) but the inserted data has 5 column(s)"))
+  }
+
+  testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
+    tableName =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+        sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3")
+        checkAnswer(sql(s"SELECT a, b, c, 4 FROM $tableName"), Row(1, 2, 3, 4))
+      }
+  }
+
+  testPartitionedTable("INSERT INTO a partitioned table (semantic and error handling)") {
+    tableName =>
+      withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4")
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8")
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
+
+        // c is defined twice. Analyzer will complain.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13")
+        }
+
+        // d is not a partitioning column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13, 14")
+        }
+
+        // d is not a partitioning column. The total number of columns is correct.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13")
+        }
+
+        // The data is missing a column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+        }
+
+        // d is not a partitioning column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 14")
+        }
+
+        // The statement is missing a column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14")
+        }
+
+        // The statement is missing a column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 16")
+        }
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 15")
+
+        // Dynamic partitioning columns need to be after static partitioning columns.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 20, 18")
+        }
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 19")
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 23")
+
+        sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27")
+
+        checkAnswer(
+          sql(s"SELECT a, b, c, d FROM $tableName"),
+          Row(1, 2, 3, 4) ::
+            Row(5, 6, 7, 8) ::
+            Row(9, 10, 11, 12) ::
+            Row(13, 14, 15, 16) ::
+            Row(17, 18, 19, 20) ::
+            Row(21, 22, 23, 24) ::
+            Row(25, 26, 27, 28) :: Nil
+        )
+      }
+  }
+
+  testPartitionedTable("insertInto() should match columns by position and ignore column names") {
+    tableName =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        // Columns `df.c` and `df.d` are resolved by position, and thus mapped to partition columns
+        // `b` and `c` of the target table.
+        val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
+        df.write.insertInto(tableName)
+
+        checkAnswer(
+          sql(s"SELECT a, b, c, d FROM $tableName"),
+          Row(1, 3, 4, 2)
+        )
+      }
+  }
+
+  testPartitionedTable("insertInto() should match unnamed columns by position") {
+    tableName =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        // Columns `c + 1` and `d + 1` are resolved by position, and thus mapped to partition
+        // columns `b` and `c` of the target table.
+        val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
+        df.select('a + 1, 'b + 1, 'c + 1, 'd + 1).write.insertInto(tableName)
+
+        checkAnswer(
+          sql(s"SELECT a, b, c, d FROM $tableName"),
+          Row(2, 4, 5, 3)
+        )
+      }
+  }
+
+  testPartitionedTable("insertInto() should reject missing columns") {
+    tableName =>
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT)")
+
+        intercept[AnalysisException] {
+          spark.table("t").write.insertInto(tableName)
+        }
+      }
+  }
+
+  testPartitionedTable("insertInto() should reject extra columns") {
+    tableName =>
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
+
+        intercept[AnalysisException] {
+          spark.table("t").write.insertInto(tableName)
+        }
+      }
+  }
+
+  private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(
+            s"""
+               |CREATE TABLE $hiveTable (a INT, d INT)
+               |PARTITIONED BY (b INT, c INT)
+               |CLUSTERED BY(a)
+               |SORTED BY(a, d) INTO 256 BUCKETS
+               |STORED AS TEXTFILE
+            """.stripMargin)
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
+        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+      }
+  }
+
+  testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+  }
+
+  test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
+    // Set hive.exec.stagingdir under the table directory without start with ".".
+    withSQLConf("hive.exec.stagingdir" -> "./test") {
+      withTable("test_table") {
+        sql("CREATE TABLE test_table (key int)")
+        sql("INSERT OVERWRITE TABLE test_table SELECT 1")
+        checkAnswer(sql("SELECT * FROM test_table"), Row(1))
+      }
+    }
+  }
+
+  test("insert overwrite to dir from hive metastore table") {
+    withTempDir { dir =>
+      val path = dir.toURI.getPath
+
+      sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path}' SELECT * FROM src where key < 10")
+
+      sql(
+        s"""
+           |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+           |STORED AS orc
+           |SELECT * FROM src where key < 10
+         """.stripMargin)
+
+      // use orc data source to check the data of path is right.
+      withTempView("orc_source") {
+        sql(
+          s"""
+             |CREATE TEMPORARY VIEW orc_source
+             |USING org.apache.spark.sql.hive.orc
+             |OPTIONS (
+             |  PATH '${dir.getCanonicalPath}'
+             |)
+           """.stripMargin)
+
+        checkAnswer(
+          sql("select * from orc_source"),
+          sql("select * from src where key < 10"))
+      }
+    }
+  }
+
+  test("insert overwrite to local dir from temp table") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+      withTempDir { dir =>
+        val path = dir.toURI.getPath
+
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+             |STORED AS orc
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        // use orc data source to check the data of path is right.
+        checkAnswer(
+          spark.read.orc(dir.getCanonicalPath),
+          sql("select * from test_insert_table"))
+      }
+    }
+  }
+
+  test("insert overwrite to dir from temp table") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+      withTempDir { dir =>
+        val pathUri = dir.toURI
+
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY '${pathUri}'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY '${pathUri}'
+             |STORED AS orc
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        // use orc data source to check the data of path is right.
+        checkAnswer(
+          spark.read.orc(dir.getCanonicalPath),
+          sql("select * from test_insert_table"))
+      }
+    }
+  }
+
+  test("multi insert overwrite to dir") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+      withTempDir { dir =>
+        val pathUri = dir.toURI
+
+        withTempDir { dir2 =>
+          val pathUri2 = dir2.toURI
+
+          sql(
+            s"""
+               |FROM test_insert_table
+               |INSERT OVERWRITE DIRECTORY '${pathUri}'
+               |STORED AS orc
+               |SELECT id
+               |INSERT OVERWRITE DIRECTORY '${pathUri2}'
+               |STORED AS orc
+               |SELECT *
+             """.stripMargin)
+
+          // use orc data source to check the data of path is right.
+          checkAnswer(
+            spark.read.orc(dir.getCanonicalPath),
+            sql("select id from test_insert_table"))
+
+          checkAnswer(
+            spark.read.orc(dir2.getCanonicalPath),
+            sql("select * from test_insert_table"))
+        }
+      }
+    }
+  }
+
+  test("insert overwrite to dir to illegal path") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+      val e = intercept[IllegalArgumentException] {
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY 'abc://a'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+      }.getMessage
+
+      assert(e.contains("Wrong FS: abc://a, expected: file:///"))
+    }
+  }
+
+  test("insert overwrite to dir with mixed syntax") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+      val e = intercept[ParseException] {
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY 'file://tmp'
+             |USING json
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+      }.getMessage
+
+      assert(e.contains("mismatched input 'ROW'"))
+    }
+  }
+
+  test("insert overwrite to dir with multi inserts") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS str").createOrReplaceTempView("test_insert_table")
+
+      val e = intercept[ParseException] {
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY 'file://tmp2'
+             |USING json
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+             |INSERT OVERWRITE DIRECTORY 'file://tmp2'
+             |USING json
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+      }.getMessage
+
+      assert(e.contains("mismatched input 'ROW'"))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-4131] Support "Writing data into the filesystem from queries"

Posted by li...@apache.org.
[SPARK-4131] Support "Writing data into the filesystem from queries"

## What changes were proposed in this pull request?

This PR implements the sql feature:
INSERT OVERWRITE [LOCAL] DIRECTORY directory1
  [ROW FORMAT row_format] [STORED AS file_format]
  SELECT ... FROM ...

## How was this patch tested?
Added new unittests and also pulled the code to fb-spark so that we could test writing to hdfs directory.

Author: Jane Wang <ja...@fb.com>

Closes #18975 from janewangfb/port_local_directory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7679055
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7679055
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7679055

Branch: refs/heads/master
Commit: f76790557b063edc3080d5c792167e2f8b7060d1
Parents: e4d8f9a
Author: Jane Wang <ja...@fb.com>
Authored: Sat Sep 9 11:48:34 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Sep 9 11:48:34 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   8 +-
 .../analysis/UnsupportedOperationChecker.scala  |   3 +
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  79 +-
 .../plans/logical/basicLogicalOperators.scala   |  26 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  79 +-
 .../InsertIntoDataSourceDirCommand.scala        |  82 +++
 .../spark/sql/execution/command/ddl.scala       |  17 +-
 .../datasources/DataSourceStrategy.scala        |  21 +-
 .../sql/execution/command/DDLParserSuite.scala  |  52 +-
 .../apache/spark/sql/sources/InsertSuite.scala  |  60 ++
 .../apache/spark/sql/hive/HiveStrategies.scala  |  11 +-
 .../spark/sql/hive/execution/HiveTmpPath.scala  | 203 +++++
 .../execution/InsertIntoHiveDirCommand.scala    | 131 ++++
 .../hive/execution/InsertIntoHiveTable.scala    | 213 +-----
 .../sql/hive/execution/SaveAsHiveFile.scala     |  73 ++
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 551 --------------
 .../org/apache/spark/sql/hive/InsertSuite.scala | 731 +++++++++++++++++++
 17 files changed, 1565 insertions(+), 775 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index f741dcf..239e73e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -243,8 +243,10 @@ query
     ;
 
 insertInto
-    : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?
-    | INSERT INTO TABLE? tableIdentifier partitionSpec?
+    : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?                              #insertOverwriteTable
+    | INSERT INTO TABLE? tableIdentifier partitionSpec?                                                     #insertIntoTable
+    | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat?                            #insertOverwriteHiveDir
+    | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)?   #insertOverwriteDir
     ;
 
 partitionSpecLocation
@@ -745,6 +747,7 @@ nonReserved
     | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN
     | UNBOUNDED | WHEN
     | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP
+    | DIRECTORY
     ;
 
 SELECT: 'SELECT';
@@ -815,6 +818,7 @@ WITH: 'WITH';
 VALUES: 'VALUES';
 CREATE: 'CREATE';
 TABLE: 'TABLE';
+DIRECTORY: 'DIRECTORY';
 VIEW: 'VIEW';
 REPLACE: 'REPLACE';
 INSERT: 'INSERT';

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 6ab4153..33ba086 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -146,6 +146,9 @@ object UnsupportedOperationChecker {
           throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
             "streaming DataFrames/Datasets")
 
+        case _: InsertIntoDir =>
+          throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
+
         // mapGroupsWithState and flatMapGroupsWithState
         case m: FlatMapGroupsWithState if m.isStreaming =>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8a45c52..891f616 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -178,11 +179,64 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
-   * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan.
+   * Parameters used for writing query to a table:
+   *   (tableIdentifier, partitionKeys, exists).
+   */
+  type InsertTableParams = (TableIdentifier, Map[String, Option[String]], Boolean)
+
+  /**
+   * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider).
+   */
+  type InsertDirParams = (Boolean, CatalogStorageFormat, Option[String])
+
+  /**
+   * Add an
+   * {{{
+   *   INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]?
+   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec]
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
+   * }}}
+   * operation to logical plan
    */
   private def withInsertInto(
       ctx: InsertIntoContext,
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
+    ctx match {
+      case table: InsertIntoTableContext =>
+        val (tableIdent, partitionKeys, exists) = visitInsertIntoTable(table)
+        InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, false, exists)
+      case table: InsertOverwriteTableContext =>
+        val (tableIdent, partitionKeys, exists) = visitInsertOverwriteTable(table)
+        InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, true, exists)
+      case dir: InsertOverwriteDirContext =>
+        val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
+        InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
+      case hiveDir: InsertOverwriteHiveDirContext =>
+        val (isLocal, storage, provider) = visitInsertOverwriteHiveDir(hiveDir)
+        InsertIntoDir(isLocal, storage, provider, query, overwrite = true)
+      case _ =>
+        throw new ParseException("Invalid InsertIntoContext", ctx)
+    }
+  }
+
+  /**
+   * Add an INSERT INTO TABLE operation to the logical plan.
+   */
+  override def visitInsertIntoTable(
+      ctx: InsertIntoTableContext): InsertTableParams = withOrigin(ctx) {
+    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+    val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
+
+    (tableIdent, partitionKeys, false)
+  }
+
+  /**
+   * Add an INSERT OVERWRITE TABLE operation to the logical plan.
+   */
+  override def visitInsertOverwriteTable(
+      ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) {
+    assert(ctx.OVERWRITE() != null)
     val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
     val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)
 
@@ -192,12 +246,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
 
-    InsertIntoTable(
-      UnresolvedRelation(tableIdent),
-      partitionKeys,
-      query,
-      ctx.OVERWRITE != null,
-      ctx.EXISTS != null)
+    (tableIdent, partitionKeys, ctx.EXISTS() != null)
+  }
+
+  /**
+   * Write to a directory, returning a [[InsertIntoDir]] logical plan.
+   */
+  override def visitInsertOverwriteDir(
+      ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
+    throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
+  }
+
+  /**
+   * Write to a directory, returning a [[InsertIntoDir]] logical plan.
+   */
+  override def visitInsertOverwriteHiveDir(
+      ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
+    throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4b3054d..f443cd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -360,6 +360,30 @@ case class InsertIntoTable(
 }
 
 /**
+ * Insert query result into a directory.
+ *
+ * @param isLocal Indicates whether the specified directory is local directory
+ * @param storage Info about output file, row and what serialization format
+ * @param provider Specifies what data source to use; only used for data source file.
+ * @param child The query to be executed
+ * @param overwrite If true, the existing directory will be overwritten
+ *
+ * Note that this plan is unresolved and has to be replaced by the concrete implementations
+ * during analysis.
+ */
+case class InsertIntoDir(
+    isLocal: Boolean,
+    storage: CatalogStorageFormat,
+    provider: Option[String],
+    child: LogicalPlan,
+    overwrite: Boolean = true)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+  override lazy val resolved: Boolean = false
+}
+
+/**
  * A container for holding the view description(CatalogTable), and the output of the view. The
  * child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
  * if the `viewText` is not defined.

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d3f6ab5..d38919b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, _}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructType
 
@@ -1512,4 +1512,81 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       query: LogicalPlan): LogicalPlan = {
     RepartitionByExpression(expressions, query, conf.numShufflePartitions)
   }
+
+  /**
+   * Return the parameters for [[InsertIntoDir]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   INSERT OVERWRITE DIRECTORY
+   *   [path]
+   *   [OPTIONS table_property_list]
+   *   select_statement;
+   * }}}
+   */
+  override def visitInsertOverwriteDir(
+      ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
+    if (ctx.LOCAL != null) {
+      throw new ParseException(
+        "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
+    }
+
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    var storage = DataSource.buildStorageFormatFromOptions(options)
+
+    val path = Option(ctx.path).map(string).getOrElse("")
+
+    if (!(path.isEmpty ^ storage.locationUri.isEmpty)) {
+      throw new ParseException(
+        "Directory path and 'path' in OPTIONS should be specified one, but not both", ctx)
+    }
+
+    if (!path.isEmpty) {
+      val customLocation = Some(CatalogUtils.stringToURI(path))
+      storage = storage.copy(locationUri = customLocation)
+    }
+
+    val provider = ctx.tableProvider.qualifiedName.getText
+
+    (false, storage, Some(provider))
+  }
+
+  /**
+   * Return the parameters for [[InsertIntoDir]] logical plan.
+   *
+   * Expected format:
+   * {{{
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY
+   *   path
+   *   [ROW FORMAT row_format]
+   *   [STORED AS file_format]
+   *   select_statement;
+   * }}}
+   */
+  override def visitInsertOverwriteHiveDir(
+      ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
+    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
+    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+      .getOrElse(CatalogStorageFormat.empty)
+    val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
+      .getOrElse(CatalogStorageFormat.empty)
+
+    val path = string(ctx.path)
+    // The path field is required
+    if (path.isEmpty) {
+      operationNotAllowed("INSERT OVERWRITE DIRECTORY must be accompanied by path", ctx)
+    }
+
+    val defaultStorage = HiveSerDe.getDefaultStorage(conf)
+
+    val storage = CatalogStorageFormat(
+      locationUri = Some(CatalogUtils.stringToURI(path)),
+      inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
+      outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
+      serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
+      compressed = false,
+      properties = rowStorage.properties ++ fileStorage.properties)
+
+    (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
new file mode 100644
index 0000000..633de4c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources._
+
+/**
+ * A command used to write the result of a query to a directory.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   INSERT OVERWRITE DIRECTORY (path=STRING)?
+ *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ *   SELECT ...
+ * }}}
+ *
+ * @param storage storage format used to describe how the query result is stored.
+ * @param provider the data source type to be used
+ * @param query the logical plan representing data to write to
+ * @param overwrite whthere overwrites existing directory
+ */
+case class InsertIntoDataSourceDirCommand(
+    storage: CatalogStorageFormat,
+    provider: String,
+    query: LogicalPlan,
+    overwrite: Boolean) extends RunnableCommand {
+
+  override def children: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+    assert(children.length == 1)
+    assert(storage.locationUri.nonEmpty, "Directory path is required")
+    assert(provider.nonEmpty, "Data source is required")
+
+    // Create the relation based on the input logical plan: `query`.
+    val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+
+    val dataSource = DataSource(
+      sparkSession,
+      className = provider,
+      options = storage.properties ++ pathOption,
+      catalogTable = None)
+
+    val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+    if (!isFileFormat) {
+      throw new SparkException(
+        "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
+    }
+
+    val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
+    try {
+      sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
+      dataSource.writeAndRead(saveMode, query)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to directory " + storage.locationUri.toString, ex)
+        throw ex
+    }
+
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7611e1c..b06f4cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.HiveSerDe
@@ -869,4 +870,18 @@ object DDLUtils {
       }
     }
   }
+
+  /**
+   * Throws exception if outputPath tries to overwrite inputpath.
+   */
+  def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = {
+    val inputPaths = query.collect {
+      case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
+    }.flatten
+
+    if (inputPaths.contains(outputPath)) {
+      throw new AnalysisException(
+        "Cannot overwrite a path that is also being read from.")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5d6223d..018f24e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
 import java.util.concurrent.Callable
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -29,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
@@ -142,6 +145,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         parts, query, overwrite, false) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
+    case InsertIntoDir(_, storage, provider, query, overwrite)
+      if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER =>
+
+      val outputPath = new Path(storage.locationUri.get)
+      if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath)
+
+      InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
+
     case i @ InsertIntoTable(
         l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
@@ -178,15 +189,9 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       }
 
       val outputPath = t.location.rootPaths.head
-      val inputPaths = actualQuery.collect {
-        case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths
-      }.flatten
+      if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
 
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      if (overwrite && inputPaths.contains(outputPath)) {
-        throw new AnalysisException(
-          "Cannot overwrite a path that is also being read from.")
-      }
 
       val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 4ee3821..fa5172c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
 import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation}
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -524,6 +525,55 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
     assert(e.message.contains("you can only specify one of them."))
   }
 
+  test("insert overwrite directory") {
+    val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as a"
+    parser.parsePlan(v1) match {
+      case InsertIntoDir(_, storage, provider, query, overwrite) =>
+        assert(storage.locationUri.isDefined && storage.locationUri.get.toString == "/tmp/file")
+      case other =>
+        fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" +
+          " from query," + s" got ${other.getClass.getName}: $v1")
+    }
+
+    val v2 = "INSERT OVERWRITE DIRECTORY USING parquet SELECT 1 as a"
+    val e2 = intercept[ParseException] {
+      parser.parsePlan(v2)
+    }
+    assert(e2.message.contains(
+      "Directory path and 'path' in OPTIONS should be specified one, but not both"))
+
+    val v3 =
+      """
+        | INSERT OVERWRITE DIRECTORY USING json
+        | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE)
+        | SELECT 1 as a
+      """.stripMargin
+    parser.parsePlan(v3) match {
+      case InsertIntoDir(_, storage, provider, query, overwrite) =>
+        assert(storage.locationUri.isDefined && provider == Some("json"))
+        assert(storage.properties.get("a") == Some("1"))
+        assert(storage.properties.get("b") == Some("0.1"))
+        assert(storage.properties.get("c") == Some("true"))
+        assert(!storage.properties.contains("abc"))
+        assert(!storage.properties.contains("path"))
+      case other =>
+        fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" +
+          " from query," + s"got ${other.getClass.getName}: $v1")
+    }
+
+    val v4 =
+      """
+        | INSERT OVERWRITE DIRECTORY '/tmp/file' USING json
+        | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE)
+        | SELECT 1 as a
+      """.stripMargin
+    val e4 = intercept[ParseException] {
+      parser.parsePlan(v4)
+    }
+    assert(e4.message.contains(
+      "Directory path and 'path' in OPTIONS should be specified one, but not both"))
+  }
+
   // ALTER TABLE table_name RENAME TO new_table_name;
   // ALTER VIEW view_name RENAME TO new_view_name;
   test("alter table/view: rename table/view") {

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 41abff2..875b745 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.sources
 
 import java.io.File
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
@@ -366,4 +367,63 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
         Row(Array(1, 2), Array("a", "b")))
     }
   }
+
+  test("insert overwrite directory") {
+    withTempDir { dir =>
+      val path = dir.toURI.getPath
+
+      val v1 =
+        s"""
+           | INSERT OVERWRITE DIRECTORY '${path}'
+           | USING json
+           | OPTIONS (a 1, b 0.1, c TRUE)
+           | SELECT 1 as a, 'c' as b
+         """.stripMargin
+
+      spark.sql(v1)
+
+      checkAnswer(
+        spark.read.json(dir.getCanonicalPath),
+        sql("SELECT 1 as a, 'c' as b"))
+    }
+  }
+
+  test("insert overwrite directory with path in options") {
+    withTempDir { dir =>
+      val path = dir.toURI.getPath
+
+      val v1 =
+        s"""
+           | INSERT OVERWRITE DIRECTORY
+           | USING json
+           | OPTIONS ('path' '${path}')
+           | SELECT 1 as a, 'c' as b
+         """.stripMargin
+
+      spark.sql(v1)
+
+      checkAnswer(
+        spark.read.json(dir.getCanonicalPath),
+        sql("SELECT 1 as a, 'c' as b"))
+    }
+  }
+
+  test("insert overwrite directory to data source not providing FileFormat") {
+    withTempDir { dir =>
+      val path = dir.toURI.getPath
+
+      val v1 =
+        s"""
+           | INSERT OVERWRITE DIRECTORY '${path}'
+           | USING JDBC
+           | OPTIONS (a 1, b 0.1, c TRUE)
+           | SELECT 1 as a, 'c' as b
+         """.stripMargin
+      val e = intercept[SparkException] {
+        spark.sql(v1)
+      }.getMessage
+
+      assert(e.contains("Only Data Sources providing FileFormat are supported"))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 47203a8..caf554d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
+    ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -157,6 +158,14 @@ object HiveAnalysis extends Rule[LogicalPlan] {
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
       DDLUtils.checkDataSchemaFieldNames(tableDesc)
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
+
+    case InsertIntoDir(isLocal, storage, provider, child, overwrite)
+      if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER =>
+
+      val outputPath = new Path(storage.locationUri.get)
+      if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
+
+      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
new file mode 100644
index 0000000..15ca1df
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.{File, IOException}
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.HiveVersion
+
+// Base trait for getting a temporary location for writing data
+private[hive] trait HiveTmpPath extends Logging {
+
+  var createdTempDir: Option[Path] = None
+
+  def getExternalTmpPath(
+      sparkSession: SparkSession,
+      hadoopConf: Configuration,
+      path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving staging directory to table directory, Hive will still empty the table directory, but
+    // will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+    // staging directory under the table director for Hive prior to 1.1, the staging directory will
+    // be removed by Hive when Hive is trying to empty the table directory.
+    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
+
+    // Ensure all the supported versions are considered here.
+    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+      allSupportedHiveVersions)
+
+    val externalCatalog = sparkSession.sharedState.externalCatalog
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
+    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
+    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
+      newVersionExternalTempPath(path, hadoopConf, stagingDir)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = {
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
+    } catch {
+      case NonFatal(e) =>
+        val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  private def oldVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      scratchDir: String): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      createdTempDir = Some(dirPath)
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  private def newVersionExternalTempPath(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
+    }
+  }
+
+  private def getExtTmpPathRelTo(
+      path: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
+  }
+
+  private def getExternalScratchDir(
+      extURI: URI,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    getStagingDir(
+      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+      hadoopConf,
+      stagingDir)
+  }
+
+  private def getStagingDir(
+      inputPath: Path,
+      hadoopConf: Configuration,
+      stagingDir: String): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    var stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+
+    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
+    // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
+    // under the table directory.
+    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
+      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
+      logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
+        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
+        "directory.")
+      stagingPathName = new Path(inputPathName, ".hive-staging").toString
+    }
+
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+    }
+    dir
+  }
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
new file mode 100644
index 0000000..2110038
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.language.existentials
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.mapred._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.client.HiveClientImpl
+
+/**
+ * Command for writing the results of `query` to file system.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   INSERT OVERWRITE [LOCAL] DIRECTORY
+ *   path
+ *   [ROW FORMAT row_format]
+ *   [STORED AS file_format]
+ *   SELECT ...
+ * }}}
+ *
+ * @param isLocal whether the path specified in `storage` is a local directory
+ * @param storage storage format used to describe how the query result is stored.
+ * @param query the logical plan representing data to write to
+ * @param overwrite whether overwrites existing directory
+ */
+case class InsertIntoHiveDirCommand(
+    isLocal: Boolean,
+    storage: CatalogStorageFormat,
+    query: LogicalPlan,
+    overwrite: Boolean) extends SaveAsHiveFile with HiveTmpPath {
+
+  override def children: Seq[LogicalPlan] = query :: Nil
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+    assert(children.length == 1)
+    assert(storage.locationUri.nonEmpty)
+
+    val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
+      identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
+      tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
+      storage = storage,
+      schema = query.schema
+    ))
+    hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
+      storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
+
+    val tableDesc = new TableDesc(
+      hiveTable.getInputFormatClass,
+      hiveTable.getOutputFormatClass,
+      hiveTable.getMetadata
+    )
+
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val jobConf = new JobConf(hadoopConf)
+
+    val targetPath = new Path(storage.locationUri.get)
+    val writeToPath =
+      if (isLocal) {
+        val localFileSystem = FileSystem.getLocal(jobConf)
+        localFileSystem.makeQualified(targetPath)
+      } else {
+        val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
+        val dfs = qualifiedPath.getFileSystem(jobConf)
+        if (!dfs.exists(qualifiedPath)) {
+          dfs.mkdirs(qualifiedPath.getParent)
+        }
+        qualifiedPath
+      }
+
+    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath)
+    val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
+      tmpPath.toString, tableDesc, false)
+
+    try {
+      saveAsHiveFile(
+        sparkSession = sparkSession,
+        plan = children.head,
+        hadoopConf = hadoopConf,
+        fileSinkConf = fileSinkConf,
+        outputLocation = tmpPath.toString)
+
+      val fs = writeToPath.getFileSystem(hadoopConf)
+      if (overwrite && fs.exists(writeToPath)) {
+        fs.listStatus(writeToPath).foreach { existFile =>
+          if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
+        }
+      }
+
+      fs.listStatus(tmpPath).foreach {
+        tmpFile => fs.rename(tmpFile.getPath, writeToPath)
+      }
+    } catch {
+      case e: Throwable =>
+        throw new SparkException(
+          "Failed inserting overwrite directory " + storage.locationUri.get, e)
+    } finally {
+      deleteExternalTmpPath(hadoopConf)
+    }
+
+    Seq.empty[Row]
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 46610f8..5bdc97a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,32 +17,22 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.io.{File, IOException}
-import java.net.URI
-import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Random}
-
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.ErrorMsg
-import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand}
-import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
+import org.apache.spark.sql.hive.client.HiveClientImpl
 
 
 /**
@@ -80,152 +70,10 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends DataWritingCommand {
+    ifPartitionNotExists: Boolean) extends SaveAsHiveFile with HiveTmpPath {
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
-  var createdTempDir: Option[Path] = None
-
-  private def executionId: String = {
-    val rand: Random = new Random
-    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
-    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
-  }
-
-  private def getStagingDir(
-      inputPath: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val inputPathUri: URI = inputPath.toUri
-    val inputPathName: String = inputPathUri.getPath
-    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    var stagingPathName: String =
-      if (inputPathName.indexOf(stagingDir) == -1) {
-        new Path(inputPathName, stagingDir).toString
-      } else {
-        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
-      }
-
-    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
-    // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
-    // under the table directory.
-    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
-      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
-      logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
-        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
-        "directory.")
-      stagingPathName = new Path(inputPathName, ".hive-staging").toString
-    }
-
-    val dir: Path =
-      fs.makeQualified(
-        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
-    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
-    try {
-      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
-      }
-      createdTempDir = Some(dir)
-      fs.deleteOnExit(dir)
-    } catch {
-      case e: IOException =>
-        throw new RuntimeException(
-          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-    }
-    dir
-  }
-
-  private def getExternalScratchDir(
-      extURI: URI,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    getStagingDir(
-      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
-      hadoopConf,
-      stagingDir)
-  }
-
-  def getExternalTmpPath(
-      path: Path,
-      hiveVersion: HiveVersion,
-      hadoopConf: Configuration,
-      stagingDir: String,
-      scratchDir: String): Path = {
-    import org.apache.spark.sql.hive.client.hive._
-
-    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
-    // a common scratch directory. After the writing is finished, Hive will simply empty the table
-    // directory and move the staging directory to it.
-    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
-    // moving staging directory to table directory, Hive will still empty the table directory, but
-    // will exclude the staging directory there.
-    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
-    // staging directory under the table director for Hive prior to 1.1, the staging directory will
-    // be removed by Hive when Hive is trying to empty the table directory.
-    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
-
-    // Ensure all the supported versions are considered here.
-    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
-      allSupportedHiveVersions)
-
-    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
-      oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, hadoopConf, stagingDir)
-    } else {
-      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
-    }
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  def oldVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      scratchDir: String): Path = {
-    val extURI: URI = path.toUri
-    val scratchPath = new Path(scratchDir, executionId)
-    var dirPath = new Path(
-      extURI.getScheme,
-      extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
-    try {
-      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
-      dirPath = new Path(fs.makeQualified(dirPath).toString())
-
-      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
-      }
-      createdTempDir = Some(dirPath)
-      fs.deleteOnExit(dirPath)
-    } catch {
-      case e: IOException =>
-        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
-    }
-    dirPath
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  def newVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val extURI: URI = path.toUri
-    if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
-    } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
-    }
-  }
-
-  def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
-  }
-
   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -234,12 +82,8 @@ case class InsertIntoHiveTable(
   override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
     assert(children.length == 1)
 
-    val sessionState = sparkSession.sessionState
     val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
-    val hadoopConf = sessionState.newHadoopConf()
-    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
     val hiveQlTable = HiveClientImpl.toHiveTable(table)
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
@@ -254,23 +98,8 @@ case class InsertIntoHiveTable(
       hiveQlTable.getMetadata
     )
     val tableLocation = hiveQlTable.getDataLocation
-    val tmpLocation =
-      getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
+    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
-
-    if (isCompressed) {
-      // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
-      // "mapreduce.output.fileoutputformat.compress.codec", and
-      // "mapreduce.output.fileoutputformat.compress.type"
-      // have no impact on ORC because it uses table properties to store compression information.
-      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
-      fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(hadoopConf
-        .get("mapreduce.output.fileoutputformat.compress.codec"))
-      fileSinkConf.setCompressType(hadoopConf
-        .get("mapreduce.output.fileoutputformat.compress.type"))
-    }
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
     val numStaticPartitions = partition.values.count(_.nonEmpty)
@@ -332,11 +161,6 @@ case class InsertIntoHiveTable(
       case _ => // do nothing since table has no bucketing
     }
 
-    val committer = FileCommitProtocol.instantiate(
-      sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = java.util.UUID.randomUUID().toString,
-      outputPath = tmpLocation.toString)
-
     val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
       query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
         throw new AnalysisException(
@@ -344,17 +168,13 @@ case class InsertIntoHiveTable(
       }.asInstanceOf[Attribute]
     }
 
-    FileFormatWriter.write(
+    saveAsHiveFile(
       sparkSession = sparkSession,
       plan = children.head,
-      fileFormat = new HiveFileFormat(fileSinkConf),
-      committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionAttributes,
-      bucketSpec = None,
-      statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-      options = Map.empty)
+      fileSinkConf = fileSinkConf,
+      outputLocation = tmpLocation.toString,
+      partitionAttributes = partitionAttributes)
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
@@ -422,18 +242,7 @@ case class InsertIntoHiveTable(
 
     // Attempt to delete the staging directory and the inclusive files. If failed, the files are
     // expected to be dropped at the normal termination of VM since deleteOnExit is used.
-    try {
-      createdTempDir.foreach { path =>
-        val fs = path.getFileSystem(hadoopConf)
-        if (fs.delete(path, true)) {
-          // If we successfully delete the staging directory, remove it from FileSystem's cache.
-          fs.cancelDeleteOnExit(path)
-        }
-      }
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
-    }
+    deleteExternalTmpPath(hadoopConf)
 
     // un-cache this table.
     sparkSession.catalog.uncacheTable(table.identifier.quotedString)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
new file mode 100644
index 0000000..7de9b42
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+
+// Base trait from which all hive insert statement physical execution extends.
+private[hive] trait SaveAsHiveFile extends DataWritingCommand {
+
+  protected def saveAsHiveFile(
+      sparkSession: SparkSession,
+      plan: SparkPlan,
+      hadoopConf: Configuration,
+      fileSinkConf: FileSinkDesc,
+      outputLocation: String,
+      partitionAttributes: Seq[Attribute] = Nil): Unit = {
+
+    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+    if (isCompressed) {
+      // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress",
+      // "mapreduce.output.fileoutputformat.compress.codec", and
+      // "mapreduce.output.fileoutputformat.compress.type"
+      // have no impact on ORC because it uses table properties to store compression information.
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    }
+
+    val committer = FileCommitProtocol.instantiate(
+      sparkSession.sessionState.conf.fileCommitProtocolClass,
+      jobId = java.util.UUID.randomUUID().toString,
+      outputPath = outputLocation)
+
+    FileFormatWriter.write(
+      sparkSession = sparkSession,
+      plan = plan,
+      fileFormat = new HiveFileFormat(fileSinkConf),
+      committer = committer,
+      outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty),
+      hadoopConf = hadoopConf,
+      partitionColumns = partitionAttributes,
+      bucketSpec = None,
+      statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
+      options = Map.empty)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
deleted file mode 100644
index e93c654..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ /dev/null
@@ -1,551 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.{QueryTest, _}
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-
-case class TestData(key: Int, value: String)
-
-case class ThreeCloumntable(key: Int, value: String, key1: String)
-
-class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-    with SQLTestUtils {
-  import spark.implicits._
-
-  override lazy val testData = spark.sparkContext.parallelize(
-    (1 to 100).map(i => TestData(i, i.toString))).toDF()
-
-  before {
-    // Since every we are doing tests for DDL statements,
-    // it is better to reset before every test.
-    hiveContext.reset()
-    // Creates a temporary view with testData, which will be used in all tests.
-    testData.createOrReplaceTempView("testData")
-  }
-
-  test("insertInto() HiveTable") {
-    withTable("createAndInsertTest") {
-      sql("CREATE TABLE createAndInsertTest (key int, value string)")
-
-      // Add some data.
-      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
-
-      // Make sure the table has also been updated.
-      checkAnswer(
-        sql("SELECT * FROM createAndInsertTest"),
-        testData.collect().toSeq
-      )
-
-      // Add more data.
-      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
-
-      // Make sure the table has been updated.
-      checkAnswer(
-        sql("SELECT * FROM createAndInsertTest"),
-        testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
-      )
-
-      // Now overwrite.
-      testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
-
-      // Make sure the registered table has also been updated.
-      checkAnswer(
-        sql("SELECT * FROM createAndInsertTest"),
-        testData.collect().toSeq
-      )
-    }
-  }
-
-  test("Double create fails when allowExisting = false") {
-    withTable("doubleCreateAndInsertTest") {
-      sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-
-      intercept[AnalysisException] {
-        sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-      }
-    }
-  }
-
-  test("Double create does not fail when allowExisting = true") {
-    withTable("doubleCreateAndInsertTest") {
-      sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-      sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)")
-    }
-  }
-
-  test("SPARK-4052: scala.collection.Map as value type of MapType") {
-    val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil)
-    val rowRDD = spark.sparkContext.parallelize(
-      (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i"))))
-    val df = spark.createDataFrame(rowRDD, schema)
-    df.createOrReplaceTempView("tableWithMapValue")
-    sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)")
-    sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
-
-    checkAnswer(
-      sql("SELECT * FROM hiveTableWithMapValue"),
-      rowRDD.collect().toSeq
-    )
-
-    sql("DROP TABLE hiveTableWithMapValue")
-  }
-
-  test("SPARK-4203:random partition directory order") {
-    sql("CREATE TABLE tmp_table (key int, value string)")
-    val tmpDir = Utils.createTempDir()
-    // The default value of hive.exec.stagingdir.
-    val stagingDir = ".hive-staging"
-
-    sql(
-      s"""
-         |CREATE TABLE table_with_partition(c1 string)
-         |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string)
-         |location '${tmpDir.toURI.toString}'
-        """.stripMargin)
-    sql(
-      """
-        |INSERT OVERWRITE TABLE table_with_partition
-        |partition (p1='a',p2='b',p3='c',p4='c',p5='1')
-        |SELECT 'blarr' FROM tmp_table
-      """.stripMargin)
-    sql(
-      """
-        |INSERT OVERWRITE TABLE table_with_partition
-        |partition (p1='a',p2='b',p3='c',p4='c',p5='2')
-        |SELECT 'blarr' FROM tmp_table
-      """.stripMargin)
-    sql(
-      """
-        |INSERT OVERWRITE TABLE table_with_partition
-        |partition (p1='a',p2='b',p3='c',p4='c',p5='3')
-        |SELECT 'blarr' FROM tmp_table
-      """.stripMargin)
-    sql(
-      """
-        |INSERT OVERWRITE TABLE table_with_partition
-        |partition (p1='a',p2='b',p3='c',p4='c',p5='4')
-        |SELECT 'blarr' FROM tmp_table
-      """.stripMargin)
-    def listFolders(path: File, acc: List[String]): List[List[String]] = {
-      val dir = path.listFiles()
-      val folders = dir.filter { e => e.isDirectory && !e.getName().startsWith(stagingDir) }.toList
-      if (folders.isEmpty) {
-        List(acc.reverse)
-      } else {
-        folders.flatMap(x => listFolders(x, x.getName :: acc))
-      }
-    }
-    val expected = List(
-      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil,
-      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil,
-      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil,
-      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
-    )
-    assert(listFolders(tmpDir, List()).sortBy(_.toString()) === expected.sortBy(_.toString))
-    sql("DROP TABLE table_with_partition")
-    sql("DROP TABLE tmp_table")
-  }
-
-  testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName =>
-    val selQuery = s"select a, b, c, d from $tableName"
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $tableName
-         |partition (b=2, c=3)
-         |SELECT 1, 4
-        """.stripMargin)
-    checkAnswer(sql(selQuery), Row(1, 2, 3, 4))
-
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $tableName
-         |partition (b=2, c=3)
-         |SELECT 5, 6
-        """.stripMargin)
-    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
-
-    val e = intercept[AnalysisException] {
-      sql(
-        s"""
-           |INSERT OVERWRITE TABLE $tableName
-           |partition (b=2, c) IF NOT EXISTS
-           |SELECT 7, 8, 3
-          """.stripMargin)
-    }
-    assert(e.getMessage.contains(
-      "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]"))
-
-    // If the partition already exists, the insert will overwrite the data
-    // unless users specify IF NOT EXISTS
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $tableName
-         |partition (b=2, c=3) IF NOT EXISTS
-         |SELECT 9, 10
-        """.stripMargin)
-    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
-
-    // ADD PARTITION has the same effect, even if no actual data is inserted.
-    sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $tableName
-         |partition (b=21, c=31) IF NOT EXISTS
-         |SELECT 20, 24
-        """.stripMargin)
-    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
-  }
-
-  test("Insert ArrayType.containsNull == false") {
-    val schema = StructType(Seq(
-      StructField("a", ArrayType(StringType, containsNull = false))))
-    val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i"))))
-    val df = spark.createDataFrame(rowRDD, schema)
-    df.createOrReplaceTempView("tableWithArrayValue")
-    sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
-    sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue")
-
-    checkAnswer(
-      sql("SELECT * FROM hiveTableWithArrayValue"),
-      rowRDD.collect().toSeq)
-
-    sql("DROP TABLE hiveTableWithArrayValue")
-  }
-
-  test("Insert MapType.valueContainsNull == false") {
-    val schema = StructType(Seq(
-      StructField("m", MapType(StringType, StringType, valueContainsNull = false))))
-    val rowRDD = spark.sparkContext.parallelize(
-      (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
-    val df = spark.createDataFrame(rowRDD, schema)
-    df.createOrReplaceTempView("tableWithMapValue")
-    sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
-    sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue")
-
-    checkAnswer(
-      sql("SELECT * FROM hiveTableWithMapValue"),
-      rowRDD.collect().toSeq)
-
-    sql("DROP TABLE hiveTableWithMapValue")
-  }
-
-  test("Insert StructType.fields.exists(_.nullable == false)") {
-    val schema = StructType(Seq(
-      StructField("s", StructType(Seq(StructField("f", StringType, nullable = false))))))
-    val rowRDD = spark.sparkContext.parallelize(
-      (1 to 100).map(i => Row(Row(s"value$i"))))
-    val df = spark.createDataFrame(rowRDD, schema)
-    df.createOrReplaceTempView("tableWithStructValue")
-    sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
-    sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue")
-
-    checkAnswer(
-      sql("SELECT * FROM hiveTableWithStructValue"),
-      rowRDD.collect().toSeq)
-
-    sql("DROP TABLE hiveTableWithStructValue")
-  }
-
-  test("Test partition mode = strict") {
-    withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
-      withTable("partitioned") {
-        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
-        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
-          .toDF("id", "data", "part")
-
-        intercept[SparkException] {
-          data.write.insertInto("partitioned")
-        }
-      }
-    }
-  }
-
-  test("Detect table partitioning") {
-    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
-      withTable("source", "partitioned") {
-        sql("CREATE TABLE source (id bigint, data string, part string)")
-        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF()
-
-        data.write.insertInto("source")
-        checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
-
-        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
-        // this will pick up the output partitioning from the table definition
-        spark.table("source").write.insertInto("partitioned")
-
-        checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
-      }
-    }
-  }
-
-  private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
-    test(s"Hive SerDe table - $testName") {
-      val hiveTable = "hive_table"
-
-      withTable(hiveTable) {
-        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-          sql(
-            s"""
-              |CREATE TABLE $hiveTable (a INT, d INT)
-              |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE
-            """.stripMargin)
-          f(hiveTable)
-        }
-      }
-    }
-  }
-
-  private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
-    test(s"Data source table - $testName") {
-      val dsTable = "ds_table"
-
-      withTable(dsTable) {
-        sql(
-          s"""
-             |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT)
-             |USING PARQUET PARTITIONED BY (b, c)
-           """.stripMargin)
-        f(dsTable)
-      }
-    }
-  }
-
-  private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
-    testPartitionedHiveSerDeTable(testName)(f)
-    testPartitionedDataSourceTable(testName)(f)
-  }
-
-  testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
-    val cause = intercept[AnalysisException] {
-      Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName)
-    }
-
-    assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
-  }
-
-  testPartitionedTable(
-    "SPARK-16036: better error message when insert into a table with mismatch schema") {
-    tableName =>
-      val e = intercept[AnalysisException] {
-        sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
-      }
-      assert(e.message.contains(
-        "target table has 4 column(s) but the inserted data has 5 column(s)"))
-  }
-
-  testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
-    tableName =>
-      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
-        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
-        sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3")
-        checkAnswer(sql(s"SELECT a, b, c, 4 FROM $tableName"), Row(1, 2, 3, 4))
-      }
-  }
-
-  testPartitionedTable("INSERT INTO a partitioned table (semantic and error handling)") {
-    tableName =>
-      withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
-        sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4")
-
-        sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8")
-
-        sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
-
-        // c is defined twice. Analyzer will complain.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13")
-        }
-
-        // d is not a partitioning column.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13, 14")
-        }
-
-        // d is not a partitioning column. The total number of columns is correct.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13")
-        }
-
-        // The data is missing a column.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
-        }
-
-        // d is not a partitioning column.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 14")
-        }
-
-        // The statement is missing a column.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14")
-        }
-
-        // The statement is missing a column.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 16")
-        }
-
-        sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 15")
-
-        // Dynamic partitioning columns need to be after static partitioning columns.
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 20, 18")
-        }
-
-        sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 19")
-
-        sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 23")
-
-        sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27")
-
-        checkAnswer(
-          sql(s"SELECT a, b, c, d FROM $tableName"),
-          Row(1, 2, 3, 4) ::
-            Row(5, 6, 7, 8) ::
-            Row(9, 10, 11, 12) ::
-            Row(13, 14, 15, 16) ::
-            Row(17, 18, 19, 20) ::
-            Row(21, 22, 23, 24) ::
-            Row(25, 26, 27, 28) :: Nil
-        )
-      }
-  }
-
-  testPartitionedTable("insertInto() should match columns by position and ignore column names") {
-    tableName =>
-      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-        // Columns `df.c` and `df.d` are resolved by position, and thus mapped to partition columns
-        // `b` and `c` of the target table.
-        val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
-        df.write.insertInto(tableName)
-
-        checkAnswer(
-          sql(s"SELECT a, b, c, d FROM $tableName"),
-          Row(1, 3, 4, 2)
-        )
-      }
-  }
-
-  testPartitionedTable("insertInto() should match unnamed columns by position") {
-    tableName =>
-      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-        // Columns `c + 1` and `d + 1` are resolved by position, and thus mapped to partition
-        // columns `b` and `c` of the target table.
-        val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
-        df.select('a + 1, 'b + 1, 'c + 1, 'd + 1).write.insertInto(tableName)
-
-        checkAnswer(
-          sql(s"SELECT a, b, c, d FROM $tableName"),
-          Row(2, 4, 5, 3)
-        )
-      }
-  }
-
-  testPartitionedTable("insertInto() should reject missing columns") {
-    tableName =>
-      withTable("t") {
-        sql("CREATE TABLE t (a INT, b INT)")
-
-        intercept[AnalysisException] {
-          spark.table("t").write.insertInto(tableName)
-        }
-      }
-  }
-
-  testPartitionedTable("insertInto() should reject extra columns") {
-    tableName =>
-      withTable("t") {
-        sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
-
-        intercept[AnalysisException] {
-          spark.table("t").write.insertInto(tableName)
-        }
-      }
-  }
-
-  private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
-    test(s"Hive SerDe table - $testName") {
-      val hiveTable = "hive_table"
-
-      withTable(hiveTable) {
-        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-          sql(
-            s"""
-               |CREATE TABLE $hiveTable (a INT, d INT)
-               |PARTITIONED BY (b INT, c INT)
-               |CLUSTERED BY(a)
-               |SORTED BY(a, d) INTO 256 BUCKETS
-               |STORED AS TEXTFILE
-            """.stripMargin)
-          f(hiveTable)
-        }
-      }
-    }
-  }
-
-  testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") {
-    tableName =>
-      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") {
-        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
-        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
-      }
-  }
-
-  testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") {
-    tableName =>
-      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") {
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
-        }
-      }
-      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") {
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
-        }
-      }
-      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
-        intercept[AnalysisException] {
-          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
-        }
-      }
-  }
-
-  test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
-    // Set hive.exec.stagingdir under the table directory without start with ".".
-    withSQLConf("hive.exec.stagingdir" -> "./test") {
-      withTable("test_table") {
-        sql("CREATE TABLE test_table (key int)")
-        sql("INSERT OVERWRITE TABLE test_table SELECT 1")
-        checkAnswer(sql("SELECT * FROM test_table"), Row(1))
-      }
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org