Posted to commits@spark.apache.org by do...@apache.org on 2019/03/30 21:38:59 UTC

[spark] branch master updated: [SPARK-27326][SQL] Fall back all v2 file sources in `InsertIntoTable` to V1 FileFormat

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dab5f6  [SPARK-27326][SQL] Fall back all v2 file sources in `InsertIntoTable` to V1 FileFormat
5dab5f6 is described below

commit 5dab5f651fcf3a9b8c4d6900e45718dc041c54af
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Sat Mar 30 14:38:26 2019 -0700

    [SPARK-27326][SQL] Fall back all v2 file sources in `InsertIntoTable` to V1 FileFormat
    
    ## What changes were proposed in this pull request?
    
    In the first PR for file source V2, there was a rule for falling back from the ORC V2 table to `OrcFileFormat`: https://github.com/apache/spark/pull/23383/files#diff-57e8244b6964e4f84345357a188421d5R34
    
    As we migrate more file sources to data source V2, we should make the rule more generic. This PR proposes to:
    1. Rename the rule `FallbackOrcDataSourceV2` to `FallBackFileSourceV2`. The new name is more generic, and it uses "fall back" as a verb, while "fallback" is a noun.
    2. Rename the method `fallBackFileFormat` in `FileDataSourceV2` to `fallbackFileFormat`, since here "fallback" is used as a noun.
    3. Add a new method `fallbackFileFormat` in `FileTable`, which the rule `FallBackFileSourceV2` uses to fall back to V1, as sketched after this list.
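
    For orientation, a condensed sketch of the generalized rule follows. The hunk in this patch truncates the `HadoopFsRelation` call after `partitionSchema`, so the trailing arguments and the final rewrite line below are reconstructed from the pre-rename ORC-only rule and should be read as assumptions, not verbatim patch content:

    ```scala
    import scala.collection.JavaConverters._

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
    import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}

    class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
        // Match any file-based V2 table through the FileTable abstraction,
        // instead of pattern-matching on OrcTable alone.
        case i @ InsertIntoTable(d @ DataSourceV2Relation(table: FileTable, _, _), _, _, _, _) =>
          // Each FileTable now declares the V1 FileFormat class it falls back to.
          val v1FileFormat = table.fallbackFileFormat.newInstance()
          val relation = HadoopFsRelation(
            table.fileIndex,
            table.fileIndex.partitionSchema,
            table.dataSchema,            // assumed: same trailing arguments
            None,                        // as the ORC-only rule it replaces
            v1FileFormat,
            d.options.asScala.toMap)(sparkSession)
          i.copy(table = LogicalRelation(relation))  // assumed final rewrite
      }
    }
    ```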
    
    ## How was this patch tested?
    
    Existing unit tests. The main fixture change they needed is the new override on `DummyFileTable`, sketched below.
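
    (A condensed sketch of that `FileTableSuite` fixture after this change; the members the diff below does not show, including the constructor, are elided and should be treated as assumptions:)

    ```scala
    import org.apache.spark.sql.execution.datasources.FileFormat
    import org.apache.spark.sql.execution.datasources.text.TextFileFormat
    import org.apache.spark.sql.execution.datasources.v2.FileTable
    import org.apache.spark.sql.sources.v2.writer.WriteBuilder
    import org.apache.spark.sql.types.{DataType, StringType}
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    class DummyFileTable(/* constructor as in FileTableSuite */)
      extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {

      override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = null

      override def supportsDataType(dataType: DataType): Boolean = dataType == StringType

      // New in this patch: the V1 class that FallBackFileSourceV2 instantiates
      // when an InsertIntoTable targets this table.
      override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[TextFileFormat]
    }
    ```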
    
    Closes #24251 from gengliangwang/fallbackV1Rule.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala    |  2 +-
 .../main/scala/org/apache/spark/sql/DataFrameWriter.scala    |  2 +-
 .../apache/spark/sql/execution/datasources/DataSource.scala  |  2 +-
 ...lbackOrcDataSourceV2.scala => FallBackFileSourceV2.scala} | 12 +++++-------
 .../sql/execution/datasources/v2/FileDataSourceV2.scala      |  2 +-
 .../spark/sql/execution/datasources/v2/FileTable.scala       |  9 +++++++++
 .../sql/execution/datasources/v2/csv/CSVDataSourceV2.scala   |  6 +++---
 .../spark/sql/execution/datasources/v2/csv/CSVTable.scala    |  4 +++-
 .../sql/execution/datasources/v2/orc/OrcDataSourceV2.scala   |  6 +++---
 .../spark/sql/execution/datasources/v2/orc/OrcTable.scala    |  4 +++-
 .../apache/spark/sql/internal/BaseSessionStateBuilder.scala  |  2 +-
 .../spark/sql/execution/datasources/v2/FileTableSuite.scala  |  4 ++++
 .../spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala |  4 ++--
 .../org/apache/spark/sql/hive/HiveSessionStateBuilder.scala  |  2 +-
 14 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e057d33..d514b94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -200,7 +200,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = lookupCls.newInstance() match {
       case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
         useV1Sources.contains(lookupCls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
-        f.fallBackFileFormat
+        f.fallbackFileFormat
       case _ => lookupCls
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b439a82..a06b75a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -252,7 +252,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val cls = lookupCls.newInstance() match {
       case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
         useV1Sources.contains(lookupCls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
-        f.fallBackFileFormat
+        f.fallbackFileFormat
       case _ => lookupCls
     }
     // In Data Source V2 project, partitioning is still under development.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 516c56e..08650c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -102,7 +102,7 @@ case class DataSource(
     // [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
     // instead of `providingClass`.
     cls.newInstance() match {
-      case f: FileDataSourceV2 => f.fallBackFileFormat
+      case f: FileDataSourceV2 => f.fallbackFileFormat
       case _ => cls
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
similarity index 78%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index d5e89be..813af82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -22,21 +22,19 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2}
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2, FileTable}
 
 /**
- * Replace the ORC V2 data source of table in [[InsertIntoTable]] to V1 [[FileFormat]].
+ * Replace the File source V2 table in [[InsertIntoTable]] to V1 [[FileFormat]].
  * E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into  view `t` fails
  * since there is no corresponding physical plan.
  * This is a temporary hack for making current data source V2 work. It should be
  * removed when Catalog support of file data source v2 is finished.
  */
-class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoTable(d @ DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
-      val v1FileFormat = new OrcFileFormat
+    case i @ InsertIntoTable(d @ DataSourceV2Relation(table: FileTable, _, _), _, _, _, _) =>
+      val v1FileFormat = table.fallbackFileFormat.newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
         table.fileIndex.partitionSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index ebe7fee..d60d429 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -35,7 +35,7 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
    *    source via SQL configuration and fall back to FileFormat.
    * 2. Catalog support is required, which is still under development for data source V2.
    */
-  def fallBackFileFormat: Class[_ <: FileFormat]
+  def fallbackFileFormat: Class[_ <: FileFormat]
 
   lazy val sparkSession = SparkSession.active
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 188016c..d7284fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -98,6 +98,15 @@ abstract class FileTable(
    * }}}
    */
   def formatName: String
+
+  /**
+   * Returns a V1 [[FileFormat]] class of the same file data source.
+   * This is a solution for the following cases:
+   * 1. File datasource V2 implementations cause regression. Users can disable the problematic data
+   *    source via SQL configuration and fall back to FileFormat.
+   * 2. Catalog support is required, which is still under development for data source V2.
+   */
+  def fallbackFileFormat: Class[_ <: FileFormat]
 }
 
 object FileTable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
index 55222c6..045f41e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
@@ -25,19 +25,19 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class CSVDataSourceV2 extends FileDataSourceV2 {
 
-  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[CSVFileFormat]
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[CSVFileFormat]
 
   override def shortName(): String = "csv"
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(paths)
-    CSVTable(tableName, sparkSession, options, paths, None)
+    CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(paths)
-    CSVTable(tableName, sparkSession, options, paths, Some(schema))
+    CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 852cbf0..8170661 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -33,7 +34,8 @@ case class CSVTable(
     sparkSession: SparkSession,
     options: CaseInsensitiveStringMap,
     paths: Seq[String],
-    userSpecifiedSchema: Option[StructType])
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder =
     CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index e8b9e6c..1ea80d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -25,20 +25,20 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class OrcDataSourceV2 extends FileDataSourceV2 {
 
-  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[OrcFileFormat]
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[OrcFileFormat]
 
   override def shortName(): String = "orc"
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(paths)
-    OrcTable(tableName, sparkSession, options, paths, None)
+    OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(paths)
-    OrcTable(tableName, sparkSession, options, paths, Some(schema))
+    OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index ace77b7c..1cc6e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.orc
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcUtils
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -30,7 +31,8 @@ case class OrcTable(
     sparkSession: SparkSession,
     options: CaseInsensitiveStringMap,
     paths: Seq[String],
-    userSpecifiedSchema: Option[StructType])
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index d5543e8..4b0f68b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -160,7 +160,7 @@ abstract class BaseSessionStateBuilder(
     override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
       new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
-        new FallbackOrcDataSourceV2(session) +:
+        new FallBackFileSourceV2(session) +:
         DataSourceResolution(conf) +:
         customResolutionRules
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
index 3d4f564..ac1d567 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
@@ -21,6 +21,8 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.{QueryTest, SparkSession}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.sources.v2.reader.ScanBuilder
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -45,6 +47,8 @@ class DummyFileTable(
   override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = null
 
   override def supportsDataType(dataType: DataType): Boolean = dataType == StringType
+
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[TextFileFormat]
 }
 
 class FileTableSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
index e019dbf..8627bdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
 
-  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
 
   override def shortName(): String = "parquet"
 
@@ -55,7 +55,7 @@ class DummyReadOnlyFileTable extends Table with SupportsRead {
 
 class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
 
-  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
 
   override def shortName(): String = "parquet"
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 877a0da..3bca770 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -72,7 +72,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       new ResolveHiveSerdeTable(session) +:
         new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
-        new FallbackOrcDataSourceV2(session) +:
+        new FallBackFileSourceV2(session) +:
         DataSourceResolution(conf) +:
         customResolutionRules
 

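The renamed `fallbackFileFormat` methods also back the configuration-driven escape hatch in `DataFrameReader`/`DataFrameWriter` shown above: when a source's short name appears in the V1 source list, the V2 class resolves to its V1 `FileFormat`. A minimal usage sketch, assuming the read-side list key was `spark.sql.sources.read.useV1SourceList` at this point in the codebase (both the key name and the path below are assumptions):

```scala
// Assumed config key for the read-side V1 fallback list; verify against
// the SQLConf of this commit before relying on it.
spark.conf.set("spark.sql.sources.read.useV1SourceList", "orc")

// With "orc" listed, DataFrameReader.load resolves the source to
// OrcDataSourceV2.fallbackFileFormat (the V1 OrcFileFormat) rather than
// the V2 implementation.
val df = spark.read.format("orc").load("/tmp/orc-data")  // hypothetical path
```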

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org