Posted to commits@spark.apache.org by we...@apache.org on 2019/03/18 10:25:37 UTC
[spark] branch master updated: [SPARK-26811][SQL] Add capabilities to v2.Table
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e348f14 [SPARK-26811][SQL] Add capabilities to v2.Table
e348f14 is described below
commit e348f14259d3e8699319e7c2fe220902de255f44
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Mon Mar 18 18:25:11 2019 +0800
[SPARK-26811][SQL] Add capabilities to v2.Table
## What changes were proposed in this pull request?
This adds a new method, `capabilities`, to `v2.Table` that returns a set of `TableCapability` values. Capabilities are used by analysis checks such as `V2WriteSupportCheck` to fail queries when a table does not support an operation, like truncation.
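As an illustration, here is a minimal sketch of how a v2 table advertises what it supports (the class name is hypothetical; it mirrors the `CapabilityTable` helper added in the test suite):

```scala
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType

// Hypothetical table that only supports batch appends and truncation.
class ExampleTable extends Table {
  override def name(): String = "example_table"
  override def schema(): StructType = new StructType().add("id", "long")

  // Analysis checks such as V2WriteSupportCheck consult this set: a full
  // truncate (overwrite with filter `true`) is allowed here, while an
  // OverwriteByExpression with any other filter fails analysis because the
  // table does not declare OVERWRITE_BY_FILTER.
  override def capabilities(): util.Set[TableCapability] =
    Set(BATCH_WRITE, TRUNCATE).asJava
}
```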
## How was this patch tested?
Existing tests cover regressions; a new analysis suite, `V2WriteSupportCheckSuite`, was added for the new capability checks.
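The suite applies the rule directly to logical plans. A condensed sketch of the pattern it uses (relying on ScalaTest's `intercept` and the suite's own `CapabilityTable` and `TestRelation` helpers, shown in full in the diff below):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.AppendData
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2WriteSupportCheck}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Appending to a table that declares no capabilities must fail analysis.
val plan = AppendData.byName(
  DataSourceV2Relation.create(CapabilityTable(), CaseInsensitiveStringMap.empty),
  TestRelation)

val exc = intercept[AnalysisException] {
  V2WriteSupportCheck.apply(plan)
}
assert(exc.getMessage.contains("does not support append in batch mode"))
```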
Closes #24012 from rdblue/SPARK-26811-add-capabilities.
Authored-by: Ryan Blue <bl...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/kafka010/KafkaSourceProvider.scala | 4 +-
.../spark/sql/sources/v2/SupportsBatchRead.java | 34 -----
.../spark/sql/sources/v2/SupportsBatchWrite.java | 33 -----
.../apache/spark/sql/sources/v2/SupportsRead.java | 2 +-
.../apache/spark/sql/sources/v2/SupportsWrite.java | 2 +-
.../org/apache/spark/sql/sources/v2/Table.java | 14 +-
.../spark/sql/sources/v2/TableCapability.java | 69 ++++++++++
.../apache/spark/sql/sources/v2/reader/Scan.java | 7 +-
.../spark/sql/sources/v2/writer/WriteBuilder.java | 4 +-
.../org/apache/spark/sql/DataFrameReader.scala | 6 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 6 +-
.../datasources/noop/NoopDataSource.scala | 7 +-
.../datasources/v2/DataSourceV2Implicits.scala | 18 ++-
.../datasources/v2/DataSourceV2Relation.scala | 2 +-
.../datasources/v2/DataSourceV2Strategy.scala | 6 +-
.../sql/execution/datasources/v2/FileTable.scala | 11 +-
.../datasources/v2/V2WriteSupportCheck.scala | 56 ++++++++
.../datasources/v2/WriteToDataSourceV2Exec.scala | 12 +-
.../spark/sql/execution/streaming/console.scala | 5 +
.../spark/sql/execution/streaming/memory.scala | 4 +
.../streaming/sources/ForeachWriterTable.scala | 7 +-
.../streaming/sources/RateStreamProvider.scala | 5 +
.../sources/TextSocketSourceProvider.scala | 5 +-
.../sql/execution/streaming/sources/memoryV2.scala | 6 +-
.../sql/internal/BaseSessionStateBuilder.scala | 2 +
.../spark/sql/sources/v2/JavaSimpleBatchTable.java | 17 ++-
.../spark/sql/sources/v2/DataSourceV2Suite.scala | 8 +-
.../sources/v2/FileDataSourceV2FallBackSuite.scala | 12 +-
.../sql/sources/v2/SimpleWritableDataSource.scala | 7 +-
.../sql/sources/v2/V2WriteSupportCheckSuite.scala | 149 +++++++++++++++++++++
.../sources/StreamingDataSourceV2Suite.scala | 8 ++
.../spark/sql/hive/HiveSessionStateBuilder.scala | 2 +
32 files changed, 416 insertions(+), 114 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 8496cbd..a8eff6b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
-import java.util.{Locale, UUID}
+import java.util.{Collections, Locale, UUID}
import scala.collection.JavaConverters._
@@ -359,6 +359,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def schema(): StructType = KafkaOffsetReader.kafkaSchema
+ override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
deleted file mode 100644
index ea7c5d2..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports batch scan.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
- * that builds {@link Scan} with {@link Scan#toBatch()} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsBatchRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
deleted file mode 100644
index 09e23f8..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a
- * {@link WriteBuilder} with {@link WriteBuilder#buildForBatch()} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsBatchWrite extends SupportsWrite {}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
index 14990ef..67fc72e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
@@ -26,7 +26,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan for batch,
* micro-batch, or continuous processing.
*/
-interface SupportsRead extends Table {
+public interface SupportsRead extends Table {
/**
* Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
index f0d8e44..b215963 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
@@ -26,7 +26,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
* for batch or streaming.
*/
-interface SupportsWrite extends Table {
+public interface SupportsWrite extends Table {
/**
* Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
index 0866485..78f979a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
@@ -20,16 +20,15 @@ package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.StructType;
+import java.util.Set;
+
/**
* An interface representing a logical structured data set of a data source. For example, the
* implementation can be a directory on the file system, a topic of Kafka, or a table in the
* catalog, etc.
* <p>
- * This interface can mixin the following interfaces to support different operations:
- * </p>
- * <ul>
- * <li>{@link SupportsBatchRead}: this table can be read in batch queries.</li>
- * </ul>
+ * This interface can mixin the following interfaces to support different operations, like
+ * {@code SupportsRead}.
*/
@Evolving
public interface Table {
@@ -45,4 +44,9 @@ public interface Table {
* empty schema can be returned here.
*/
StructType schema();
+
+ /**
+ * Returns the set of capabilities for this table.
+ */
+ Set<TableCapability> capabilities();
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
new file mode 100644
index 0000000..8d3fdcd
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * Capabilities that can be provided by a {@link Table} implementation.
+ * <p>
+ * Tables use {@link Table#capabilities()} to return a set of capabilities. Each capability signals
+ * to Spark that the table supports a feature identified by the capability. For example, returning
+ * {@code BATCH_READ} allows Spark to read from the table using a batch scan.
+ */
+@Experimental
+public enum TableCapability {
+ /**
+ * Signals that the table supports reads in batch execution mode.
+ */
+ BATCH_READ,
+
+ /**
+ * Signals that the table supports append writes in batch execution mode.
+ * <p>
+ * Tables that return this capability must support appending data and may also support additional
+ * write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
+ * {@link #OVERWRITE_DYNAMIC}.
+ */
+ BATCH_WRITE,
+
+ /**
+ * Signals that the table can be truncated in a write operation.
+ * <p>
+ * Truncating a table removes all existing rows.
+ * <p>
+ * See {@link org.apache.spark.sql.sources.v2.writer.SupportsTruncate}.
+ */
+ TRUNCATE,
+
+ /**
+ * Signals that the table can replace existing data that matches a filter with appended data in
+ * a write operation.
+ * <p>
+ * See {@link org.apache.spark.sql.sources.v2.writer.SupportsOverwrite}.
+ */
+ OVERWRITE_BY_FILTER,
+
+ /**
+ * Signals that the table can dynamically replace existing data partitions with appended data in
+ * a write operation.
+ * <p>
+ * See {@link org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
+ */
+ OVERWRITE_DYNAMIC
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
index 25ab06e..e97d054 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
@@ -21,7 +21,6 @@ import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.sources.v2.SupportsBatchRead;
import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
@@ -33,8 +32,8 @@ import org.apache.spark.sql.sources.v2.Table;
* This logical representation is shared between batch scan, micro-batch streaming scan and
* continuous streaming scan. Data sources must implement the corresponding methods in this
* interface, to match what the table promises to support. For example, {@link #toBatch()} must be
- * implemented, if the {@link Table} that creates this {@link Scan} implements
- * {@link SupportsBatchRead}.
+ * implemented, if the {@link Table} that creates this {@link Scan} returns BATCH_READ support in
+ * its {@link Table#capabilities()}.
* </p>
*/
@Evolving
@@ -62,7 +61,7 @@ public interface Scan {
/**
* Returns the physical representation of this scan for batch query. By default this method throws
* exception, data sources must overwrite this method to provide an implementation, if the
- * {@link Table} that creates this scan implements {@link SupportsBatchRead}.
+ * {@link Table} that creates this scan returns batch read support in its {@link Table#capabilities()}.
*
* @throws UnsupportedOperationException
*/
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
index 07529fe..e08d34f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
@@ -18,7 +18,6 @@
package org.apache.spark.sql.sources.v2.writer;
import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite;
import org.apache.spark.sql.types.StructType;
@@ -58,7 +57,8 @@ public interface WriteBuilder {
/**
* Returns a {@link BatchWrite} to write data to batch source. By default this method throws
* exception, data sources must overwrite this method to provide an implementation, if the
- * {@link Table} that creates this scan implements {@link SupportsBatchWrite}.
+ * {@link Table} that creates this write returns BATCH_WRITE support in its
+ * {@link Table#capabilities()}.
*
* Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
* to indicate that no writing is needed. We can clean it up after removing
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index dfba12a..e057d33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -37,8 +37,9 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2}
import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
@@ -220,8 +221,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
}
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
table match {
- case _: SupportsBatchRead =>
+ case _: SupportsRead if table.supports(BATCH_READ) =>
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, dsOptions))
case _ => loadV1Source(paths: _*)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3c51edd..b439a82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, Logi
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -264,8 +265,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val options = sessionOptions ++ extraOptions
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
provider.getTable(dsOptions) match {
- case table: SupportsBatchWrite =>
+ case table: SupportsWrite if table.supports(BATCH_WRITE) =>
lazy val relation = DataSourceV2Relation.create(table, dsOptions)
mode match {
case SaveMode.Append =>
@@ -273,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
AppendData.byName(relation, df.logicalPlan)
}
- case SaveMode.Overwrite =>
+ case SaveMode.Overwrite if table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER) =>
// truncate the table
runCommand(df.sparkSession, "save") {
OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index aa2a5e9..96a78d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql.execution.datasources.noop
+import java.util
+
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
@@ -35,10 +39,11 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}
-private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite {
+private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
+ override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
}
private[noop] object NoopWriteBuilder extends WriteBuilder
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index 2081af3..eed69cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -18,26 +18,30 @@
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
object DataSourceV2Implicits {
implicit class TableHelper(table: Table) {
- def asBatchReadable: SupportsBatchRead = {
+ def asReadable: SupportsRead = {
table match {
- case support: SupportsBatchRead =>
+ case support: SupportsRead =>
support
case _ =>
- throw new AnalysisException(s"Table does not support batch reads: ${table.name}")
+ throw new AnalysisException(s"Table does not support reads: ${table.name}")
}
}
- def asBatchWritable: SupportsBatchWrite = {
+ def asWritable: SupportsWrite = {
table match {
- case support: SupportsBatchWrite =>
+ case support: SupportsWrite =>
support
case _ =>
- throw new AnalysisException(s"Table does not support batch writes: ${table.name}")
+ throw new AnalysisException(s"Table does not support writes: ${table.name}")
}
}
+
+ def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)
+
+ def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 1740782..4119957 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -49,7 +49,7 @@ case class DataSourceV2Relation(
}
def newScanBuilder(): ScanBuilder = {
- table.asBatchReadable.newScanBuilder(options)
+ table.asReadable.newScanBuilder(options)
}
override def computeStats(): Statistics = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 424fbed..f8c7e2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -148,7 +148,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
case AppendData(r: DataSourceV2Relation, query, _) =>
- AppendDataExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
+ AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil
case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
@@ -158,10 +158,10 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
}.toArray
OverwriteByExpressionExec(
- r.table.asBatchWritable, filters, r.options, planLater(query)) :: Nil
+ r.table.asWritable, filters, r.options, planLater(query)) :: Nil
case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
- OverwritePartitionsDynamicExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil
+ OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil
case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 9423fe9..5944a20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -22,7 +22,8 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaUtils
@@ -32,7 +33,7 @@ abstract class FileTable(
options: CaseInsensitiveStringMap,
paths: Seq[String],
userSpecifiedSchema: Option[StructType])
- extends Table with SupportsBatchRead with SupportsBatchWrite {
+ extends Table with SupportsRead with SupportsWrite {
lazy val fileIndex: PartitioningAwareFileIndex = {
val scalaMap = options.asScala.toMap
@@ -62,6 +63,8 @@ abstract class FileTable(
partitionSchema, caseSensitive)._1
}
+ override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
+
/**
* When possible, this method should return the schema of the given `files`. When the format
* does not support inference, or no valid files are given should return None. In these cases
@@ -69,3 +72,7 @@ abstract class FileTable(
*/
def inferSchema(files: Seq[FileStatus]): Option[StructType]
}
+
+object FileTable {
+ private val CAPABILITIES = Set(BATCH_READ, BATCH_WRITE, TRUNCATE).asJava
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
new file mode 100644
index 0000000..cf77998
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.sources.v2.TableCapability._
+import org.apache.spark.sql.types.BooleanType
+
+object V2WriteSupportCheck extends (LogicalPlan => Unit) {
+ import DataSourceV2Implicits._
+
+ def failAnalysis(msg: String): Unit = throw new AnalysisException(msg)
+
+ override def apply(plan: LogicalPlan): Unit = plan foreach {
+ case AppendData(rel: DataSourceV2Relation, _, _) if !rel.table.supports(BATCH_WRITE) =>
+ failAnalysis(s"Table does not support append in batch mode: ${rel.table}")
+
+ case OverwritePartitionsDynamic(rel: DataSourceV2Relation, _, _)
+ if !rel.table.supports(BATCH_WRITE) || !rel.table.supports(OVERWRITE_DYNAMIC) =>
+ failAnalysis(s"Table does not support dynamic overwrite in batch mode: ${rel.table}")
+
+ case OverwriteByExpression(rel: DataSourceV2Relation, expr, _, _) =>
+ expr match {
+ case Literal(true, BooleanType) =>
+ if (!rel.table.supports(BATCH_WRITE) ||
+ !rel.table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)) {
+ failAnalysis(
+ s"Table does not support truncate in batch mode: ${rel.table}")
+ }
+ case _ =>
+ if (!rel.table.supports(BATCH_WRITE) || !rel.table.supports(OVERWRITE_BY_FILTER)) {
+ failAnalysis(s"Table does not support overwrite expression ${expr.sql} " +
+ s"in batch mode: ${rel.table}")
+ }
+ }
+
+ case _ => // OK
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 51606ab..607f2fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
-import org.apache.spark.sql.sources.v2.SupportsBatchWrite
+import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}
@@ -53,7 +53,7 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
* Rows in the output data set are appended.
*/
case class AppendDataExec(
- table: SupportsBatchWrite,
+ table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
@@ -80,7 +80,7 @@ case class AppendDataExec(
* AlwaysTrue to delete all rows.
*/
case class OverwriteByExpressionExec(
- table: SupportsBatchWrite,
+ table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
@@ -101,7 +101,7 @@ case class OverwriteByExpressionExec(
builder.overwrite(deleteWhere).buildForBatch()
case _ =>
- throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
+ throw new SparkException(s"Table does not support overwrite by expression: $table")
}
doWrite(batchWrite)
@@ -118,7 +118,7 @@ case class OverwriteByExpressionExec(
* are not modified.
*/
case class OverwritePartitionsDynamicExec(
- table: SupportsBatchWrite,
+ table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
@@ -153,7 +153,7 @@ case class WriteToDataSourceV2Exec(
* Helper for physical plans that build batch writes.
*/
trait BatchWriteHelper {
- def table: SupportsBatchWrite
+ def table: SupportsWrite
def query: SparkPlan
def writeOptions: CaseInsensitiveStringMap
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index dbdfcf8..884b92a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution.streaming
+import java.util
+import java.util.Collections
+
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
@@ -63,6 +66,8 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
override def schema(): StructType = StructType(Nil)
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index df7990c..bfa9c09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming
+import java.util
+import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
@@ -97,6 +99,8 @@ class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
override def schema(): StructType = stream.fullSchema()
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MemoryStreamScanBuilder(stream)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 44516bb..807e0b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -17,12 +17,15 @@
package org.apache.spark.sql.execution.streaming.sources
+import java.util
+import java.util.Collections
+
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table}
+import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
@@ -45,6 +48,8 @@ case class ForeachWriterTable[T](
override def schema(): StructType = StructType(Nil)
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 3d8a90e..08aea75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution.streaming.sources
+import java.util
+import java.util.Collections
+
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousStream
@@ -84,6 +87,8 @@ class RateStreamTable(
override def schema(): StructType = RateStreamProvider.SCHEMA
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new Scan {
override def readSchema(): StructType = RateStreamProvider.SCHEMA
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 0adbf1d9b..c0292ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.streaming.sources
import java.text.SimpleDateFormat
-import java.util.Locale
+import java.util
+import java.util.{Collections, Locale}
import scala.util.{Failure, Success, Try}
@@ -78,6 +79,8 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
}
}
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new Scan {
override def readSchema(): StructType = schema()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 22adceb..8eb5de0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming.sources
+import java.util
+import java.util.Collections
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
@@ -31,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
+import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableCapability}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
@@ -47,6 +49,8 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
override def schema(): StructType = StructType(Nil)
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
private var needTruncate: Boolean = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index a605dc6..f05aa51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -172,6 +173,7 @@ abstract class BaseSessionStateBuilder(
PreWriteCheck +:
PreReadCheck +:
HiveOnlyCheck +:
+ V2WriteSupportCheck +:
customCheckRules
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
index cb5954d..9b0eb61 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
@@ -18,15 +18,23 @@
package test.org.apache.spark.sql.sources.v2;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.SupportsRead;
import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.TableCapability;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
-abstract class JavaSimpleBatchTable implements Table, SupportsBatchRead {
+abstract class JavaSimpleBatchTable implements Table, SupportsRead {
+ private static final Set<TableCapability> CAPABILITIES = new HashSet<>(Arrays.asList(
+ TableCapability.BATCH_READ,
+ TableCapability.BATCH_WRITE,
+ TableCapability.TRUNCATE));
@Override
public StructType schema() {
@@ -37,6 +45,11 @@ abstract class JavaSimpleBatchTable implements Table, SupportsBatchRead {
public String name() {
return this.getClass().toString();
}
+
+ @Override
+ public Set<TableCapability> capabilities() {
+ return CAPABILITIES;
+ }
}
abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 705559d..587cfa9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -18,8 +18,11 @@
package org.apache.spark.sql.sources.v2
import java.io.File
+import java.util
import java.util.OptionalLong
+import scala.collection.JavaConverters._
+
import test.org.apache.spark.sql.sources.v2._
import org.apache.spark.SparkException
@@ -30,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.{Filter, GreaterThan}
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.test.SharedSQLContext
@@ -411,11 +415,13 @@ object SimpleReaderFactory extends PartitionReaderFactory {
}
}
-abstract class SimpleBatchTable extends Table with SupportsBatchRead {
+abstract class SimpleBatchTable extends Table with SupportsRead {
override def schema(): StructType = new StructType().add("i", "int").add("j", "int")
override def name(): String = this.getClass.toString
+
+ override def capabilities(): util.Set[TableCapability] = Set(BATCH_READ).asJava
}
abstract class SimpleScanBuilder extends ScanBuilder
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
index f9f9db3..e019dbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.sources.v2
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -38,7 +40,7 @@ class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 {
}
}
-class DummyReadOnlyFileTable extends Table with SupportsBatchRead {
+class DummyReadOnlyFileTable extends Table with SupportsRead {
override def name(): String = "dummy"
override def schema(): StructType = StructType(Nil)
@@ -46,6 +48,9 @@ class DummyReadOnlyFileTable extends Table with SupportsBatchRead {
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
throw new AnalysisException("Dummy file reader")
}
+
+ override def capabilities(): java.util.Set[TableCapability] =
+ Set(TableCapability.BATCH_READ).asJava
}
class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
@@ -59,13 +64,16 @@ class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 {
}
}
-class DummyWriteOnlyFileTable extends Table with SupportsBatchWrite {
+class DummyWriteOnlyFileTable extends Table with SupportsWrite {
override def name(): String = "dummy"
override def schema(): StructType = StructType(Nil)
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
throw new AnalysisException("Dummy file writer")
+
+ override def capabilities(): java.util.Set[TableCapability] =
+ Set(TableCapability.BATCH_WRITE).asJava
}
class FileDataSourceV2FallBackSuite extends QueryTest with SharedSQLContext {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 1603545..edebb0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources.v2
import java.io.{BufferedReader, InputStreamReader, IOException}
+import java.util
import scala.collection.JavaConverters._
@@ -27,6 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
@@ -142,7 +144,7 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
}
class MyTable(options: CaseInsensitiveStringMap)
- extends SimpleBatchTable with SupportsBatchWrite {
+ extends SimpleBatchTable with SupportsWrite {
private val path = options.get("path")
private val conf = SparkContext.getActive.get.hadoopConfiguration
@@ -156,6 +158,9 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new MyWriteBuilder(path)
}
+
+ override def capabilities(): util.Set[TableCapability] =
+ Set(BATCH_READ, BATCH_WRITE, TRUNCATE).asJava
}
override def getTable(options: CaseInsensitiveStringMap): Table = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V2WriteSupportCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V2WriteSupportCheckSuite.scala
new file mode 100644
index 0000000..1d76ee3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/V2WriteSupportCheckSuite.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedRelation}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2WriteSupportCheck}
+import org.apache.spark.sql.sources.v2.TableCapability._
+import org.apache.spark.sql.types.{LongType, StringType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class V2WriteSupportCheckSuite extends AnalysisTest {
+
+ test("AppendData: check missing capabilities") {
+ val plan = AppendData.byName(
+ DataSourceV2Relation.create(CapabilityTable(), CaseInsensitiveStringMap.empty), TestRelation)
+
+ val exc = intercept[AnalysisException]{
+ V2WriteSupportCheck.apply(plan)
+ }
+
+ assert(exc.getMessage.contains("does not support append in batch mode"))
+ }
+
+ test("AppendData: check correct capabilities") {
+ val plan = AppendData.byName(
+ DataSourceV2Relation.create(CapabilityTable(BATCH_WRITE), CaseInsensitiveStringMap.empty),
+ TestRelation)
+
+ V2WriteSupportCheck.apply(plan)
+ }
+
+ test("Truncate: check missing capabilities") {
+ Seq(CapabilityTable(),
+ CapabilityTable(BATCH_WRITE),
+ CapabilityTable(TRUNCATE),
+ CapabilityTable(OVERWRITE_BY_FILTER)).foreach { table =>
+
+ val plan = OverwriteByExpression.byName(
+ DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation,
+ Literal(true))
+
+ val exc = intercept[AnalysisException]{
+ V2WriteSupportCheck.apply(plan)
+ }
+
+ assert(exc.getMessage.contains("does not support truncate in batch mode"))
+ }
+ }
+
+ test("Truncate: check correct capabilities") {
+ Seq(CapabilityTable(BATCH_WRITE, TRUNCATE),
+ CapabilityTable(BATCH_WRITE, OVERWRITE_BY_FILTER)).foreach { table =>
+
+ val plan = OverwriteByExpression.byName(
+ DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation,
+ Literal(true))
+
+ V2WriteSupportCheck.apply(plan)
+ }
+ }
+
+ test("OverwriteByExpression: check missing capabilities") {
+ Seq(CapabilityTable(),
+ CapabilityTable(BATCH_WRITE),
+ CapabilityTable(OVERWRITE_BY_FILTER)).foreach { table =>
+
+ val plan = OverwriteByExpression.byName(
+ DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation,
+ EqualTo(AttributeReference("x", LongType)(), Literal(5)))
+
+ val exc = intercept[AnalysisException]{
+ V2WriteSupportCheck.apply(plan)
+ }
+
+ assert(exc.getMessage.contains(
+ "does not support overwrite expression (`x` = 5) in batch mode"))
+ }
+ }
+
+ test("OverwriteByExpression: check correct capabilities") {
+ val table = CapabilityTable(BATCH_WRITE, OVERWRITE_BY_FILTER)
+ val plan = OverwriteByExpression.byName(
+ DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation,
+ EqualTo(AttributeReference("x", LongType)(), Literal(5)))
+
+ V2WriteSupportCheck.apply(plan)
+ }
+
+ test("OverwritePartitionsDynamic: check missing capabilities") {
+ Seq(CapabilityTable(),
+ CapabilityTable(BATCH_WRITE),
+ CapabilityTable(OVERWRITE_DYNAMIC)).foreach { table =>
+
+ val plan = OverwritePartitionsDynamic.byName(
+ DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation)
+
+ val exc = intercept[AnalysisException] {
+ V2WriteSupportCheck.apply(plan)
+ }
+
+ assert(exc.getMessage.contains("does not support dynamic overwrite in batch mode"))
+ }
+ }
+
+ test("OverwritePartitionsDynamic: check correct capabilities") {
+ val table = CapabilityTable(BATCH_WRITE, OVERWRITE_DYNAMIC)
+ val plan = OverwritePartitionsDynamic.byName(
+ DataSourceV2Relation.create(table, CaseInsensitiveStringMap.empty), TestRelation)
+
+ V2WriteSupportCheck.apply(plan)
+ }
+}
+
+private object V2WriteSupportCheckSuite {
+ val schema: StructType = new StructType().add("id", LongType).add("data", StringType)
+}
+
+private case object TestRelation extends LeafNode with NamedRelation {
+ override def name: String = "source_relation"
+ override def output: Seq[AttributeReference] = V2WriteSupportCheckSuite.schema.toAttributes
+}
+
+private case class CapabilityTable(_capabilities: TableCapability*) extends Table {
+ override def name(): String = "capability_test_table"
+ override def schema(): StructType = V2WriteSupportCheckSuite.schema
+ override def capabilities(): util.Set[TableCapability] = _capabilities.toSet.asJava
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 13bb686..f022ede 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.streaming.sources
+import java.util
+import java.util.Collections
+
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
@@ -77,18 +80,21 @@ class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
}
trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
}
trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new FakeWriteBuilder
}
@@ -137,6 +143,7 @@ class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
new Table {
override def name(): String = "fake"
override def schema(): StructType = StructType(Nil)
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
}
}
}
@@ -164,6 +171,7 @@ class FakeNoWrite extends DataSourceRegister with TableProvider {
new Table {
override def name(): String = "fake"
override def schema(): StructType = StructType(Nil)
+ override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 132b0e4..68f4b2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
@@ -86,6 +87,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
PreReadCheck +:
+ V2WriteSupportCheck +:
customCheckRules
}