Posted to commits@spark.apache.org by do...@apache.org on 2018/09/24 15:49:28 UTC
spark git commit: [SPARK-25460][BRANCH-2.4][SS] DataSourceV2: SS sources do not respect SessionConfigSupport
Repository: spark
Updated Branches:
refs/heads/branch-2.4 51d5378f8 -> ec384284e
[SPARK-25460][BRANCH-2.4][SS] DataSourceV2: SS sources do not respect SessionConfigSupport
## What changes were proposed in this pull request?
This PR proposes to backport SPARK-25460 to branch-2.4:
This PR proposes to respect `SessionConfigSupport` in SS data sources as well. Currently, session configs are only respected in batch sources:
https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L198-L203
https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L244-L249
If a developer writes a DataSource V2 implementation that supports both structured streaming and batch jobs, batch jobs respect a source-specific session configuration, for example a URL used to connect and fetch data (which end users might not be aware of); structured streaming, however, does not pick this configuration up, and the value has to be set explicitly through options.
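
As a rough sketch (the `ExampleSource` class and `example` short name below are illustrative, not part of this patch), a V2 source opts into session configs by mixing in `SessionConfigSupport` and declaring a key prefix:

import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}

// Illustrative skeleton only; a real source would also mix in a read/write support trait.
// With this mixin, session configs named spark.datasource.example.<key> are expected to
// reach the source as the option <key>.
class ExampleSource extends DataSourceV2 with DataSourceRegister with SessionConfigSupport {
  override def shortName(): String = "example"

  // Session configs under "spark.datasource.example." map to this source's options.
  override def keyPrefix: String = shortName()
}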
## How was this patch tested?
Unit tests were added.
Closes #22529 from HyukjinKwon/SPARK-25460-backport.
Authored-by: hyukjinkwon <gu...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec384284
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec384284
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec384284
Branch: refs/heads/branch-2.4
Commit: ec384284eb427d7573bd94c707777e23e4137971
Parents: 51d5378
Author: hyukjinkwon <gu...@apache.org>
Authored: Mon Sep 24 08:49:19 2018 -0700
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Mon Sep 24 08:49:19 2018 -0700
----------------------------------------------------------------------
.../spark/sql/streaming/DataStreamReader.scala | 18 ++-
.../spark/sql/streaming/DataStreamWriter.scala | 16 ++-
.../sources/StreamingDataSourceV2Suite.scala | 118 ++++++++++++++++---
3 files changed, 125 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 7eb5db5..a9cb5e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport}
@@ -158,7 +159,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
}
val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
- val options = new DataSourceOptions(extraOptions.asJava)
// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
// We can't be sure at this point whether we'll actually want to use V2, since we don't know the
// writer or whether the query is continuous.
@@ -173,12 +173,16 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
}
ds match {
case s: MicroBatchReadSupport =>
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ ds = s, conf = sparkSession.sessionState.conf)
+ val options = sessionOptions ++ extraOptions
+ val dataSourceOptions = new DataSourceOptions(options.asJava)
var tempReader: MicroBatchReader = null
val schema = try {
tempReader = s.createMicroBatchReader(
Optional.ofNullable(userSpecifiedSchema.orNull),
Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
- options)
+ dataSourceOptions)
tempReader.readSchema()
} finally {
// Stop tempReader to avoid side-effect thing
@@ -190,17 +194,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
- s, source, extraOptions.toMap,
+ s, source, options,
schema.toAttributes, v1Relation)(sparkSession))
case s: ContinuousReadSupport =>
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ ds = s, conf = sparkSession.sessionState.conf)
+ val options = sessionOptions ++ extraOptions
+ val dataSourceOptions = new DataSourceOptions(options.asJava)
val tempReader = s.createContinuousReader(
Optional.ofNullable(userSpecifiedSchema.orNull),
Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
- options)
+ dataSourceOptions)
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
- s, source, extraOptions.toMap,
+ s, source, options,
tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
case _ =>
// Code path for data source v1.
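
For reference, `DataSourceV2Utils.extractSessionConfigs` (already used on the batch path) maps session configs of the form `spark.datasource.<keyPrefix>.<option>` to plain data source options. A simplified sketch of that mapping, not the actual implementation, assuming a `keyPrefix` string and the session's `SQLConf`:

import org.apache.spark.sql.internal.SQLConf

// Simplified illustration of the key-prefix extraction done by
// DataSourceV2Utils.extractSessionConfigs; the real method takes the DataSourceV2
// instance, checks it for SessionConfigSupport, and reads keyPrefix from it.
def extractSessionConfigsSketch(keyPrefix: String, conf: SQLConf): Map[String, String] = {
  val pattern = ("^spark\\.datasource\\." + keyPrefix + "\\.(.+)").r
  conf.getAllConfs.flatMap { case (key, value) =>
    pattern.findFirstMatchIn(key).map(m => m.group(1) -> value)
  }
}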
http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 3b9a56f..735fd17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources._
@@ -298,22 +299,27 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
} else {
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+ var options = extraOptions.toMap
val sink = ds.newInstance() match {
- case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
+ case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) =>
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ w, df.sparkSession.sessionState.conf)
+ options = sessionOptions ++ extraOptions
+ w
case _ =>
val ds = DataSource(
df.sparkSession,
className = source,
- options = extraOptions.toMap,
+ options = options,
partitionColumns = normalizedParCols.getOrElse(Nil))
ds.createSink(outputMode)
}
df.sparkSession.sessionState.streamingQueryManager.startQuery(
- extraOptions.get("queryName"),
- extraOptions.get("checkpointLocation"),
+ options.get("queryName"),
+ options.get("checkpointLocation"),
df,
- extraOptions.toMap,
+ options,
sink,
outputMode,
useTempCheckpointLocation = source == "console",
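
Note the merge order above: `sessionOptions ++ extraOptions` means an option passed explicitly through `.option(...)` still wins over a session config with the same key. A tiny illustration (the `url` key and values are made up):

// Map ++ keeps the right-hand value on key collisions, so explicit options
// take precedence over session-derived ones.
val sessionOptions = Map("url" -> "jdbc:fake://from-session-conf")
val extraOptions   = Map("url" -> "jdbc:fake://from-option-call")
val merged = sessionOptions ++ extraOptions
assert(merged("url") == "jdbc:fake://from-option-call")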
http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 52b833a..2565cd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, Streami
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -54,14 +54,20 @@ trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
override def createMicroBatchReader(
schema: Optional[StructType],
checkpointLocation: String,
- options: DataSourceOptions): MicroBatchReader = FakeReader()
+ options: DataSourceOptions): MicroBatchReader = {
+ LastReadOptions.options = options
+ FakeReader()
+ }
}
trait FakeContinuousReadSupport extends ContinuousReadSupport {
override def createContinuousReader(
schema: Optional[StructType],
checkpointLocation: String,
- options: DataSourceOptions): ContinuousReader = FakeReader()
+ options: DataSourceOptions): ContinuousReader = {
+ LastReadOptions.options = options
+ FakeReader()
+ }
}
trait FakeStreamWriteSupport extends StreamWriteSupport {
@@ -70,16 +76,27 @@ trait FakeStreamWriteSupport extends StreamWriteSupport {
schema: StructType,
mode: OutputMode,
options: DataSourceOptions): StreamWriter = {
+ LastWriteOptions.options = options
throw new IllegalStateException("fake sink - cannot actually write")
}
}
-class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport {
+class FakeReadMicroBatchOnly
+ extends DataSourceRegister
+ with FakeMicroBatchReadSupport
+ with SessionConfigSupport {
override def shortName(): String = "fake-read-microbatch-only"
+
+ override def keyPrefix: String = shortName()
}
-class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport {
+class FakeReadContinuousOnly
+ extends DataSourceRegister
+ with FakeContinuousReadSupport
+ with SessionConfigSupport {
override def shortName(): String = "fake-read-continuous-only"
+
+ override def keyPrefix: String = shortName()
}
class FakeReadBothModes extends DataSourceRegister
@@ -91,8 +108,13 @@ class FakeReadNeitherMode extends DataSourceRegister {
override def shortName(): String = "fake-read-neither-mode"
}
-class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport {
+class FakeWrite
+ extends DataSourceRegister
+ with FakeStreamWriteSupport
+ with SessionConfigSupport {
override def shortName(): String = "fake-write-microbatch-continuous"
+
+ override def keyPrefix: String = shortName()
}
class FakeNoWrite extends DataSourceRegister {
@@ -120,6 +142,21 @@ class FakeWriteV1Fallback extends DataSourceRegister
override def shortName(): String = "fake-write-v1-fallback"
}
+object LastReadOptions {
+ var options: DataSourceOptions = _
+
+ def clear(): Unit = {
+ options = null
+ }
+}
+
+object LastWriteOptions {
+ var options: DataSourceOptions = _
+
+ def clear(): Unit = {
+ options = null
+ }
+}
class StreamingDataSourceV2Suite extends StreamTest {
@@ -129,6 +166,11 @@ class StreamingDataSourceV2Suite extends StreamTest {
spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath)
}
+ override def afterEach(): Unit = {
+ LastReadOptions.clear()
+ LastWriteOptions.clear()
+ }
+
val readFormats = Seq(
"fake-read-microbatch-only",
"fake-read-continuous-only",
@@ -142,7 +184,14 @@ class StreamingDataSourceV2Suite extends StreamTest {
Trigger.ProcessingTime(1000),
Trigger.Continuous(1000))
- private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger) = {
+ private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger): Unit = {
+ testPositiveCaseWithQuery(readFormat, writeFormat, trigger)(() => _)
+ }
+
+ private def testPositiveCaseWithQuery(
+ readFormat: String,
+ writeFormat: String,
+ trigger: Trigger)(check: StreamingQuery => Unit): Unit = {
val query = spark.readStream
.format(readFormat)
.load()
@@ -150,8 +199,8 @@ class StreamingDataSourceV2Suite extends StreamTest {
.format(writeFormat)
.trigger(trigger)
.start()
+ check(query)
query.stop()
- query
}
private def testNegativeCase(
@@ -187,19 +236,54 @@ class StreamingDataSourceV2Suite extends StreamTest {
test("disabled v2 write") {
// Ensure the V2 path works normally and generates a V2 sink..
- val v2Query = testPositiveCase(
- "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
- assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
- .isInstanceOf[FakeWriteV1Fallback])
+ testPositiveCaseWithQuery(
+ "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query =>
+ assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+ .isInstanceOf[FakeWriteV1Fallback])
+ }
// Ensure we create a V1 sink with the config. Note the config is a comma separated
// list, including other fake entries.
val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
- val v1Query = testPositiveCase(
- "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
- assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
- .isInstanceOf[FakeSink])
+ testPositiveCaseWithQuery(
+ "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v1Query =>
+ assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+ .isInstanceOf[FakeSink])
+ }
+ }
+ }
+
+ Seq(
+ Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.Once()),
+ Tuple2(classOf[FakeReadContinuousOnly], Trigger.Continuous(1000))
+ ).foreach { case (source, trigger) =>
+ test(s"SPARK-25460: session options are respected in structured streaming sources - $source") {
+ // `keyPrefix` and `shortName` are the same in this test case
+ val readSource = source.newInstance().shortName()
+ val writeSource = "fake-write-microbatch-continuous"
+
+ val readOptionName = "optionA"
+ withSQLConf(s"spark.datasource.$readSource.$readOptionName" -> "true") {
+ testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
+ eventually(timeout(streamingTimeout)) {
+ // Write options should not be set.
+ assert(LastWriteOptions.options.getBoolean(readOptionName, false) == false)
+ assert(LastReadOptions.options.getBoolean(readOptionName, false) == true)
+ }
+ }
+ }
+
+ val writeOptionName = "optionB"
+ withSQLConf(s"spark.datasource.$writeSource.$writeOptionName" -> "true") {
+ testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
+ eventually(timeout(streamingTimeout)) {
+ // Read options should not be set.
+ assert(LastReadOptions.options.getBoolean(writeOptionName, false) == false)
+ assert(LastWriteOptions.options.getBoolean(writeOptionName, false) == true)
+ }
+ }
+ }
}
}
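
With the backport in place, the same session config that already influenced batch reads also reaches streaming reads. A hypothetical end-user sketch (the `my-source` format name and `fetchUrl` key are invented for illustration and assume the source mixes in `SessionConfigSupport` with `keyPrefix` equal to its short name):

// The source is expected to receive fetchUrl as an option, without the user
// repeating it via .option(...) on the streaming reader.
spark.conf.set("spark.datasource.my-source.fetchUrl", "https://example.com/data")

val query = spark.readStream
  .format("my-source")
  .load()
  .writeStream
  .format("console")
  .start()

query.stop()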