You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/20 12:23:04 UTC

spark git commit: [SPARK-25460][SS] DataSourceV2: SS sources do not respect SessionConfigSupport

Repository: spark
Updated Branches:
  refs/heads/master 89671a27e -> edf5cc64e


[SPARK-25460][SS] DataSourceV2: SS sources do not respect SessionConfigSupport

## What changes were proposed in this pull request?

This PR proposes to respect `SessionConfigSupport` in SS datasources as well. Currently these are only respected in batch sources:

https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L198-L203

https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L244-L249

If a developer makes a datasource V2 that supports both structured streaming and batch jobs, batch jobs respect a specific configuration, let's say, URL to connect and fetch data (which end users might not be aware of); however, structured streaming ends up with not supporting this (and should explicitly be set into options).

## How was this patch tested?

Unit tests were added.

Closes #22462 from HyukjinKwon/SPARK-25460.

Authored-by: hyukjinkwon <gu...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edf5cc64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edf5cc64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edf5cc64

Branch: refs/heads/master
Commit: edf5cc64e4bfc34643952f3a9582beca20c4bddc
Parents: 89671a2
Author: hyukjinkwon <gu...@apache.org>
Authored: Thu Sep 20 20:22:55 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Sep 20 20:22:55 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamReader.scala  |  24 ++--
 .../spark/sql/streaming/DataStreamWriter.scala  |  16 ++-
 .../sources/StreamingDataSourceV2Suite.scala    | 116 ++++++++++++++++---
 3 files changed, 128 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/edf5cc64/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 2a4db4a..4c7dced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, MicroBatchReadSupportProvider}
@@ -158,7 +159,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     }
 
     val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
-    val options = new DataSourceOptions(extraOptions.asJava)
     // We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
     // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
     // writer or whether the query is continuous.
@@ -173,13 +173,18 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     }
     ds match {
       case s: MicroBatchReadSupportProvider =>
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          ds = s, conf = sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
+        val dataSourceOptions = new DataSourceOptions(options.asJava)
         var tempReadSupport: MicroBatchReadSupport = null
         val schema = try {
           val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath
           tempReadSupport = if (userSpecifiedSchema.isDefined) {
-            s.createMicroBatchReadSupport(userSpecifiedSchema.get, tmpCheckpointPath, options)
+            s.createMicroBatchReadSupport(
+              userSpecifiedSchema.get, tmpCheckpointPath, dataSourceOptions)
           } else {
-            s.createMicroBatchReadSupport(tmpCheckpointPath, options)
+            s.createMicroBatchReadSupport(tmpCheckpointPath, dataSourceOptions)
           }
           tempReadSupport.fullSchema()
         } finally {
@@ -192,16 +197,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         Dataset.ofRows(
           sparkSession,
           StreamingRelationV2(
-            s, source, extraOptions.toMap,
+            s, source, options,
             schema.toAttributes, v1Relation)(sparkSession))
       case s: ContinuousReadSupportProvider =>
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          ds = s, conf = sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
+        val dataSourceOptions = new DataSourceOptions(options.asJava)
         var tempReadSupport: ContinuousReadSupport = null
         val schema = try {
           val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath
           tempReadSupport = if (userSpecifiedSchema.isDefined) {
-            s.createContinuousReadSupport(userSpecifiedSchema.get, tmpCheckpointPath, options)
+            s.createContinuousReadSupport(
+              userSpecifiedSchema.get, tmpCheckpointPath, dataSourceOptions)
           } else {
-            s.createContinuousReadSupport(tmpCheckpointPath, options)
+            s.createContinuousReadSupport(tmpCheckpointPath, dataSourceOptions)
           }
           tempReadSupport.fullSchema()
         } finally {
@@ -214,7 +224,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         Dataset.ofRows(
           sparkSession,
           StreamingRelationV2(
-            s, source, extraOptions.toMap,
+            s, source, options,
             schema.toAttributes, v1Relation)(sparkSession))
       case _ =>
         // Code path for data source v1.

http://git-wip-us.apache.org/repos/asf/spark/blob/edf5cc64/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 7866e4f..e9a1521 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
@@ -298,23 +299,28 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     } else {
       val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+      var options = extraOptions.toMap
       val sink = ds.newInstance() match {
         case w: StreamingWriteSupportProvider
-            if !disabledSources.contains(w.getClass.getCanonicalName) => w
+            if !disabledSources.contains(w.getClass.getCanonicalName) =>
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            w, df.sparkSession.sessionState.conf)
+          options = sessionOptions ++ extraOptions
+          w
         case _ =>
           val ds = DataSource(
             df.sparkSession,
             className = source,
-            options = extraOptions.toMap,
+            options = options,
             partitionColumns = normalizedParCols.getOrElse(Nil))
           ds.createSink(outputMode)
       }
 
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
-        extraOptions.get("queryName"),
-        extraOptions.get("checkpointLocation"),
+        options.get("queryName"),
+        options.get("checkpointLocation"),
         df,
-        extraOptions.toMap,
+        options,
         sink,
         outputMode,
         useTempCheckpointLocation = source == "console",

http://git-wip-us.apache.org/repos/asf/spark/blob/edf5cc64/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index aeef4c8..3a0e780 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, ScanConfig, ScanConfigBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -56,13 +56,19 @@ case class FakeReadSupport() extends MicroBatchReadSupport with ContinuousReadSu
 trait FakeMicroBatchReadSupportProvider extends MicroBatchReadSupportProvider {
   override def createMicroBatchReadSupport(
       checkpointLocation: String,
-      options: DataSourceOptions): MicroBatchReadSupport = FakeReadSupport()
+      options: DataSourceOptions): MicroBatchReadSupport = {
+    LastReadOptions.options = options
+    FakeReadSupport()
+  }
 }
 
 trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
   override def createContinuousReadSupport(
       checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReadSupport = FakeReadSupport()
+      options: DataSourceOptions): ContinuousReadSupport = {
+    LastReadOptions.options = options
+    FakeReadSupport()
+  }
 }
 
 trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
@@ -71,16 +77,27 @@ trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
       schema: StructType,
       mode: OutputMode,
       options: DataSourceOptions): StreamingWriteSupport = {
+    LastWriteOptions.options = options
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
 
-class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupportProvider {
+class FakeReadMicroBatchOnly
+    extends DataSourceRegister
+    with FakeMicroBatchReadSupportProvider
+    with SessionConfigSupport {
   override def shortName(): String = "fake-read-microbatch-only"
+
+  override def keyPrefix: String = shortName()
 }
 
-class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupportProvider {
+class FakeReadContinuousOnly
+    extends DataSourceRegister
+    with FakeContinuousReadSupportProvider
+    with SessionConfigSupport {
   override def shortName(): String = "fake-read-continuous-only"
+
+  override def keyPrefix: String = shortName()
 }
 
 class FakeReadBothModes extends DataSourceRegister
@@ -92,8 +109,13 @@ class FakeReadNeitherMode extends DataSourceRegister {
   override def shortName(): String = "fake-read-neither-mode"
 }
 
-class FakeWriteSupportProvider extends DataSourceRegister with FakeStreamingWriteSupportProvider {
+class FakeWriteSupportProvider
+    extends DataSourceRegister
+    with FakeStreamingWriteSupportProvider
+    with SessionConfigSupport {
   override def shortName(): String = "fake-write-microbatch-continuous"
+
+  override def keyPrefix: String = shortName()
 }
 
 class FakeNoWrite extends DataSourceRegister {
@@ -121,6 +143,21 @@ class FakeWriteSupportProviderV1Fallback extends DataSourceRegister
   override def shortName(): String = "fake-write-v1-fallback"
 }
 
+object LastReadOptions {
+  var options: DataSourceOptions = _
+
+  def clear(): Unit = {
+    options = null
+  }
+}
+
+object LastWriteOptions {
+  var options: DataSourceOptions = _
+
+  def clear(): Unit = {
+    options = null
+  }
+}
 
 class StreamingDataSourceV2Suite extends StreamTest {
 
@@ -130,6 +167,11 @@ class StreamingDataSourceV2Suite extends StreamTest {
     spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath)
   }
 
+  override def afterEach(): Unit = {
+    LastReadOptions.clear()
+    LastWriteOptions.clear()
+  }
+
   val readFormats = Seq(
     "fake-read-microbatch-only",
     "fake-read-continuous-only",
@@ -143,7 +185,14 @@ class StreamingDataSourceV2Suite extends StreamTest {
     Trigger.ProcessingTime(1000),
     Trigger.Continuous(1000))
 
-  private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger) = {
+  private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger): Unit = {
+    testPositiveCaseWithQuery(readFormat, writeFormat, trigger)(() => _)
+  }
+
+  private def testPositiveCaseWithQuery(
+      readFormat: String,
+      writeFormat: String,
+      trigger: Trigger)(check: StreamingQuery => Unit): Unit = {
     val query = spark.readStream
       .format(readFormat)
       .load()
@@ -151,8 +200,8 @@ class StreamingDataSourceV2Suite extends StreamTest {
       .format(writeFormat)
       .trigger(trigger)
       .start()
+    check(query)
     query.stop()
-    query
   }
 
   private def testNegativeCase(
@@ -188,19 +237,54 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
   test("disabled v2 write") {
     // Ensure the V2 path works normally and generates a V2 sink..
-    val v2Query = testPositiveCase(
-      "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
-    assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-      .isInstanceOf[FakeWriteSupportProviderV1Fallback])
+    testPositiveCaseWithQuery(
+      "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query =>
+      assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+        .isInstanceOf[FakeWriteSupportProviderV1Fallback])
+    }
 
     // Ensure we create a V1 sink with the config. Note the config is a comma separated
     // list, including other fake entries.
     val fullSinkName = classOf[FakeWriteSupportProviderV1Fallback].getName
     withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
-      val v1Query = testPositiveCase(
-        "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
-      assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-        .isInstanceOf[FakeSink])
+      testPositiveCaseWithQuery(
+        "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v1Query =>
+        assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+          .isInstanceOf[FakeSink])
+      }
+    }
+  }
+
+  Seq(
+    Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.Once()),
+    Tuple2(classOf[FakeReadContinuousOnly], Trigger.Continuous(1000))
+  ).foreach { case (source, trigger) =>
+    test(s"SPARK-25460: session options are respected in structured streaming sources - $source") {
+      // `keyPrefix` and `shortName` are the same in this test case
+      val readSource = source.newInstance().shortName()
+      val writeSource = "fake-write-microbatch-continuous"
+
+      val readOptionName = "optionA"
+      withSQLConf(s"spark.datasource.$readSource.$readOptionName" -> "true") {
+        testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
+          eventually(timeout(streamingTimeout)) {
+            // Write options should not be set.
+            assert(LastWriteOptions.options.getBoolean(readOptionName, false) == false)
+            assert(LastReadOptions.options.getBoolean(readOptionName, false) == true)
+          }
+        }
+      }
+
+      val writeOptionName = "optionB"
+      withSQLConf(s"spark.datasource.$writeSource.$writeOptionName" -> "true") {
+        testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
+          eventually(timeout(streamingTimeout)) {
+            // Read options should not be set.
+            assert(LastReadOptions.options.getBoolean(writeOptionName, false) == false)
+            assert(LastWriteOptions.options.getBoolean(writeOptionName, false) == true)
+          }
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org