You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/18 19:11:24 UTC

spark git commit: [SPARK-18187][SQL] CompactibleFileStreamLog should not use "compactInterval" directly with user setting.

Repository: spark
Updated Branches:
  refs/heads/master d9dd979d1 -> 51baca221


[SPARK-18187][SQL] CompactibleFileStreamLog should not use "compactInterval" directly with user setting.

## What changes were proposed in this pull request?
CompactibleFileStreamLog relies on "compactInterval" to detect a compaction batch. If the "compactInterval" is reset by the user, CompactibleFileStreamLog will return a wrong answer, resulting in data loss. This PR provides a way to check the validity of 'compactInterval', and calculates an appropriate value.

## How was this patch tested?
When restarting a stream, we change 'spark.sql.streaming.fileSource.log.compactInterval' to a value different from the former one.

The primary solution to this issue was given by uncleGen
Added extensions include an additional metadata field in OffsetSeq and CompactibleFileStreamLog APIs. zsxwing

Author: Tyson Condie <tc...@gmail.com>
Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>

Closes #15852 from tcondie/spark-18187.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51baca22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51baca22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51baca22

Branch: refs/heads/master
Commit: 51baca2219fda8692b88fc8552548544aec73a1e
Parents: d9dd979
Author: Tyson Condie <tc...@gmail.com>
Authored: Fri Nov 18 11:11:24 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Nov 18 11:11:24 2016 -0800

----------------------------------------------------------------------
 .../streaming/CompactibleFileStreamLog.scala    | 61 +++++++++++++++++++-
 .../execution/streaming/FileStreamSinkLog.scala |  8 ++-
 .../streaming/FileStreamSourceLog.scala         |  9 +--
 .../execution/streaming/HDFSMetadataLog.scala   |  2 +-
 .../sql/execution/streaming/OffsetSeq.scala     | 12 +++-
 .../sql/execution/streaming/OffsetSeqLog.scala  | 31 +++++++---
 .../CompactibleFileStreamLogSuite.scala         | 33 +++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala   | 41 ++++++++-----
 .../apache/spark/sql/streaming/StreamTest.scala | 20 ++++++-
 9 files changed, 178 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8af3db1..8529cea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -63,7 +63,46 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   protected def isDeletingExpiredLog: Boolean
 
-  protected def compactInterval: Int
+  protected def defaultCompactInterval: Int
+
+  protected final lazy val compactInterval: Int = {
+    // SPARK-18187: "compactInterval" can be set by user via defaultCompactInterval.
+    // If there are existing log entries, then we should ensure a compatible compactInterval
+    // is used, irrespective of the defaultCompactInterval. There are three cases:
+    //
+    // 1. If there is no '.compact' file, we can use the default setting directly.
+    // 2. If there are two or more '.compact' files, we use the interval of patch id suffix with
+    // '.compact' as compactInterval. This case could arise if isDeletingExpiredLog == false.
+    // 3. If there is only one '.compact' file, then we must find a compact interval
+    // that is compatible with (i.e., a divisor of) the previous compact file, and that
+    // faithfully tries to represent the revised default compact interval i.e., is at least
+    // is large if possible.
+    // e.g., if defaultCompactInterval is 5 (and previous compact interval could have
+    // been any 2,3,4,6,12), then a log could be: 11.compact, 12, 13, in which case
+    // will ensure that the new compactInterval = 6 > 5 and (11 + 1) % 6 == 0
+    val compactibleBatchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .filter(f => f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX))
+      .map(f => pathToBatchId(f.getPath))
+      .sorted
+      .reverse
+
+    // Case 1
+    var interval = defaultCompactInterval
+    if (compactibleBatchIds.length >= 2) {
+      // Case 2
+      val latestCompactBatchId = compactibleBatchIds(0)
+      val previousCompactBatchId = compactibleBatchIds(1)
+      interval = (latestCompactBatchId - previousCompactBatchId).toInt
+    } else if (compactibleBatchIds.length == 1) {
+      // Case 3
+      interval = CompactibleFileStreamLog.deriveCompactInterval(
+        defaultCompactInterval, compactibleBatchIds(0).toInt)
+    }
+    assert(interval > 0, s"intervalValue = $interval not positive value.")
+    logInfo(s"Set the compact interval to $interval " +
+      s"[defaultCompactInterval: $defaultCompactInterval]")
+    interval
+  }
 
   /**
    * Filter out the obsolete logs.
@@ -245,4 +284,24 @@ object CompactibleFileStreamLog {
   def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = {
     (batchId + compactInterval + 1) / compactInterval * compactInterval - 1
   }
+
+  /**
+   * Derives a compact interval from the latest compact batch id and
+   * a default compact interval.
+   */
+  def deriveCompactInterval(defaultInterval: Int, latestCompactBatchId: Int) : Int = {
+    if (latestCompactBatchId + 1 <= defaultInterval) {
+      latestCompactBatchId + 1
+    } else if (defaultInterval < (latestCompactBatchId + 1) / 2) {
+      // Find the first divisor >= default compact interval
+      def properDivisors(min: Int, n: Int) =
+        (min to n/2).view.filter(i => n % i == 0) :+ n
+
+      properDivisors(defaultInterval, latestCompactBatchId + 1).head
+    } else {
+      // default compact interval > than any divisor other than latest compact id
+      latestCompactBatchId + 1
+    }
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index b4f1415..eb6eed8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -88,9 +88,11 @@ class FileStreamSinkLog(
 
   protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
 
-  protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval
-  require(compactInterval > 0,
-    s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
+  protected override val defaultCompactInterval =
+    sparkSession.sessionState.conf.fileSinkLogCompactInterval
+
+  require(defaultCompactInterval > 0,
+    s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
       "to a positive value.")
 
   override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index fe81b15..327b3ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -38,11 +38,12 @@ class FileStreamSourceLog(
   import CompactibleFileStreamLog._
 
   // Configurations about metadata compaction
-  protected override val compactInterval =
+  protected override val defaultCompactInterval: Int =
     sparkSession.sessionState.conf.fileSourceLogCompactInterval
-  require(compactInterval > 0,
-    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
-      s"positive value.")
+
+  require(defaultCompactInterval > 0,
+    s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} " +
+      s"(was $defaultCompactInterval) to a positive value.")
 
   protected override val fileCleanupDelayMs =
     sparkSession.sessionState.conf.fileSourceLogCleanupDelay

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index db7057d..080729b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -70,7 +70,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /**
    * A `PathFilter` to filter only batch files
    */
-  private val batchFilesFilter = new PathFilter {
+  protected val batchFilesFilter = new PathFilter {
     override def accept(path: Path): Boolean = isBatchFile(path)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index a4e1fe6..7469cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -23,7 +23,7 @@ package org.apache.spark.sql.execution.streaming
  * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
  * vector clock that must progress linearly forward.
  */
-case class OffsetSeq(offsets: Seq[Option[Offset]]) {
+case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) {
 
   /**
    * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
@@ -47,7 +47,13 @@ object OffsetSeq {
    * Returns a [[OffsetSeq]] with a variable sequence of offsets.
    * `nulls` in the sequence are converted to `None`s.
    */
-  def fill(offsets: Offset*): OffsetSeq = {
-    OffsetSeq(offsets.map(Option(_)))
+  def fill(offsets: Offset*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
+
+  /**
+   * Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets.
+   * `nulls` in the sequence are converted to `None`s.
+   */
+  def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
+    OffsetSeq(offsets.map(Option(_)), metadata)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index d1c9d95..cc25b44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -33,12 +33,13 @@ import org.apache.spark.sql.SparkSession
  * by a newline character. If a source offset is missing, then
  * that line will contain a string value defined in the
  * SERIALIZED_VOID_OFFSET variable in [[OffsetSeqLog]] companion object.
- * For instance, when dealine wiht [[LongOffset]] types:
- *   v1   // version 1
- *   {0}  // LongOffset 0
- *   {3}  // LongOffset 3
- *   -    // No offset for this source i.e., an invalid JSON string
- *   {2}  // LongOffset 2
+ * For instance, when dealing with [[LongOffset]] types:
+ *   v1        // version 1
+ *   metadata
+ *   {0}       // LongOffset 0
+ *   {3}       // LongOffset 3
+ *   -         // No offset for this source i.e., an invalid JSON string
+ *   {2}       // LongOffset 2
  *   ...
  */
 class OffsetSeqLog(sparkSession: SparkSession, path: String)
@@ -58,13 +59,25 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
     if (version != OffsetSeqLog.VERSION) {
       throw new IllegalStateException(s"Unknown log version: ${version}")
     }
-    OffsetSeq.fill(lines.map(parseOffset).toArray: _*)
+
+    // read metadata
+    val metadata = lines.next().trim match {
+      case "" => None
+      case md => Some(md)
+    }
+    OffsetSeq.fill(metadata, lines.map(parseOffset).toArray: _*)
   }
 
-  override protected def serialize(metadata: OffsetSeq, out: OutputStream): Unit = {
+  override protected def serialize(offsetSeq: OffsetSeq, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
     out.write(OffsetSeqLog.VERSION.getBytes(UTF_8))
-    metadata.offsets.map(_.map(_.json)).foreach { offset =>
+
+    // write metadata
+    out.write('\n')
+    out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8))
+
+    // write offsets, one per line
+    offsetSeq.offsets.map(_.map(_.json)).foreach { offset =>
       out.write('\n')
       offset match {
         case Some(json: String) => out.write(json.getBytes(UTF_8))

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
new file mode 100644
index 0000000..2cd2157
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.SparkFunSuite
+
+class CompactibleFileStreamLogSuite extends SparkFunSuite {
+
+  import CompactibleFileStreamLog._
+
+  test("deriveCompactInterval") {
+    // latestCompactBatchId(4) + 1 <= default(5)
+    // then use latestestCompactBatchId + 1 === 5
+    assert(5 === deriveCompactInterval(5, 4))
+    // First divisor of 10 greater than 4 === 5
+    assert(5 === deriveCompactInterval(4, 9))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b365af7..a099153 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 
+import scala.collection.mutable
+
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
@@ -896,32 +898,38 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("compacat metadata log") {
+  test("compact interval metadata log") {
     val _sources = PrivateMethod[Seq[Source]]('sources)
     val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
 
-    def verify(execution: StreamExecution)
-      (batchId: Long, expectedBatches: Int): Boolean = {
+    def verify(
+        execution: StreamExecution,
+        batchId: Long,
+        expectedBatches: Int,
+        expectedCompactInterval: Int): Boolean = {
       import CompactibleFileStreamLog._
 
       val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
       val metadataLog = fileSource invokePrivate _metadataLog()
 
-      if (isCompactionBatch(batchId, 2)) {
+      if (isCompactionBatch(batchId, expectedCompactInterval)) {
         val path = metadataLog.batchIdToPath(batchId)
 
         // Assert path name should be ended with compact suffix.
-        assert(path.getName.endsWith(COMPACT_FILE_SUFFIX))
+        assert(path.getName.endsWith(COMPACT_FILE_SUFFIX),
+          "path does not end with compact file suffix")
 
         // Compacted batch should include all entries from start.
         val entries = metadataLog.get(batchId)
-        assert(entries.isDefined)
-        assert(entries.get.length === metadataLog.allFiles().length)
-        assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length)
+        assert(entries.isDefined, "Entries not defined")
+        assert(entries.get.length === metadataLog.allFiles().length, "clean up check")
+        assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length ===
+          entries.get.length, "Length check")
       }
 
       assert(metadataLog.allFiles().sortBy(_.batchId) ===
-        metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId))
+        metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId),
+        "Batch id mismatch")
 
       metadataLog.get(None, Some(batchId)).flatMap(_._2).length === expectedBatches
     }
@@ -932,26 +940,27 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       ) {
         val fileStream = createFileStream("text", src.getCanonicalPath)
         val filtered = fileStream.filter($"value" contains "keep")
+        val updateConf = Map(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5")
 
         testStream(filtered)(
           AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
           CheckAnswer("keep2", "keep3"),
-          AssertOnQuery(verify(_)(0L, 1)),
+          AssertOnQuery(verify(_, 0L, 1, 2)),
           AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
           CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-          AssertOnQuery(verify(_)(1L, 2)),
+          AssertOnQuery(verify(_, 1L, 2, 2)),
           AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
           CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
-          AssertOnQuery(verify(_)(2L, 3)),
+          AssertOnQuery(verify(_, 2L, 3, 2)),
           StopStream,
-          StartStream(),
-          AssertOnQuery(verify(_)(2L, 3)),
+          StartStream(additionalConfs = updateConf),
+          AssertOnQuery(verify(_, 2L, 3, 2)),
           AddTextFileData("drop10\nkeep11", src, tmp),
           CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
-          AssertOnQuery(verify(_)(3L, 4)),
+          AssertOnQuery(verify(_, 3L, 4, 2)),
           AddTextFileData("drop12\nkeep13", src, tmp),
           CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"),
-          AssertOnQuery(verify(_)(4L, 5))
+          AssertOnQuery(verify(_, 4L, 5, 2))
         )
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/51baca22/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7428330..a6b2d4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -161,7 +161,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** Starts the stream, resuming if data has already been processed. It must not be running. */
   case class StartStream(
       trigger: Trigger = ProcessingTime(0),
-      triggerClock: Clock = new SystemClock)
+      triggerClock: Clock = new SystemClock,
+      additionalConfs: Map[String, String] = Map.empty)
     extends StreamAction
 
   /** Advance the trigger clock's time manually. */
@@ -240,6 +241,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     var lastStream: StreamExecution = null
     val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
     val sink = new MemorySink(stream.schema, outputMode)
+    val resetConfValues = mutable.Map[String, Option[String]]()
 
     @volatile
     var streamDeathCause: Throwable = null
@@ -330,7 +332,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       startedTest.foreach { action =>
         logInfo(s"Processing test stream action: $action")
         action match {
-          case StartStream(trigger, triggerClock) =>
+          case StartStream(trigger, triggerClock, additionalConfs) =>
             verify(currentStream == null, "stream already running")
             verify(triggerClock.isInstanceOf[SystemClock]
               || triggerClock.isInstanceOf[StreamManualClock],
@@ -338,6 +340,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             if (triggerClock.isInstanceOf[StreamManualClock]) {
               manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
             }
+
+            additionalConfs.foreach(pair => {
+              val value =
+                if (spark.conf.contains(pair._1)) Some(spark.conf.get(pair._1)) else None
+              resetConfValues(pair._1) = value
+              spark.conf.set(pair._1, pair._2)
+            })
+
             lastStream = currentStream
             currentStream =
               spark
@@ -519,6 +529,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         currentStream.stop()
       }
       spark.streams.removeListener(statusCollector)
+
+      // Rollback prev configuration values
+      resetConfValues.foreach {
+        case (key, Some(value)) => spark.conf.set(key, value)
+        case (key, None) => spark.conf.unset(key)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org