Posted to commits@spark.apache.org by we...@apache.org on 2019/04/26 07:44:52 UTC
[spark] branch master updated: [SPARK-27190][SQL] add table capability for streaming
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 85fd552 [SPARK-27190][SQL] add table capability for streaming
85fd552 is described below
commit 85fd552ed6304967f25574baef3cf9657957bcb1
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Apr 26 15:44:23 2019 +0800
[SPARK-27190][SQL] add table capability for streaming
## What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/24012, adding the corresponding table capabilities for streaming.
## How was this patch tested?
Existing tests.
Closes #24129 from cloud-fan/capability.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
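For orientation: after this change a data source table no longer mixes in the removed SupportsMicroBatchRead / SupportsContinuousRead / SupportsStreamingWrite marker interfaces; it reports streaming support through Table#capabilities(), as the Kafka table in the diff below does. A minimal sketch of the new pattern (the class name is illustrative and the builders are stubbed; as in this patch, a streaming sink additionally mixes in BaseStreamingSink):

    import java.{util => ju}

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.execution.streaming.BaseStreamingSink
    import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
    import org.apache.spark.sql.sources.v2.TableCapability._
    import org.apache.spark.sql.sources.v2.reader.ScanBuilder
    import org.apache.spark.sql.sources.v2.writer.WriteBuilder
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Illustrative table that supports both streaming scan modes and streaming write.
    class ExampleStreamingTable extends Table
      with SupportsRead with SupportsWrite with BaseStreamingSink {

      override def name(): String = "example"

      override def schema(): StructType = new StructType().add("value", "string")

      // Capabilities replace the removed marker interfaces.
      override def capabilities(): ju.Set[TableCapability] =
        Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava

      // Stubs; a real source returns builders whose Scan/Write support the declared modes.
      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
        throw new UnsupportedOperationException("illustrative stub")

      override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
        throw new UnsupportedOperationException("illustrative stub")
    }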
---
.../spark/sql/kafka010/KafkaSourceProvider.scala | 11 +-
.../sql/sources/v2/SupportsContinuousRead.java | 35 ------
.../sql/sources/v2/SupportsMicroBatchRead.java | 35 ------
.../sql/sources/v2/SupportsStreamingWrite.java | 34 ------
.../spark/sql/sources/v2/TableCapability.java | 19 +++
.../apache/spark/sql/sources/v2/reader/Scan.java | 10 +-
.../datasources/noop/NoopDataSource.scala | 7 +-
.../v2/V2StreamingScanSupportCheck.scala | 64 ++++++++++
.../execution/streaming/MicroBatchExecution.scala | 61 +++++-----
.../sql/execution/streaming/StreamExecution.scala | 4 +-
.../spark/sql/execution/streaming/console.scala | 9 +-
.../streaming/continuous/ContinuousExecution.scala | 22 ++--
.../spark/sql/execution/streaming/memory.scala | 9 +-
.../streaming/sources/ForeachWriterTable.scala | 12 +-
.../streaming/sources/RateStreamProvider.scala | 9 +-
.../sources/TextSocketSourceProvider.scala | 9 +-
.../sql/execution/streaming/sources/memoryV2.scala | 10 +-
.../sql/internal/BaseSessionStateBuilder.scala | 3 +-
.../spark/sql/streaming/DataStreamReader.scala | 5 +-
.../spark/sql/streaming/DataStreamWriter.scala | 7 +-
.../sql/streaming/StreamingQueryManager.scala | 6 +-
.../v2/V2StreamingScanSupportCheckSuite.scala | 130 +++++++++++++++++++++
.../sources/StreamingDataSourceV2Suite.scala | 106 ++++++++++-------
.../spark/sql/hive/HiveSessionStateBuilder.scala | 3 +-
24 files changed, 389 insertions(+), 231 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index f7a2032..bb76a30 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
-import java.util.{Collections, Locale, UUID}
+import java.util.{Locale, UUID}
import scala.collection.JavaConverters._
@@ -29,9 +29,10 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
class KafkaTable(strategy: => ConsumerStrategy) extends Table
- with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
+ with SupportsRead with SupportsWrite with BaseStreamingSink {
override def name(): String = s"Kafka $strategy"
override def schema(): StructType = KafkaOffsetReader.kafkaSchema
- override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): ju.Set[TableCapability] = {
+ Set(MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE).asJava
+ }
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
() => new KafkaScan(options)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
deleted file mode 100644
index 5cc9848..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
- * continuous mode.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
- * that builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsContinuousRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
deleted file mode 100644
index c98f3f1..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.sources.v2.reader.Scan;
-import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
- * micro-batch mode.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder}
- * that builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsMicroBatchRead extends SupportsRead { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
deleted file mode 100644
index ac11e48..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/**
- * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write.
- * <p>
- * If a {@link Table} implements this interface, the
- * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a
- * {@link WriteBuilder} with {@link WriteBuilder#buildForStreaming()} implemented.
- * </p>
- */
-@Evolving
-public interface SupportsStreamingWrite extends SupportsWrite, BaseStreamingSink { }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
index 8d3fdcd..4640c61 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableCapability.java
@@ -34,6 +34,16 @@ public enum TableCapability {
BATCH_READ,
/**
+ * Signals that the table supports reads in micro-batch streaming execution mode.
+ */
+ MICRO_BATCH_READ,
+
+ /**
+ * Signals that the table supports reads in continuous streaming execution mode.
+ */
+ CONTINUOUS_READ,
+
+ /**
* Signals that the table supports append writes in batch execution mode.
* <p>
* Tables that return this capability must support appending data and may also support additional
@@ -43,6 +53,15 @@ public enum TableCapability {
BATCH_WRITE,
/**
+ * Signals that the table supports append writes in streaming execution mode.
+ * <p>
+ * Tables that return this capability must support appending data and may also support additional
+ * write modes, like {@link #TRUNCATE}, {@link #OVERWRITE_BY_FILTER}, and
+ * {@link #OVERWRITE_DYNAMIC}.
+ */
+ STREAMING_WRITE,
+
+ /**
* Signals that the table can be truncated in a write operation.
* <p>
* Truncating a table removes all existing rows.
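Call sites in this patch test the new enum values through the implicit supports/supportsAny helpers brought in by the DataSourceV2Implicits import used in the Scala changes below, rather than matching on marker interfaces. A small sketch of that check (the helper names are taken from those usages; the functions here are illustrative):

    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
    import org.apache.spark.sql.sources.v2.Table
    import org.apache.spark.sql.sources.v2.TableCapability._

    // Illustrative helpers: `table` is any resolved DataSourceV2 Table.
    def supportsStreamingScan(table: Table): Boolean =
      table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ)

    def supportsStreamingWrite(table: Table): Boolean =
      table.supports(STREAMING_WRITE)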
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
index f6085b9..c3964e2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
@@ -21,8 +21,6 @@ import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
-import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;
@@ -74,8 +72,8 @@ public interface Scan {
/**
* Returns the physical representation of this scan for streaming query with micro-batch mode. By
* default this method throws exception, data sources must overwrite this method to provide an
- * implementation, if the {@link Table} that creates this scan implements
- * {@link SupportsMicroBatchRead}.
+ * implementation, if the {@link Table} that creates this scan returns
+ * {@link TableCapability#MICRO_BATCH_READ} support in its {@link Table#capabilities()}.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
@@ -90,8 +88,8 @@ public interface Scan {
/**
* Returns the physical representation of this scan for streaming query with continuous mode. By
* default this method throws exception, data sources must overwrite this method to provide an
- * implementation, if the {@link Table} that creates this scan implements
- * {@link SupportsContinuousRead}.
+ * implementation, if the {@link Table} that creates this scan returns
+ * {@link TableCapability#CONTINUOUS_READ} support in its {@link Table#capabilities()}.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
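In other words, a table that reports MICRO_BATCH_READ or CONTINUOUS_READ is expected to hand out a Scan that overrides the corresponding method. A minimal sketch, with placeholder stream implementations supplied by the source:

    import org.apache.spark.sql.sources.v2.reader.Scan
    import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
    import org.apache.spark.sql.types.StructType

    // Illustrative Scan for a table declaring both streaming read capabilities.
    class ExampleScan(
        schema: StructType,
        microBatch: MicroBatchStream,   // placeholder implementation
        continuous: ContinuousStream)   // placeholder implementation
      extends Scan {

      override def readSchema(): StructType = schema

      // Required when the table reports MICRO_BATCH_READ.
      override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = microBatch

      // Required when the table reports CONTINUOUS_READ.
      override def toContinuousStream(checkpointLocation: String): ContinuousStream = continuous
    }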
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
index 96a78d3..c8b2f65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
@@ -39,11 +40,13 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}
-private[noop] object NoopTable extends Table with SupportsWrite with SupportsStreamingWrite {
+private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
- override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
+ }
}
private[noop] object NoopWriteBuilder extends WriteBuilder
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala
new file mode 100644
index 0000000..c029acc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheck.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.v2.TableCapability.{CONTINUOUS_READ, MICRO_BATCH_READ}
+
+/**
+ * This rule adds some basic table capability checks for streaming scans, without knowing the actual
+ * streaming execution mode.
+ */
+object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
+ import DataSourceV2Implicits._
+
+ override def apply(plan: LogicalPlan): Unit = {
+ plan.foreach {
+ case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
+ throw new AnalysisException(
+ s"Table ${r.table.name()} does not support either micro-batch or continuous scan.")
+ case _ =>
+ }
+
+ val streamingSources = plan.collect {
+ case r: StreamingRelationV2 => r.table
+ }
+ val v1StreamingRelations = plan.collect {
+ case r: StreamingRelation => r
+ }
+
+ if (streamingSources.length + v1StreamingRelations.length > 1) {
+ val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
+ // v1 streaming data source only supports micro-batch.
+ val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
+ v1StreamingRelations.isEmpty
+ if (!allSupportsMicroBatch && !allSupportsContinuous) {
+ val microBatchSources =
+ streamingSources.filter(_.supports(MICRO_BATCH_READ)).map(_.name()) ++
+ v1StreamingRelations.map(_.sourceName)
+ val continuousSources = streamingSources.filter(_.supports(CONTINUOUS_READ)).map(_.name())
+ throw new AnalysisException(
+ "The streaming sources in a query do not have a common supported execution mode.\n" +
+ "Sources support micro-batch: " + microBatchSources.mkString(", ") + "\n" +
+ "Sources support continuous: " + continuousSources.mkString(", "))
+ }
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fdd80cc..d9fe836 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.streaming
-import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.sql.{Dataset, SparkSession}
@@ -78,6 +77,7 @@ class MicroBatchExecution(
val disabledSources =
sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
@@ -88,31 +88,33 @@ class MicroBatchExecution(
logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
StreamingExecutionRelation(source, output)(sparkSession)
})
- case s @ StreamingRelationV2(ds, dsName, table: SupportsMicroBatchRead, options, output, _)
- if !disabledSources.contains(ds.getClass.getCanonicalName) =>
- v2ToRelationMap.getOrElseUpdate(s, {
- // Materialize source to avoid creating it in every batch
- val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- nextSourceId += 1
- logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
- // TODO: operator pushdown.
- val scan = table.newScanBuilder(options).build()
- val stream = scan.toMicroBatchStream(metadataPath)
- StreamingDataSourceV2Relation(output, scan, stream)
- })
- case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
- v2ToExecutionRelationMap.getOrElseUpdate(s, {
- // Materialize source to avoid creating it in every batch
- val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- if (v1Relation.isEmpty) {
- throw new UnsupportedOperationException(
- s"Data source $dsName does not support microbatch processing.")
- }
- val source = v1Relation.get.dataSource.createSource(metadataPath)
- nextSourceId += 1
- logInfo(s"Using Source [$source] from DataSourceV2 named '$dsName' [$ds]")
- StreamingExecutionRelation(source, output)(sparkSession)
- })
+
+ case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output, v1) =>
+ val v2Disabled = disabledSources.contains(src.getClass.getCanonicalName)
+ if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
+ v2ToRelationMap.getOrElseUpdate(s, {
+ // Materialize source to avoid creating it in every batch
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ nextSourceId += 1
+ logInfo(s"Reading table [$table] from DataSourceV2 named '$srcName' [$src]")
+ // TODO: operator pushdown.
+ val scan = table.newScanBuilder(options).build()
+ val stream = scan.toMicroBatchStream(metadataPath)
+ StreamingDataSourceV2Relation(output, scan, stream)
+ })
+ } else if (v1.isEmpty) {
+ throw new UnsupportedOperationException(
+ s"Data source $srcName does not support microbatch processing.")
+ } else {
+ v2ToExecutionRelationMap.getOrElseUpdate(s, {
+ // Materialize source to avoid creating it in every batch
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ val source = v1.get.dataSource.createSource(metadataPath)
+ nextSourceId += 1
+ logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' [$src]")
+ StreamingExecutionRelation(source, output)(sparkSession)
+ })
+ }
}
sources = _logicalPlan.collect {
// v1 source
@@ -122,8 +124,9 @@ class MicroBatchExecution(
}
uniqueSources = sources.distinct
+ // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
sink match {
- case s: SupportsStreamingWrite =>
+ case s: SupportsWrite =>
val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
@@ -519,7 +522,7 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
- case _: SupportsStreamingWrite =>
+ case _: SupportsWrite =>
newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
@@ -550,7 +553,7 @@ class MicroBatchExecution(
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink match {
case s: Sink => s.addBatch(currentBatchId, nextBatch)
- case _: SupportsStreamingWrite =>
+ case _: SupportsWrite =>
// This doesn't accumulate any data - it just forces execution of the microbatch writer.
nextBatch.collect()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index cc44193..fd95961 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
+import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming._
@@ -582,7 +582,7 @@ abstract class StreamExecution(
}
protected def createStreamingWrite(
- table: SupportsStreamingWrite,
+ table: SupportsWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 884b92a..c7161d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.streaming
import java.util
-import java.util.Collections
+
+import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
@@ -60,13 +61,15 @@ class ConsoleSinkProvider extends TableProvider
def shortName(): String = "console"
}
-object ConsoleTable extends Table with SupportsStreamingWrite {
+object ConsoleTable extends Table with SupportsWrite with BaseStreamingSink {
override def name(): String = "console"
override def schema(): StructType = StructType(Nil)
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(TableCapability.STREAMING_WRITE).asJava
+ }
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 4c13e72..a1fb212 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock
@@ -42,14 +42,14 @@ class ContinuousExecution(
name: String,
checkpointRoot: String,
analyzedPlan: LogicalPlan,
- sink: SupportsStreamingWrite,
+ sink: SupportsWrite,
trigger: Trigger,
triggerClock: Clock,
outputMode: OutputMode,
extraOptions: Map[String, String],
deleteCheckpointOnStop: Boolean)
extends StreamExecution(
- sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ sparkSession, name, checkpointRoot, analyzedPlan, sink.asInstanceOf[BaseStreamingSink],
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
@volatile protected var sources: Seq[ContinuousStream] = Seq()
@@ -63,22 +63,23 @@ class ContinuousExecution(
override val logicalPlan: WriteToContinuousDataSource = {
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
var nextSourceId = 0
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
- case s @ StreamingRelationV2(
- ds, dsName, table: SupportsContinuousRead, options, output, _) =>
+ case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output, _) =>
+ if (!table.supports(TableCapability.CONTINUOUS_READ)) {
+ throw new UnsupportedOperationException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+
v2ToRelationMap.getOrElseUpdate(s, {
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
- logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
+ logInfo(s"Reading table [$table] from DataSourceV2 named '$sourceName' [$ds]")
// TODO: operator pushdown.
val scan = table.newScanBuilder(options).build()
val stream = scan.toContinuousStream(metadataPath)
StreamingDataSourceV2Relation(output, scan, stream)
})
-
- case StreamingRelationV2(_, sourceName, _, _, _, _) =>
- throw new UnsupportedOperationException(
- s"Data source $sourceName does not support continuous processing.")
}
sources = _logicalPlan.collect {
@@ -86,6 +87,7 @@ class ContinuousExecution(
}
uniqueSources = sources.distinct
+ // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
WriteToContinuousDataSource(
createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index bfa9c09..0dcbdd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -18,10 +18,10 @@
package org.apache.spark.sql.execution.streaming
import java.util
-import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.NonFatal
@@ -92,14 +92,15 @@ object MemoryStreamTableProvider extends TableProvider {
}
}
-class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
- with SupportsMicroBatchRead with SupportsContinuousRead {
+class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with SupportsRead {
override def name(): String = "MemoryStreamDataSource"
override def schema(): StructType = stream.fullSchema()
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+ }
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MemoryStreamScanBuilder(stream)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 807e0b1..838ede6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -18,14 +18,16 @@
package org.apache.spark.sql.execution.streaming.sources
import java.util
-import java.util.Collections
+
+import scala.collection.JavaConverters._
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table, TableCapability}
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink
+import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
@@ -42,13 +44,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class ForeachWriterTable[T](
writer: ForeachWriter[T],
converter: Either[ExpressionEncoder[T], InternalRow => T])
- extends Table with SupportsStreamingWrite {
+ extends Table with SupportsWrite with BaseStreamingSink {
override def name(): String = "ForeachSink"
override def schema(): StructType = StructType(Nil)
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(TableCapability.STREAMING_WRITE).asJava
+ }
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index a652eeb..f61e9db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.streaming.sources
import java.util
-import java.util.Collections
+
+import scala.collection.JavaConverters._
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession
@@ -78,7 +79,7 @@ class RateStreamTable(
rowsPerSecond: Long,
rampUpTimeSeconds: Long,
numPartitions: Int)
- extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
+ extends Table with SupportsRead {
override def name(): String = {
s"RateStream(rowsPerSecond=$rowsPerSecond, rampUpTimeSeconds=$rampUpTimeSeconds, " +
@@ -87,7 +88,9 @@ class RateStreamTable(
override def schema(): StructType = RateStreamProvider.SCHEMA
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+ }
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
override def readSchema(): StructType = RateStreamProvider.SCHEMA
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index a0452cf..0f807e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.streaming.sources
import java.text.SimpleDateFormat
import java.util
-import java.util.{Collections, Locale}
+import java.util.Locale
+import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
@@ -67,7 +68,7 @@ class TextSocketSourceProvider extends TableProvider with DataSourceRegister wit
}
class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimestamp: Boolean)
- extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
+ extends Table with SupportsRead {
override def name(): String = s"Socket[$host:$port]"
@@ -79,7 +80,9 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
}
}
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ).asJava
+ }
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
override def readSchema(): StructType = schema()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 8eb5de0..219e25c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -18,9 +18,9 @@
package org.apache.spark.sql.execution.streaming.sources
import java.util
-import java.util.Collections
import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableCapability}
+import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType
@@ -43,13 +43,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
* tests and does not provide durability.
*/
-class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Logging {
+class MemorySinkV2 extends Table with SupportsWrite with MemorySinkBase with Logging {
override def name(): String = "MemorySinkV2"
override def schema(): StructType = StructType(Nil)
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(TableCapability.STREAMING_WRITE).asJava
+ }
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder with SupportsTruncate {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 3d29ff3..0275df9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck
+import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -177,6 +177,7 @@ abstract class BaseSessionStateBuilder(
PreReadCheck +:
HiveOnlyCheck +:
V2WriteSupportCheck +:
+ V2StreamingScanSupportCheck +:
customCheckRules
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 01f29cd..01083a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -181,8 +182,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
}
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
table match {
- case _: SupportsMicroBatchRead | _: SupportsContinuousRead =>
+ case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
@@ -190,6 +192,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
sparkSession))
// fallback to v1
+ // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 33d032e..d2df3a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableProvider}
+import org.apache.spark.sql.sources.v2.{SupportsWrite, TableProvider}
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
@@ -315,8 +316,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
source = provider, conf = df.sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
provider.getTable(dsOptions) match {
- case s: SupportsStreamingWrite => s
+ case table: SupportsWrite if table.supports(STREAMING_WRITE) =>
+ table.asInstanceOf[BaseStreamingSink]
case _ => createV1Sink()
}
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 2e019ba..5a08049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution,
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
+import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
@@ -261,7 +261,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
(sink, trigger) match {
- case (v2Sink: SupportsStreamingWrite, trigger: ContinuousTrigger) =>
+ case (table: SupportsWrite, trigger: ContinuousTrigger) =>
if (operationCheckEnabled) {
UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
}
@@ -270,7 +270,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
userSpecifiedName.orNull,
checkpointLocation,
analyzedPlan,
- v2Sink,
+ table,
trigger,
triggerClock,
outputMode,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala
new file mode 100644
index 0000000..8a0450f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2StreamingScanSupportCheckSuite.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.plans.logical.Union
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{Offset, Source, StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.StreamSourceProvider
+import org.apache.spark.sql.sources.v2.{Table, TableCapability, TableProvider}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class V2StreamingScanSupportCheckSuite extends SparkFunSuite with SharedSparkSession {
+ import TableCapability._
+
+ private def createStreamingRelation(table: Table, v1Relation: Option[StreamingRelation]) = {
+ StreamingRelationV2(FakeTableProvider, "fake", table, CaseInsensitiveStringMap.empty(),
+ FakeTableProvider.schema.toAttributes, v1Relation)(spark)
+ }
+
+ private def createStreamingRelationV1() = {
+ StreamingRelation(DataSource(spark, classOf[FakeStreamSourceProvider].getName))
+ }
+
+ test("check correct plan") {
+ val plan1 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ), None)
+ val plan2 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None)
+ val plan3 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ, CONTINUOUS_READ), None)
+ val plan4 = createStreamingRelationV1()
+
+ V2StreamingScanSupportCheck(Union(plan1, plan1))
+ V2StreamingScanSupportCheck(Union(plan2, plan2))
+ V2StreamingScanSupportCheck(Union(plan1, plan3))
+ V2StreamingScanSupportCheck(Union(plan2, plan3))
+ V2StreamingScanSupportCheck(Union(plan1, plan4))
+ V2StreamingScanSupportCheck(Union(plan3, plan4))
+ }
+
+ test("table without scan capability") {
+ val e = intercept[AnalysisException] {
+ V2StreamingScanSupportCheck(createStreamingRelation(CapabilityTable(), None))
+ }
+ assert(e.message.contains("does not support either micro-batch or continuous scan"))
+ }
+
+ test("mix micro-batch only and continuous only") {
+ val plan1 = createStreamingRelation(CapabilityTable(MICRO_BATCH_READ), None)
+ val plan2 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None)
+
+ val e = intercept[AnalysisException] {
+ V2StreamingScanSupportCheck(Union(plan1, plan2))
+ }
+ assert(e.message.contains(
+ "The streaming sources in a query do not have a common supported execution mode"))
+ }
+
+ test("mix continuous only and v1 relation") {
+ val plan1 = createStreamingRelation(CapabilityTable(CONTINUOUS_READ), None)
+ val plan2 = createStreamingRelationV1()
+ val e = intercept[AnalysisException] {
+ V2StreamingScanSupportCheck(Union(plan1, plan2))
+ }
+ assert(e.message.contains(
+ "The streaming sources in a query do not have a common supported execution mode"))
+ }
+}
+
+private object FakeTableProvider extends TableProvider {
+ val schema = new StructType().add("i", "int")
+
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
+ throw new UnsupportedOperationException
+ }
+}
+
+private case class CapabilityTable(_capabilities: TableCapability*) extends Table {
+ override def name(): String = "capability_test_table"
+ override def schema(): StructType = FakeTableProvider.schema
+ override def capabilities(): util.Set[TableCapability] = _capabilities.toSet.asJava
+}
+
+private class FakeStreamSourceProvider extends StreamSourceProvider {
+ override def sourceSchema(
+ sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = {
+ "fake" -> FakeTableProvider.schema
+ }
+
+ override def createSource(
+ sqlContext: SQLContext,
+ metadataPath: String,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): Source = {
+ new Source {
+ override def schema: StructType = FakeTableProvider.schema
+ override def getOffset: Option[Offset] = {
+ throw new UnsupportedOperationException
+ }
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ throw new UnsupportedOperationException
+ }
+ override def stop(): Unit = {}
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index f022ede..25a68e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -20,13 +20,16 @@ package org.apache.spark.sql.streaming.sources
import java.util
import java.util.Collections
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, RateStreamOffset, Sink, StreamingQueryWrapper}
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
@@ -77,24 +80,12 @@ class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
}
}
-trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
- override def name(): String = "fake"
- override def schema(): StructType = StructType(Seq())
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
-}
-
-trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
- override def name(): String = "fake"
- override def schema(): StructType = StructType(Seq())
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder
-}
-
-trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
+trait FakeStreamingWriteTable extends Table with SupportsWrite with BaseStreamingSink {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
- override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(STREAMING_WRITE).asJava
+ }
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new FakeWriteBuilder
}
@@ -110,7 +101,16 @@ class FakeReadMicroBatchOnly
override def getTable(options: CaseInsensitiveStringMap): Table = {
LastReadOptions.options = options
- new FakeMicroBatchReadTable {}
+ new Table with SupportsRead {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Seq())
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(MICRO_BATCH_READ).asJava
+ }
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ new FakeScanBuilder
+ }
+ }
}
}
@@ -124,7 +124,16 @@ class FakeReadContinuousOnly
override def getTable(options: CaseInsensitiveStringMap): Table = {
LastReadOptions.options = options
- new FakeContinuousReadTable {}
+ new Table with SupportsRead {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Seq())
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(CONTINUOUS_READ).asJava
+ }
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ new FakeScanBuilder
+ }
+ }
}
}
@@ -132,7 +141,16 @@ class FakeReadBothModes extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-read-microbatch-continuous"
override def getTable(options: CaseInsensitiveStringMap): Table = {
- new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
+ new Table with SupportsRead {
+ override def name(): String = "fake"
+ override def schema(): StructType = StructType(Seq())
+ override def capabilities(): util.Set[TableCapability] = {
+ Set(MICRO_BATCH_READ, CONTINUOUS_READ).asJava
+ }
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ new FakeScanBuilder
+ }
+ }
}
}
@@ -365,39 +383,37 @@ class StreamingDataSourceV2Suite extends StreamTest {
val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor()
.newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty())
- (sourceTable, sinkTable, trigger) match {
- // Valid microbatch queries.
- case (_: SupportsMicroBatchRead, _: SupportsStreamingWrite, t)
- if !t.isInstanceOf[ContinuousTrigger] =>
- testPositiveCase(read, write, trigger)
-
- // Valid continuous queries.
- case (_: SupportsContinuousRead, _: SupportsStreamingWrite,
- _: ContinuousTrigger) =>
- testPositiveCase(read, write, trigger)
-
+ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+ trigger match {
// Invalid - can't read at all
- case (r, _, _) if !r.isInstanceOf[SupportsMicroBatchRead] &&
- !r.isInstanceOf[SupportsContinuousRead] =>
+ case _ if !sourceTable.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support streamed reading")
// Invalid - can't write
- case (_, w, _) if !w.isInstanceOf[SupportsStreamingWrite] =>
+ case _ if !sinkTable.supports(STREAMING_WRITE) =>
testNegativeCase(read, write, trigger,
s"Data source $write does not support streamed writing")
- // Invalid - trigger is continuous but reader is not
- case (r, _: SupportsStreamingWrite, _: ContinuousTrigger)
- if !r.isInstanceOf[SupportsContinuousRead] =>
- testNegativeCase(read, write, trigger,
- s"Data source $read does not support continuous processing")
+ case _: ContinuousTrigger =>
+ if (sourceTable.supports(CONTINUOUS_READ)) {
+ // Valid continuous queries.
+ testPositiveCase(read, write, trigger)
+ } else {
+ // Invalid - trigger is continuous but reader is not
+ testNegativeCase(
+ read, write, trigger, s"Data source $read does not support continuous processing")
+ }
- // Invalid - trigger is microbatch but reader is not
- case (r, _, t) if !r.isInstanceOf[SupportsMicroBatchRead] &&
- !t.isInstanceOf[ContinuousTrigger] =>
- testPostCreationNegativeCase(read, write, trigger,
- s"Data source $read does not support microbatch processing")
+ case microBatchTrigger =>
+ if (sourceTable.supports(MICRO_BATCH_READ)) {
+ // Valid microbatch queries.
+ testPositiveCase(read, write, trigger)
+ } else {
+ // Invalid - trigger is microbatch but reader is not
+ testPostCreationNegativeCase(read, write, trigger,
+ s"Data source $read does not support microbatch processing")
+ }
}
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 3bca770..70364e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck
+import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck}
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
@@ -89,6 +89,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
PreWriteCheck +:
PreReadCheck +:
V2WriteSupportCheck +:
+ V2StreamingScanSupportCheck +:
customCheckRules
}