You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/27 00:33:11 UTC

spark git commit: [SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes

Repository: spark
Updated Branches:
  refs/heads/master a76846cfb -> 5b27598ff


[SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes

## What changes were proposed in this pull request?

This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes:
* Added a method `commit(end: Offset)` that tells the Source that is OK to discard all offsets up `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream"; as opposed to "all data present in the Source's buffer".
* Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`.
* Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code.
* The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint.
* `MemoryStream` now cleans committed batches out of its internal buffer.
* `TextSocketSource` now cleans committed batches from its internal buffer.

## How was this patch tested?
Existing regression tests already exercise the new code.

Author: frreiss <fr...@us.ibm.com>

Closes #14553 from frreiss/fred-16963.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b27598f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b27598f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b27598f

Branch: refs/heads/master
Commit: 5b27598ff50cb08e7570fade458da0a3d4d4eabc
Parents: a76846c
Author: frreiss <fr...@us.ibm.com>
Authored: Wed Oct 26 17:33:08 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Oct 26 17:33:08 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSource.scala  |  9 +++
 .../spark/sql/execution/streaming/Source.scala  | 22 ++++--
 .../execution/streaming/StreamExecution.scala   | 32 ++++++---
 .../spark/sql/execution/streaming/memory.scala  | 47 +++++++++++--
 .../spark/sql/execution/streaming/socket.scala  | 72 ++++++++++++++++----
 .../sql/streaming/StreamingQuerySuite.scala     |  8 +--
 6 files changed, 154 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 115edf7..a392b82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -176,6 +176,15 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  override def commit(end: Offset): Unit = {
+    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
+    // and the value of the maxFileAge parameter.
+  }
+
   override def stop() {}
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 9711478..f3bd5bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -30,16 +30,30 @@ trait Source  {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /** Returns the maximum available offset for this source. */
+  /**
+   * Returns the maximum available offset for this source.
+   * Returns `None` if this source has never received any data.
+   */
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
-   * the batch should begin with the first available record. This method must always return the
-   * same data for a particular `start` and `end` pair.
+   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
+   * then the batch should begin with the first record. This method must always return the
+   * same data for a particular `start` and `end` pair; even after the Source has been restarted
+   * on a different node.
+   *
+   * Higher layers will always call this method with a value of `start` greater than or equal
+   * to the last value passed to `commit` and a value of `end` less than or equal to the
+   * last value returned by `getOffset`
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
 
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  def commit(end: Offset) : Unit = {}
+
   /** Stop this source and free any resources it has allocated. */
   def stop(): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ba8cf80..37af1a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -73,6 +73,9 @@ class StreamExecution(
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   var committedOffsets = new StreamProgress
@@ -80,6 +83,9 @@ class StreamExecution(
   /**
    * Tracks the offsets that are available to be processed, but have not yet be committed to the
    * sink.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   private var availableOffsets = new StreamProgress
@@ -337,17 +343,27 @@ class StreamExecution(
     }
     if (hasNewData) {
       reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(
-          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
+        // NOTE: The following code is correct because runBatches() processes exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+        // the same time), this cleanup logic will need to change.
+
+        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+        // sources to discard data from the previous batch.
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
+
         // Now that we have logged the new batch, no further processing will happen for
-        // the previous batch, and it is safe to discard the old metadata.
-        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-        // flight at the same time), this cleanup logic will need to change.
-        offsetLog.purge(currentBatchId)
+        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before the target ID.
+        offsetLog.purge(currentBatchId - 1)
       }
     } else {
       awaitBatchLock.lock()
@@ -455,7 +471,7 @@ class StreamExecution(
 
   /**
    * Blocks the current thread until processing for data from the given `source` has reached at
-   * least the given `Offset`. This method is indented for use primarily when writing tests.
+   * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
   private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
     def notDone = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 788fcd0..48d9791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   protected val logicalPlan = StreamingExecutionRelation(this)
   protected val output = logicalPlan.output
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
   @GuardedBy("this")
-  protected val batches = new ArrayBuffer[Dataset[A]]
+  protected val batches = new ListBuffer[Dataset[A]]
 
   @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)
 
+  /**
+   * Last offset that was discarded, or -1 if no commits have occurred. Note that the value
+   * -1 is used in calculations below and isn't just an arbitrary constant.
+   */
+  @GuardedBy("this")
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+
   def schema: StructType = encoder.schema
 
   def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
   override def getOffset: Option[Offset] = synchronized {
-    if (batches.isEmpty) {
+    if (currentOffset.offset == -1) {
       None
     } else {
       Some(currentOffset)
     }
   }
 
-  /**
-   * Returns the data that is between the offsets (`start`, `end`].
-   */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+    // Internal buffer only holds the batches after lastCommittedOffset.
+    val newBlocks = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
 
     logDebug(
       s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -111,11 +126,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       }
   }
 
+  override def commit(end: Offset): Unit = synchronized {
+    end match {
+      case newOffset: LongOffset =>
+        val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+        if (offsetDiff < 0) {
+          sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+        }
+
+        batches.trimStart(offsetDiff)
+        lastOffsetCommitted = newOffset
+      case _ =>
+        sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+          "an instance of this class")
+    }
+  }
+
   override def stop() {}
 
   def reset(): Unit = synchronized {
     batches.clear()
     currentOffset = new LongOffset(-1)
+    lastOffsetCommitted = new LongOffset(-1)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index fb15239..c662e7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
+
 object TextSocketSource {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   @GuardedBy("this")
   private var readThread: Thread = null
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
+  @GuardedBy("this")
+  protected val batches = new ListBuffer[(String, Timestamp)]
+
+  @GuardedBy("this")
+  protected var currentOffset: LongOffset = new LongOffset(-1)
+
   @GuardedBy("this")
-  private var lines = new ArrayBuffer[(String, Timestamp)]
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
 
   initialize()
 
@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
               return
             }
             TextSocketSource.this.synchronized {
-              lines += ((line,
+              val newData = (line,
                 Timestamp.valueOf(
                   TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-                ))
+                )
+              currentOffset = currentOffset + 1
+              batches.append(newData)
             }
           }
         } catch {
@@ -92,21 +105,54 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
   else TextSocketSource.SCHEMA_REGULAR
 
-  /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = synchronized {
-    if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+    if (currentOffset.offset == -1) {
+      None
+    } else {
+      Some(currentOffset)
+    }
   }
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
-    val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
-    val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val data = synchronized { lines.slice(startIdx, endIdx) }
+    val startOrdinal =
+      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+    // Internal buffer only holds the batches after lastOffsetCommitted
+    val rawList = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
+
     import sqlContext.implicits._
+    val rawBatch = sqlContext.createDataset(rawList)
+
+    // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
+    // if requested.
     if (includeTimestamp) {
-      data.toDF("value", "timestamp")
+      rawBatch.toDF("value", "timestamp")
+    } else {
+      // Strip out timestamp
+      rawBatch.select("_1").toDF("value")
+    }
+  }
+
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
     } else {
-      data.map(_._1).toDF("value")
+      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
+        s"originate with an instance of this class")
     }
   }
 
@@ -141,7 +187,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     logWarning("The socket source should not be used for production applications! " +
-      "It does not support recovery and stores state indefinitely.")
+      "It does not support recovery.")
     if (!parameters.contains("host")) {
       throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/5b27598f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 92020be..dad4104 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -252,8 +252,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
 
-    // Run 3 batches, and then assert that only 1 metadata file is left at the end
-    // since the first 2 should have been purged.
+    // Run 3 batches, and then assert that only 2 metadata files is are at the end
+    // since the first should have been purged.
     testStream(mapped)(
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
@@ -262,11 +262,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AddData(inputData, 4, 6),
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
-      AssertOnQuery("metadata log should contain only one file") { q =>
+      AssertOnQuery("metadata log should contain only two files") { q =>
         val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(! _.endsWith(".crc"))  // Workaround for SPARK-17475
-        assert(toTest.size == 1 && toTest.head == "2")
+        assert(toTest.size == 2 && toTest.head == "1")
         true
       }
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org