You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/30 21:38:59 UTC
[spark] branch master updated: [SPARK-27326][SQL] Fall back all v2
file sources in `InsertIntoTable` to V1 FileFormat
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5dab5f6 [SPARK-27326][SQL] Fall back all v2 file sources in `InsertIntoTable` to V1 FileFormat
5dab5f6 is described below
commit 5dab5f651fcf3a9b8c4d6900e45718dc041c54af
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Sat Mar 30 14:38:26 2019 -0700
[SPARK-27326][SQL] Fall back all v2 file sources in `InsertIntoTable` to V1 FileFormat
## What changes were proposed in this pull request?
In the first PR for file source V2, there was a rule for falling back Orc V2 table to OrcFileFormat: https://github.com/apache/spark/pull/23383/files#diff-57e8244b6964e4f84345357a188421d5R34
As we are migrating more file sources to data source V2, we should make the rule more generic. This PR proposes to:
1. Rename the rule `FallbackOrcDataSourceV2` to `FallBackFileSourceV2`. The name is more generic. And we use "fall back" as a verb, while "fallback" is a noun.
2. Rename the method `fallBackFileFormat` in `FileDataSourceV2` to `fallbackFileFormat`. Here we should use "fallback" as noun.
3. Add new method `fallbackFileFormat` in `FileTable`. This is for falling back to V1 in the renamed rule `FallBackFileSourceV2`.
## How was this patch tested?
Existing Unit tests.
Closes #24251 from gengliangwang/fallbackV1Rule.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +-
.../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
.../apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
...lbackOrcDataSourceV2.scala => FallBackFileSourceV2.scala} | 12 +++++-------
.../sql/execution/datasources/v2/FileDataSourceV2.scala | 2 +-
.../spark/sql/execution/datasources/v2/FileTable.scala | 9 +++++++++
.../sql/execution/datasources/v2/csv/CSVDataSourceV2.scala | 6 +++---
.../spark/sql/execution/datasources/v2/csv/CSVTable.scala | 4 +++-
.../sql/execution/datasources/v2/orc/OrcDataSourceV2.scala | 6 +++---
.../spark/sql/execution/datasources/v2/orc/OrcTable.scala | 4 +++-
.../apache/spark/sql/internal/BaseSessionStateBuilder.scala | 2 +-
.../spark/sql/execution/datasources/v2/FileTableSuite.scala | 4 ++++
.../spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala | 4 ++--
.../org/apache/spark/sql/hive/HiveSessionStateBuilder.scala | 2 +-
14 files changed, 38 insertions(+), 23 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e057d33..d514b94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -200,7 +200,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val cls = lookupCls.newInstance() match {
case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
useV1Sources.contains(lookupCls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
- f.fallBackFileFormat
+ f.fallbackFileFormat
case _ => lookupCls
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b439a82..a06b75a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -252,7 +252,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val cls = lookupCls.newInstance() match {
case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
useV1Sources.contains(lookupCls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
- f.fallBackFileFormat
+ f.fallbackFileFormat
case _ => lookupCls
}
// In Data Source V2 project, partitioning is still under development.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 516c56e..08650c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -102,7 +102,7 @@ case class DataSource(
// [[DataFrameReader]]/[[DataFrameWriter]], since they use method `lookupDataSource`
// instead of `providingClass`.
cls.newInstance() match {
- case f: FileDataSourceV2 => f.fallBackFileFormat
+ case f: FileDataSourceV2 => f.fallbackFileFormat
case _ => cls
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
similarity index 78%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index d5e89be..813af82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -22,21 +22,19 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2}
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2, FileTable}
/**
- * Replace the ORC V2 data source of table in [[InsertIntoTable]] to V1 [[FileFormat]].
+ * Replace the File source V2 table in [[InsertIntoTable]] to V1 [[FileFormat]].
* E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
* since there is no corresponding physical plan.
* This is a temporary hack for making current data source V2 work. It should be
* removed when Catalog support of file data source v2 is finished.
*/
-class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case i @ InsertIntoTable(d @ DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) =>
- val v1FileFormat = new OrcFileFormat
+ case i @ InsertIntoTable(d @ DataSourceV2Relation(table: FileTable, _, _), _, _, _, _) =>
+ val v1FileFormat = table.fallbackFileFormat.newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
table.fileIndex.partitionSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index ebe7fee..d60d429 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -35,7 +35,7 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
* source via SQL configuration and fall back to FileFormat.
* 2. Catalog support is required, which is still under development for data source V2.
*/
- def fallBackFileFormat: Class[_ <: FileFormat]
+ def fallbackFileFormat: Class[_ <: FileFormat]
lazy val sparkSession = SparkSession.active
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 188016c..d7284fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -98,6 +98,15 @@ abstract class FileTable(
* }}}
*/
def formatName: String
+
+ /**
+ * Returns a V1 [[FileFormat]] class of the same file data source.
+ * This is a solution for the following cases:
+ * 1. File datasource V2 implementations cause regression. Users can disable the problematic data
+ * source via SQL configuration and fall back to FileFormat.
+ * 2. Catalog support is required, which is still under development for data source V2.
+ */
+ def fallbackFileFormat: Class[_ <: FileFormat]
}
object FileTable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
index 55222c6..045f41e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
@@ -25,19 +25,19 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
class CSVDataSourceV2 extends FileDataSourceV2 {
- override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[CSVFileFormat]
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[CSVFileFormat]
override def shortName(): String = "csv"
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
- CSVTable(tableName, sparkSession, options, paths, None)
+ CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
- CSVTable(tableName, sparkSession, options, paths, Some(schema))
+ CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 852cbf0..8170661 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -33,7 +34,8 @@ case class CSVTable(
sparkSession: SparkSession,
options: CaseInsensitiveStringMap,
paths: Seq[String],
- userSpecifiedSchema: Option[StructType])
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder =
CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index e8b9e6c..1ea80d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -25,20 +25,20 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
class OrcDataSourceV2 extends FileDataSourceV2 {
- override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[OrcFileFormat]
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[OrcFileFormat]
override def shortName(): String = "orc"
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
- OrcTable(tableName, sparkSession, options, paths, None)
+ OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
- OrcTable(tableName, sparkSession, options, paths, Some(schema))
+ OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index ace77b7c..1cc6e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.orc
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -30,7 +31,8 @@ case class OrcTable(
sparkSession: SparkSession,
options: CaseInsensitiveStringMap,
paths: Seq[String],
- userSpecifiedSchema: Option[StructType])
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index d5543e8..4b0f68b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -160,7 +160,7 @@ abstract class BaseSessionStateBuilder(
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
- new FallbackOrcDataSourceV2(session) +:
+ new FallBackFileSourceV2(session) +:
DataSourceResolution(conf) +:
customResolutionRules
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
index 3d4f564..ac1d567 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
@@ -21,6 +21,8 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{QueryTest, SparkSession}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.sources.v2.reader.ScanBuilder
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -45,6 +47,8 @@ class DummyFileTable(
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = null
override def supportsDataType(dataType: DataType): Boolean = dataType == StringType
+
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[TextFileFormat]
}
class FileTableSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
index e019dbf..8627bdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
- override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
override def shortName(): String = "parquet"
@@ -55,7 +55,7 @@ class DummyReadOnlyFileTable extends Table with SupportsRead {
class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
- override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[ParquetFileFormat]
override def shortName(): String = "parquet"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 877a0da..3bca770 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -72,7 +72,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
new ResolveHiveSerdeTable(session) +:
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
- new FallbackOrcDataSourceV2(session) +:
+ new FallBackFileSourceV2(session) +:
DataSourceResolution(conf) +:
customResolutionRules
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org