You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/09/24 15:49:28 UTC

spark git commit: [SPARK-25460][BRANCH-2.4][SS] DataSourceV2: SS sources do not respect SessionConfigSupport

Repository: spark
Updated Branches:
  refs/heads/branch-2.4 51d5378f8 -> ec384284e


[SPARK-25460][BRANCH-2.4][SS] DataSourceV2: SS sources do not respect SessionConfigSupport

## What changes were proposed in this pull request?

This PR proposes to backport SPARK-25460 to branch-2.4:

This PR proposes to respect `SessionConfigSupport` in SS datasources as well. Currently these are only respected in batch sources:

https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L198-L203

https://github.com/apache/spark/blob/e06da95cd9423f55cdb154a2778b0bddf7be984c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L244-L249

If a developer makes a datasource V2 that supports both structured streaming and batch jobs, batch jobs respect a specific configuration, let's say, URL to connect and fetch data (which end users might not be aware of); however, structured streaming ends up with not supporting this (and should explicitly be set into options).

## How was this patch tested?

Unit tests were added.

Closes #22529 from HyukjinKwon/SPARK-25460-backport.

Authored-by: hyukjinkwon <gu...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec384284
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec384284
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec384284

Branch: refs/heads/branch-2.4
Commit: ec384284eb427d7573bd94c707777e23e4137971
Parents: 51d5378
Author: hyukjinkwon <gu...@apache.org>
Authored: Mon Sep 24 08:49:19 2018 -0700
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Mon Sep 24 08:49:19 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamReader.scala  |  18 ++-
 .../spark/sql/streaming/DataStreamWriter.scala  |  16 ++-
 .../sources/StreamingDataSourceV2Suite.scala    | 118 ++++++++++++++++---
 3 files changed, 125 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 7eb5db5..a9cb5e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport}
@@ -158,7 +159,6 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     }
 
     val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
-    val options = new DataSourceOptions(extraOptions.asJava)
     // We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
     // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
     // writer or whether the query is continuous.
@@ -173,12 +173,16 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
     }
     ds match {
       case s: MicroBatchReadSupport =>
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          ds = s, conf = sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
+        val dataSourceOptions = new DataSourceOptions(options.asJava)
         var tempReader: MicroBatchReader = null
         val schema = try {
           tempReader = s.createMicroBatchReader(
             Optional.ofNullable(userSpecifiedSchema.orNull),
             Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
-            options)
+            dataSourceOptions)
           tempReader.readSchema()
         } finally {
           // Stop tempReader to avoid side-effect thing
@@ -190,17 +194,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         Dataset.ofRows(
           sparkSession,
           StreamingRelationV2(
-            s, source, extraOptions.toMap,
+            s, source, options,
             schema.toAttributes, v1Relation)(sparkSession))
       case s: ContinuousReadSupport =>
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          ds = s, conf = sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
+        val dataSourceOptions = new DataSourceOptions(options.asJava)
         val tempReader = s.createContinuousReader(
           Optional.ofNullable(userSpecifiedSchema.orNull),
           Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
-          options)
+          dataSourceOptions)
         Dataset.ofRows(
           sparkSession,
           StreamingRelationV2(
-            s, source, extraOptions.toMap,
+            s, source, options,
             tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
       case _ =>
         // Code path for data source v1.

http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 3b9a56f..735fd17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
@@ -298,22 +299,27 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     } else {
       val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
       val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+      var options = extraOptions.toMap
       val sink = ds.newInstance() match {
-        case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
+        case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) =>
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            w, df.sparkSession.sessionState.conf)
+          options = sessionOptions ++ extraOptions
+          w
         case _ =>
           val ds = DataSource(
             df.sparkSession,
             className = source,
-            options = extraOptions.toMap,
+            options = options,
             partitionColumns = normalizedParCols.getOrElse(Nil))
           ds.createSink(outputMode)
       }
 
       df.sparkSession.sessionState.streamingQueryManager.startQuery(
-        extraOptions.get("queryName"),
-        extraOptions.get("checkpointLocation"),
+        options.get("queryName"),
+        options.get("checkpointLocation"),
         df,
-        extraOptions.toMap,
+        options,
         sink,
         outputMode,
         useTempCheckpointLocation = source == "console",

http://git-wip-us.apache.org/repos/asf/spark/blob/ec384284/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 52b833a..2565cd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, Streami
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -54,14 +54,20 @@ trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
   override def createMicroBatchReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): MicroBatchReader = FakeReader()
+      options: DataSourceOptions): MicroBatchReader = {
+    LastReadOptions.options = options
+    FakeReader()
+  }
 }
 
 trait FakeContinuousReadSupport extends ContinuousReadSupport {
   override def createContinuousReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceOptions): ContinuousReader = FakeReader()
+      options: DataSourceOptions): ContinuousReader = {
+    LastReadOptions.options = options
+    FakeReader()
+  }
 }
 
 trait FakeStreamWriteSupport extends StreamWriteSupport {
@@ -70,16 +76,27 @@ trait FakeStreamWriteSupport extends StreamWriteSupport {
       schema: StructType,
       mode: OutputMode,
       options: DataSourceOptions): StreamWriter = {
+    LastWriteOptions.options = options
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }
 
-class FakeReadMicroBatchOnly extends DataSourceRegister with FakeMicroBatchReadSupport {
+class FakeReadMicroBatchOnly
+    extends DataSourceRegister
+    with FakeMicroBatchReadSupport
+    with SessionConfigSupport {
   override def shortName(): String = "fake-read-microbatch-only"
+
+  override def keyPrefix: String = shortName()
 }
 
-class FakeReadContinuousOnly extends DataSourceRegister with FakeContinuousReadSupport {
+class FakeReadContinuousOnly
+    extends DataSourceRegister
+    with FakeContinuousReadSupport
+    with SessionConfigSupport {
   override def shortName(): String = "fake-read-continuous-only"
+
+  override def keyPrefix: String = shortName()
 }
 
 class FakeReadBothModes extends DataSourceRegister
@@ -91,8 +108,13 @@ class FakeReadNeitherMode extends DataSourceRegister {
   override def shortName(): String = "fake-read-neither-mode"
 }
 
-class FakeWrite extends DataSourceRegister with FakeStreamWriteSupport {
+class FakeWrite
+    extends DataSourceRegister
+    with FakeStreamWriteSupport
+    with SessionConfigSupport {
   override def shortName(): String = "fake-write-microbatch-continuous"
+
+  override def keyPrefix: String = shortName()
 }
 
 class FakeNoWrite extends DataSourceRegister {
@@ -120,6 +142,21 @@ class FakeWriteV1Fallback extends DataSourceRegister
   override def shortName(): String = "fake-write-v1-fallback"
 }
 
+object LastReadOptions {
+  var options: DataSourceOptions = _
+
+  def clear(): Unit = {
+    options = null
+  }
+}
+
+object LastWriteOptions {
+  var options: DataSourceOptions = _
+
+  def clear(): Unit = {
+    options = null
+  }
+}
 
 class StreamingDataSourceV2Suite extends StreamTest {
 
@@ -129,6 +166,11 @@ class StreamingDataSourceV2Suite extends StreamTest {
     spark.conf.set("spark.sql.streaming.checkpointLocation", fakeCheckpoint.getCanonicalPath)
   }
 
+  override def afterEach(): Unit = {
+    LastReadOptions.clear()
+    LastWriteOptions.clear()
+  }
+
   val readFormats = Seq(
     "fake-read-microbatch-only",
     "fake-read-continuous-only",
@@ -142,7 +184,14 @@ class StreamingDataSourceV2Suite extends StreamTest {
     Trigger.ProcessingTime(1000),
     Trigger.Continuous(1000))
 
-  private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger) = {
+  private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger): Unit = {
+    testPositiveCaseWithQuery(readFormat, writeFormat, trigger)(() => _)
+  }
+
+  private def testPositiveCaseWithQuery(
+      readFormat: String,
+      writeFormat: String,
+      trigger: Trigger)(check: StreamingQuery => Unit): Unit = {
     val query = spark.readStream
       .format(readFormat)
       .load()
@@ -150,8 +199,8 @@ class StreamingDataSourceV2Suite extends StreamTest {
       .format(writeFormat)
       .trigger(trigger)
       .start()
+    check(query)
     query.stop()
-    query
   }
 
   private def testNegativeCase(
@@ -187,19 +236,54 @@ class StreamingDataSourceV2Suite extends StreamTest {
 
   test("disabled v2 write") {
     // Ensure the V2 path works normally and generates a V2 sink..
-    val v2Query = testPositiveCase(
-      "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
-    assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-      .isInstanceOf[FakeWriteV1Fallback])
+    testPositiveCaseWithQuery(
+      "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query =>
+      assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+        .isInstanceOf[FakeWriteV1Fallback])
+    }
 
     // Ensure we create a V1 sink with the config. Note the config is a comma separated
     // list, including other fake entries.
     val fullSinkName = "org.apache.spark.sql.streaming.sources.FakeWriteV1Fallback"
     withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") {
-      val v1Query = testPositiveCase(
-        "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once())
-      assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
-        .isInstanceOf[FakeSink])
+      testPositiveCaseWithQuery(
+        "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v1Query =>
+        assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
+          .isInstanceOf[FakeSink])
+      }
+    }
+  }
+
+  Seq(
+    Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.Once()),
+    Tuple2(classOf[FakeReadContinuousOnly], Trigger.Continuous(1000))
+  ).foreach { case (source, trigger) =>
+    test(s"SPARK-25460: session options are respected in structured streaming sources - $source") {
+      // `keyPrefix` and `shortName` are the same in this test case
+      val readSource = source.newInstance().shortName()
+      val writeSource = "fake-write-microbatch-continuous"
+
+      val readOptionName = "optionA"
+      withSQLConf(s"spark.datasource.$readSource.$readOptionName" -> "true") {
+        testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
+          eventually(timeout(streamingTimeout)) {
+            // Write options should not be set.
+            assert(LastWriteOptions.options.getBoolean(readOptionName, false) == false)
+            assert(LastReadOptions.options.getBoolean(readOptionName, false) == true)
+          }
+        }
+      }
+
+      val writeOptionName = "optionB"
+      withSQLConf(s"spark.datasource.$writeSource.$writeOptionName" -> "true") {
+        testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ =>
+          eventually(timeout(streamingTimeout)) {
+            // Read options should not be set.
+            assert(LastReadOptions.options.getBoolean(writeOptionName, false) == false)
+            assert(LastWriteOptions.options.getBoolean(writeOptionName, false) == true)
+          }
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org