You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by marmbrus <gi...@git.apache.org> on 2016/03/18 01:09:22 UTC

[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/11804

    [SPARK-13985][SQL] Deterministic batches with ids

    This PR relaxes the requirements of a `Sink` for structured streaming to only require idempotent appending of data.  Previously the `Sink` needed to be able to transactionally append data while recording an opaque offset indicated how far in a stream we have processed.
    
    In order to do this, a new write-ahead-log has been added to stream execution, which records the offsets that will are present in each batch.  The log is created in the newly added `checkpointLocation`, which defaults to `${spark.sql.streaming.checkpointLocation}/${queryName}` but can be overriden by setting `checkpointLocation` in `DataFrameWriter`.
    
    In addition to making sinks easier to write the addition of batchIds and a checkpoint location is done in anticipation of integration with the the `StateStore` (#11645).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark batchIds

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11804.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11804
    
----
commit 53ba226e97cc1b216b3333239042f53a74bb13f6
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-03-17T00:14:09Z

    [SPARK-13985][SQL] Deterministic batches with ids

commit 97503f1b45d45c4e4524cf43853e9faab43d0032
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-03-17T23:58:34Z

    cleanup

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56740325
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -118,9 +148,10 @@ class StreamExecution(
           // While active, repeatedly attempt to run batches.
           SQLContext.setActive(sqlContext)
           populateStartOffsets()
    -      logInfo(s"Stream running at $streamProgress")
    +      logDebug(s"Stream running from $committedOffsets to $availableOffsets")
           while (isActive) {
    -        attemptBatch()
    +        if (dataAvailable) attemptBatch()
    --- End diff --
    
    Maybe rename attemptBatch to runBatch because it actually runs the batch instead of running it only if data is available.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56929651
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala ---
    @@ -17,31 +17,19 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import org.apache.spark.sql.DataFrame
    +
     /**
    - * An interface for systems that can collect the results of a streaming query.
    - *
    - * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
    - * data and update the [[Offset]]. In the case of a failure, the sink will be recreated
    - * and must be able to return the [[Offset]] for all of the data that is made durable.
    - * This contract allows Spark to process data with exactly-once semantics, even in the case
    - * of failures that require the computation to be restarted.
    + * An interface for systems that can collect the results of a streaming query. In order to preserve
    + * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
    --- End diff --
    
    ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198568970
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53548/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56908363
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---
    @@ -29,8 +30,13 @@ trait Source  {
       /** Returns the schema of the data from this source */
       def schema: StructType
     
    +  /** Returns the maximum available offset for this source. */
    +  def getOffset: Option[Offset]
    +
       /**
    -   * Returns the next batch of data that is available after `start`, if any is available.
    +   * Returns the data that is is between the offsets (`start`, `end`].  When `start` is `None` then
    +   * the batch should begin with the first available record.  This method must always return the
    --- End diff --
    
    That actually raises an issue.  The start offset needs to be deterministic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199649812
  
    **[Test build #53748 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53748/consoleFull)** for PR 11804 at commit [`042d969`](https://github.com/apache/spark/commit/042d969eee0c5c3f6857cd0422053ba63ce8a39f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56732864
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -142,22 +173,75 @@ class StreamExecution(
     
       /**
        * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed).
    +   * (i.e. avoid reprocessing data that we have already processed).  This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  - availableOffsets
        */
       private def populateStartOffsets(): Unit = {
    -    sink.currentOffset match {
    -      case Some(c: CompositeOffset) =>
    -        val storedProgress = c.offsets
    -        val sources = logicalPlan collect {
    -          case StreamingRelation(source, _) => source
    +    offsetLog.getLatest() match {
    +      case Some((batchId, nextOffsets: CompositeOffset)) =>
    +        logInfo(s"Resuming continuous query, starting with batch $batchId")
    +        currentBatchId = batchId + 1
    +        nextOffsets.toStreamProgress(sources, availableOffsets)
    +        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +
    +        offsetLog.get(batchId - 1).foreach {
    +          case lastOffsets: CompositeOffset =>
    +            lastOffsets.toStreamProgress(sources, committedOffsets)
    +            logDebug(s"Resuming with committed offsets: $committedOffsets")
             }
     
    -        assert(sources.size == storedProgress.size)
    -        sources.zip(storedProgress).foreach { case (source, offset) =>
    -          offset.foreach(streamProgress.update(source, _))
    -        }
           case None => // We are starting this stream for the first time.
    -      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +        logInfo(s"Starting new continuous query.")
    +        currentBatchId = 0
    +        commitAndConstructNextBatch()
    +
    +      case Some((_, offset)) =>
    +        sys.error(s"Invalid offset $offset")
    +    }
    +  }
    +
    +  /**
    +   * Returns true if there is any new data available to be processed.
    +   */
    +  def dataAvailable: Boolean = {
    +    availableOffsets.exists {
    +      case (source, available) =>
    +        committedOffsets
    +            .get(source)
    +            .map(committed => committed < available)
    +            .getOrElse(true)
    +    }
    +  }
    +
    +  /**
    +   * Queries all of the sources to see if any new data is available. When there is new data the
    +   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   *
    +   * Note that committing the offsets for a new batch implicitly marks the previous batch as
    +   * finished and thus this method should only be called when all currently available data
    +   * has been written to the sink.
    +   */
    +  def commitAndConstructNextBatch(): Boolean = committedOffsets.synchronized {
    --- End diff --
    
    why not private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56868269
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -142,22 +173,75 @@ class StreamExecution(
     
       /**
        * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed).
    +   * (i.e. avoid reprocessing data that we have already processed).  This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  - availableOffsets
        */
       private def populateStartOffsets(): Unit = {
    -    sink.currentOffset match {
    -      case Some(c: CompositeOffset) =>
    -        val storedProgress = c.offsets
    -        val sources = logicalPlan collect {
    -          case StreamingRelation(source, _) => source
    +    offsetLog.getLatest() match {
    +      case Some((batchId, nextOffsets: CompositeOffset)) =>
    +        logInfo(s"Resuming continuous query, starting with batch $batchId")
    +        currentBatchId = batchId + 1
    +        nextOffsets.toStreamProgress(sources, availableOffsets)
    +        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +
    +        offsetLog.get(batchId - 1).foreach {
    +          case lastOffsets: CompositeOffset =>
    +            lastOffsets.toStreamProgress(sources, committedOffsets)
    +            logDebug(s"Resuming with committed offsets: $committedOffsets")
             }
     
    -        assert(sources.size == storedProgress.size)
    -        sources.zip(storedProgress).foreach { case (source, offset) =>
    -          offset.foreach(streamProgress.update(source, _))
    -        }
           case None => // We are starting this stream for the first time.
    -      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +        logInfo(s"Starting new continuous query.")
    +        currentBatchId = 0
    +        commitAndConstructNextBatch()
    +
    +      case Some((_, offset)) =>
    --- End diff --
    
    How about changing the type of `offsetLog` to `HDFSMetadataLog[CompositeOffset]`? Then you don't need to add this case branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199631156
  
    > Test build #53724 has finished for PR 11804 at commit 636e75c.
    > 
    > This patch fails from timeout after a configured wait of `250m`.
    > This patch merges cleanly.
    > This patch adds the following public classes (experimental):
    > class StreamProgress(
    
    I think the following test was hung forever:
    ```
      test("options") {
        val map = new java.util.HashMap[String, String]
        map.put("opt3", "3")
    
        val df = sqlContext.read
            .format("org.apache.spark.sql.streaming.test")
            .option("opt1", "1")
            .options(Map("opt2" -> "2"))
            .options(map)
            .stream()
    
        assert(LastOptions.parameters("opt1") == "1")
        assert(LastOptions.parameters("opt2") == "2")
        assert(LastOptions.parameters("opt3") == "3")
    
        LastOptions.parameters = null
    
        df.write
          .format("org.apache.spark.sql.streaming.test")
          .option("opt1", "1")
          .options(Map("opt2" -> "2"))
          .options(map)
          .startStream()
          .stop()
    
        assert(LastOptions.parameters("opt1") == "1")
        assert(LastOptions.parameters("opt2") == "2")
        assert(LastOptions.parameters("opt3") == "3")
      }
    ```
    Looks like there is a bug in this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199614603
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53724/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198593141
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53559/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199912261
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56732168
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -74,20 +92,32 @@ class StreamExecution(
         override def run(): Unit = { runBatches() }
       }
     
    +  /**
    +   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
    +   * that a given batch will always consist of the same data, we write to this log *before* any
    +   * processing is done.  Thus, the Nth record in this log indicated data that is currently being
    +   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    +   */
    +  val offsetLog = new HDFSMetadataLog[Offset](sqlContext, metadataDirectory("offsets"))
    +
       /** Whether the query is currently active or not */
       override def isActive: Boolean = state == ACTIVE
     
       /** Returns current status of all the sources. */
       override def sourceStatuses: Array[SourceStatus] = {
    -    sources.map(s => new SourceStatus(s.toString, streamProgress.get(s))).toArray
    +    sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray
       }
     
       /** Returns current status of the sink. */
    -  override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, sink.currentOffset)
    +  override def sinkStatus: SinkStatus =
    +    new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources))
     
       /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
       override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
     
    +  private def metadataDirectory(name: String): String =
    --- End diff --
    
    what name is this? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56730518
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -74,20 +92,32 @@ class StreamExecution(
         override def run(): Unit = { runBatches() }
       }
     
    +  /**
    +   * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
    +   * that a given batch will always consist of the same data, we write to this log *before* any
    +   * processing is done.  Thus, the Nth record in this log indicated data that is currently being
    +   * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    +   */
    +  val offsetLog = new HDFSMetadataLog[Offset](sqlContext, metadataDirectory("offsets"))
    --- End diff --
    
    why not private? For debugging?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56866005
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -52,13 +55,28 @@ class StreamExecution(
       /** Minimum amount of time in between the start of each batch. */
       private val minBatchTime = 10
     
    -  /** Tracks how much data we have processed from each input source. */
    -  private[sql] val streamProgress = new StreamProgress
    +  /**
    +   * Tracks how much data we have processed and committed to the sink or state store from each
    +   * input source.
    +   */
    +  private[sql] val committedOffsets = new StreamProgress
    --- End diff --
    
    We need some protection for accessing committedOffsets since StreamProgress is not thread-safe. How about providing a getter method that is protected by synchronized and returns a clone `StreamProgress`? Since this is only used in tests, we don't need to worry about the overhead of `clone`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199648042
  
    **[Test build #2659 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2659/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199521305
  
    **[Test build #53721 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53721/consoleFull)** for PR 11804 at commit [`3c893a3`](https://github.com/apache/spark/commit/3c893a30dba2dfbaf9446e5f186edc81b4917467).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199678517
  
    **[Test build #2657 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2657/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56925645
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala ---
    @@ -17,31 +17,19 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import org.apache.spark.sql.DataFrame
    +
     /**
    - * An interface for systems that can collect the results of a streaming query.
    - *
    - * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
    - * data and update the [[Offset]]. In the case of a failure, the sink will be recreated
    - * and must be able to return the [[Offset]] for all of the data that is made durable.
    - * This contract allows Spark to process data with exactly-once semantics, even in the case
    - * of failures that require the computation to be restarted.
    + * An interface for systems that can collect the results of a streaming query. In order to preserve
    + * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
    --- End diff --
    
    nit:  exactly once semantics <comma>


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199680822
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198141150
  
    **[Test build #53482 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53482/consoleFull)** for PR 11804 at commit [`97503f1`](https://github.com/apache/spark/commit/97503f1b45d45c4e4524cf43853e9faab43d0032).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56732415
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -142,22 +173,75 @@ class StreamExecution(
     
       /**
        * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed).
    +   * (i.e. avoid reprocessing data that we have already processed).  This function must be called
    --- End diff --
    
    Double space before "This function"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56730359
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---
    @@ -29,8 +30,13 @@ trait Source  {
       /** Returns the schema of the data from this source */
       def schema: StructType
     
    +  /** Returns the maximum available offset for this source. */
    +  def getOffset: Option[Offset]
    +
       /**
    -   * Returns the next batch of data that is available after `start`, if any is available.
    +   * Returns the data that is is between the offsets (`start`, `end`].  When `start` is `None` then
    +   * the batch should begin with the first available record.  This method must always return the
    --- End diff --
    
    Shouldnt say first available record. In this case, the source is free to choose the point from which it wants to return the data, first available, or latest available (e.g. Kinesis can choose to return from TRIM_HORIZON, or LATEST)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198525825
  
    **[Test build #53559 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53559/consoleFull)** for PR 11804 at commit [`23c5a05`](https://github.com/apache/spark/commit/23c5a057824db20108a912194469d7930842b8f6).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199614277
  
    **[Test build #53724 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53724/consoleFull)** for PR 11804 at commit [`636e75c`](https://github.com/apache/spark/commit/636e75c3ffdcc07da0c8b9f493fb67bd37b45b68).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class StreamProgress(`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56925619
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
    @@ -113,6 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends
             try {
               // Try to commit the batch
               // It will fail if there is an existing file (someone has committed the batch)
    +          logInfo(s"Attempting to write log ${batchFile(batchId)}")
    --- End diff --
    
     nit: why is this log info? sounds like logDebug.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56869880
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -52,13 +55,28 @@ class StreamExecution(
       /** Minimum amount of time in between the start of each batch. */
       private val minBatchTime = 10
     
    -  /** Tracks how much data we have processed from each input source. */
    -  private[sql] val streamProgress = new StreamProgress
    +  /**
    +   * Tracks how much data we have processed and committed to the sink or state store from each
    +   * input source.
    +   */
    +  private[sql] val committedOffsets = new StreamProgress
    --- End diff --
    
    Yeah, `StreamProgress` has gotten kind of ugly.  I think I'm going to just make it immutable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199679384
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199614597
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198201981
  
    **[Test build #53482 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53482/consoleFull)** for PR 11804 at commit [`97503f1`](https://github.com/apache/spark/commit/97503f1b45d45c4e4524cf43853e9faab43d0032).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56734229
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala ---
    @@ -52,6 +52,21 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
         case i if i == 0 => 0
         case i if i > 0 => 1
       }
    +
    +  /**
    +   * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
    +   * sources.
    +   *
    +   * This method is typically used to associate a serialized offset with actual sources (which
    +   * cannot be serialized).
    +   */
    +  def toStreamProgress(
    +      sources: Seq[Source],
    +      dest: StreamProgress = new StreamProgress): StreamProgress = {
    --- End diff --
    
    Also... toStreamProgress sounds like you are creating a stream progress, where here (as well as in all the usages) you are updating an existing progress. Better to name `updateStreamProgress` and make `dest` not optional.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199575632
  
    Few minor nits, other than that LGTM. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198202053
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198202055
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53482/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199647935
  
    **[Test build #2658 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2658/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11804


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199552353
  
    **[Test build #53721 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53721/consoleFull)** for PR 11804 at commit [`3c893a3`](https://github.com/apache/spark/commit/3c893a30dba2dfbaf9446e5f186edc81b4917467).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199647700
  
    **[Test build #2657 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2657/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56871895
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---
    @@ -29,8 +30,13 @@ trait Source  {
       /** Returns the schema of the data from this source */
       def schema: StructType
     
    +  /** Returns the maximum available offset for this source. */
    +  def getOffset: Option[Offset]
    --- End diff --
    
    `fetchNextOffset`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199552809
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56737631
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -52,13 +55,28 @@ class StreamExecution(
       /** Minimum amount of time in between the start of each batch. */
       private val minBatchTime = 10
     
    -  /** Tracks how much data we have processed from each input source. */
    -  private[sql] val streamProgress = new StreamProgress
    +  /**
    +   * Tracks how much data we have processed and committed to the sink or state store from each
    +   * input source.
    +   */
    +  private[sql] val committedOffsets = new StreamProgress
    +
    +  /**
    +   * Tracks the offsets that are available to be processed, but have not yet be committed to the
    +   * sink.
    +   */
    +  private[sql] val availableOffsets = new StreamProgress
    +
    +  /** The current batchId or -1 if execution has not yet been initialized. */
    +  private[sql] var currentBatchId: Long = -1
    --- End diff --
    
    `private[sql]` -> `private`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199678954
  
    **[Test build #53747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53747/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198139963
  
    /cc @tdas @zsxwing 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56875523
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---
    @@ -29,8 +30,13 @@ trait Source  {
       /** Returns the schema of the data from this source */
       def schema: StructType
     
    +  /** Returns the maximum available offset for this source. */
    +  def getOffset: Option[Offset]
    --- End diff --
    
    Maybe `fetchOffset`? It does not always move to the next offset.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56908545
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -52,13 +55,28 @@ class StreamExecution(
       /** Minimum amount of time in between the start of each batch. */
       private val minBatchTime = 10
     
    -  /** Tracks how much data we have processed from each input source. */
    -  private[sql] val streamProgress = new StreamProgress
    +  /**
    +   * Tracks how much data we have processed and committed to the sink or state store from each
    +   * input source.
    +   */
    +  private[sql] val committedOffsets = new StreamProgress
    +
    +  /**
    +   * Tracks the offsets that are available to be processed, but have not yet be committed to the
    +   * sink.
    +   */
    +  private[sql] val availableOffsets = new StreamProgress
    +
    +  /** The current batchId or -1 if execution has not yet been initialized. */
    +  private[sql] var currentBatchId: Long = -1
     
       /** All stream sources present the query plan. */
       private val sources =
         logicalPlan.collect { case s: StreamingRelation => s.source }
     
    +  /** A list of unique sources in the query plan. */
    +  private val uniqueSources = sources.distinct
    --- End diff --
    
    Avoid calling possibly expensive operations like getMaxOffset multiple times if the same source exists more than one in a query.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198593070
  
    **[Test build #53559 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53559/consoleFull)** for PR 11804 at commit [`23c5a05`](https://github.com/apache/spark/commit/23c5a057824db20108a912194469d7930842b8f6).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199648223
  
    **[Test build #2660 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2660/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56732082
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -52,13 +55,28 @@ class StreamExecution(
       /** Minimum amount of time in between the start of each batch. */
       private val minBatchTime = 10
     
    -  /** Tracks how much data we have processed from each input source. */
    -  private[sql] val streamProgress = new StreamProgress
    +  /**
    +   * Tracks how much data we have processed and committed to the sink or state store from each
    +   * input source.
    +   */
    +  private[sql] val committedOffsets = new StreamProgress
    +
    +  /**
    +   * Tracks the offsets that are available to be processed, but have not yet be committed to the
    +   * sink.
    +   */
    +  private[sql] val availableOffsets = new StreamProgress
    +
    +  /** The current batchId or -1 if execution has not yet been initialized. */
    +  private[sql] var currentBatchId: Long = -1
     
       /** All stream sources present the query plan. */
       private val sources =
         logicalPlan.collect { case s: StreamingRelation => s.source }
     
    +  /** A list of unique sources in the query plan. */
    +  private val uniqueSources = sources.distinct
    --- End diff --
    
    What's the purpose of find unique sources.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56732843
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -142,22 +173,75 @@ class StreamExecution(
     
       /**
        * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed).
    +   * (i.e. avoid reprocessing data that we have already processed).  This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  - availableOffsets
        */
       private def populateStartOffsets(): Unit = {
    -    sink.currentOffset match {
    -      case Some(c: CompositeOffset) =>
    -        val storedProgress = c.offsets
    -        val sources = logicalPlan collect {
    -          case StreamingRelation(source, _) => source
    +    offsetLog.getLatest() match {
    +      case Some((batchId, nextOffsets: CompositeOffset)) =>
    +        logInfo(s"Resuming continuous query, starting with batch $batchId")
    +        currentBatchId = batchId + 1
    +        nextOffsets.toStreamProgress(sources, availableOffsets)
    +        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +
    +        offsetLog.get(batchId - 1).foreach {
    +          case lastOffsets: CompositeOffset =>
    +            lastOffsets.toStreamProgress(sources, committedOffsets)
    +            logDebug(s"Resuming with committed offsets: $committedOffsets")
             }
     
    -        assert(sources.size == storedProgress.size)
    -        sources.zip(storedProgress).foreach { case (source, offset) =>
    -          offset.foreach(streamProgress.update(source, _))
    -        }
           case None => // We are starting this stream for the first time.
    -      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +        logInfo(s"Starting new continuous query.")
    +        currentBatchId = 0
    +        commitAndConstructNextBatch()
    +
    +      case Some((_, offset)) =>
    +        sys.error(s"Invalid offset $offset")
    +    }
    +  }
    +
    +  /**
    +   * Returns true if there is any new data available to be processed.
    +   */
    +  def dataAvailable: Boolean = {
    --- End diff --
    
    why not private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199678849
  
    **[Test build #2659 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2659/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199648466
  
    **[Test build #53747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53747/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198593140
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198568720
  
    **[Test build #53548 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53548/consoleFull)** for PR 11804 at commit [`97503f1`](https://github.com/apache/spark/commit/97503f1b45d45c4e4524cf43853e9faab43d0032).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199531186
  
    **[Test build #53724 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53724/consoleFull)** for PR 11804 at commit [`636e75c`](https://github.com/apache/spark/commit/636e75c3ffdcc07da0c8b9f493fb67bd37b45b68).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56737797
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -142,22 +173,75 @@ class StreamExecution(
     
       /**
        * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed).
    +   * (i.e. avoid reprocessing data that we have already processed).  This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  - availableOffsets
        */
       private def populateStartOffsets(): Unit = {
    -    sink.currentOffset match {
    -      case Some(c: CompositeOffset) =>
    -        val storedProgress = c.offsets
    -        val sources = logicalPlan collect {
    -          case StreamingRelation(source, _) => source
    +    offsetLog.getLatest() match {
    +      case Some((batchId, nextOffsets: CompositeOffset)) =>
    +        logInfo(s"Resuming continuous query, starting with batch $batchId")
    +        currentBatchId = batchId + 1
    +        nextOffsets.toStreamProgress(sources, availableOffsets)
    +        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +
    +        offsetLog.get(batchId - 1).foreach {
    +          case lastOffsets: CompositeOffset =>
    +            lastOffsets.toStreamProgress(sources, committedOffsets)
    +            logDebug(s"Resuming with committed offsets: $committedOffsets")
             }
     
    -        assert(sources.size == storedProgress.size)
    -        sources.zip(storedProgress).foreach { case (source, offset) =>
    -          offset.foreach(streamProgress.update(source, _))
    -        }
           case None => // We are starting this stream for the first time.
    -      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +        logInfo(s"Starting new continuous query.")
    +        currentBatchId = 0
    +        commitAndConstructNextBatch()
    +
    +      case Some((_, offset)) =>
    +        sys.error(s"Invalid offset $offset")
    +    }
    +  }
    +
    +  /**
    +   * Returns true if there is any new data available to be processed.
    +   */
    +  def dataAvailable: Boolean = {
    +    availableOffsets.exists {
    +      case (source, available) =>
    +        committedOffsets
    +            .get(source)
    +            .map(committed => committed < available)
    +            .getOrElse(true)
    +    }
    +  }
    +
    +  /**
    +   * Queries all of the sources to see if any new data is available. When there is new data the
    +   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   *
    +   * Note that committing the offsets for a new batch implicitly marks the previous batch as
    +   * finished and thus this method should only be called when all currently available data
    +   * has been written to the sink.
    +   */
    +  def commitAndConstructNextBatch(): Boolean = committedOffsets.synchronized {
    --- End diff --
    
    private. And why needs `committedOffsets.synchronized`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198568967
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56734137
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala ---
    @@ -52,6 +52,21 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
         case i if i == 0 => 0
         case i if i > 0 => 1
       }
    +
    +  /**
    +   * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
    +   * sources.
    +   *
    +   * This method is typically used to associate a serialized offset with actual sources (which
    +   * cannot be serialized).
    +   */
    +  def toStreamProgress(
    +      sources: Seq[Source],
    +      dest: StreamProgress = new StreamProgress): StreamProgress = {
    --- End diff --
    
    This optional thing is never used. I think its better to not have optional params if its not even exploited.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198482241
  
    test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56913708
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala ---
    @@ -52,6 +52,21 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
         case i if i == 0 => 0
         case i if i > 0 => 1
       }
    +
    +  /**
    +   * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
    +   * sources.
    +   *
    +   * This method is typically used to associate a serialized offset with actual sources (which
    +   * cannot be serialized).
    +   */
    +  def toStreamProgress(
    +      sources: Seq[Source],
    +      dest: StreamProgress = new StreamProgress): StreamProgress = {
    --- End diff --
    
    I made this all immutable (so it is actually constructing a new one always).  I also got rid of the now unneeded locking.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199676449
  
    **[Test build #2660 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2660/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199679388
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53747/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56925875
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -142,48 +175,99 @@ class StreamExecution(
     
       /**
        * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed).
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  - availableOffsets
        */
       private def populateStartOffsets(): Unit = {
    -    sink.currentOffset match {
    -      case Some(c: CompositeOffset) =>
    -        val storedProgress = c.offsets
    -        val sources = logicalPlan collect {
    -          case StreamingRelation(source, _) => source
    +    offsetLog.getLatest() match {
    +      case Some((batchId, nextOffsets)) =>
    +        logInfo(s"Resuming continuous query, starting with batch $batchId")
    +        currentBatchId = batchId + 1
    +        availableOffsets = nextOffsets.toStreamProgress(sources)
    +        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +
    +        offsetLog.get(batchId - 1).foreach {
    +          case lastOffsets =>
    +            committedOffsets = lastOffsets.toStreamProgress(sources)
    +            logDebug(s"Resuming with committed offsets: $committedOffsets")
             }
     
    -        assert(sources.size == storedProgress.size)
    -        sources.zip(storedProgress).foreach { case (source, offset) =>
    -          offset.foreach(streamProgress.update(source, _))
    -        }
           case None => // We are starting this stream for the first time.
    -      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +        logInfo(s"Starting new continuous query.")
    +        currentBatchId = 0
    +        commitAndConstructNextBatch()
         }
       }
     
       /**
    -   * Checks to see if any new data is present in any of the sources. When new data is available,
    -   * a batch is executed and passed to the sink, updating the currentOffsets.
    +   * Returns true if there is any new data available to be processed.
        */
    -  private def attemptBatch(): Unit = {
    +  private def dataAvailable: Boolean = {
    +    availableOffsets.exists {
    +      case (source, available) =>
    +        committedOffsets
    +            .get(source)
    +            .map(committed => committed < available)
    +            .getOrElse(true)
    +    }
    +  }
    +
    +  /**
    +   * Queries all of the sources to see if any new data is available. When there is new data the
    +   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   *
    +   * Note that committing the offsets for a new batch implicitly marks the previous batch as
    +   * finished and thus this method should only be called when all currently available data
    +   * has been written to the sink.
    +   */
    +  private def commitAndConstructNextBatch(): Boolean = {
    +    // Update committed offsets.
    +    committedOffsets ++= availableOffsets
    +
    +    // Check to see what new data is available.
    +    val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
    +    availableOffsets ++= newData
    +
    +    if (dataAvailable) {
    +      logInfo(s"Commiting offsets for batch $currentBatchId.")
    --- End diff --
    
    nit: Commiting --> Committing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199552812
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53721/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56737625
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -52,13 +55,28 @@ class StreamExecution(
       /** Minimum amount of time in between the start of each batch. */
       private val minBatchTime = 10
     
    -  /** Tracks how much data we have processed from each input source. */
    -  private[sql] val streamProgress = new StreamProgress
    +  /**
    +   * Tracks how much data we have processed and committed to the sink or state store from each
    +   * input source.
    +   */
    +  private[sql] val committedOffsets = new StreamProgress
    +
    +  /**
    +   * Tracks the offsets that are available to be processed, but have not yet be committed to the
    +   * sink.
    +   */
    +  private[sql] val availableOffsets = new StreamProgress
    --- End diff --
    
    `private[sql]` -> `private`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56733801
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala ---
    @@ -29,8 +30,13 @@ trait Source  {
       /** Returns the schema of the data from this source */
       def schema: StructType
     
    +  /** Returns the maximum available offset for this source. */
    +  def getOffset: Option[Offset]
    --- End diff --
    
    `getOffset` sounds like an immutable method. 
    
    But if I understand right, `Source` should move to the next offset in this method. (`FileStreamSource` changes its `maxBatchId`). Maybe call it `advanceToNextOffset`? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199680433
  
    **[Test build #53748 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53748/consoleFull)** for PR 11804 at commit [`042d969`](https://github.com/apache/spark/commit/042d969eee0c5c3f6857cd0422053ba63ce8a39f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56925940
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -142,48 +175,99 @@ class StreamExecution(
     
       /**
        * Populate the start offsets to start the execution at the current offsets stored in the sink
    -   * (i.e. avoid reprocessing data that we have already processed).
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  - availableOffsets
        */
       private def populateStartOffsets(): Unit = {
    -    sink.currentOffset match {
    -      case Some(c: CompositeOffset) =>
    -        val storedProgress = c.offsets
    -        val sources = logicalPlan collect {
    -          case StreamingRelation(source, _) => source
    +    offsetLog.getLatest() match {
    +      case Some((batchId, nextOffsets)) =>
    +        logInfo(s"Resuming continuous query, starting with batch $batchId")
    +        currentBatchId = batchId + 1
    +        availableOffsets = nextOffsets.toStreamProgress(sources)
    +        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
    +
    +        offsetLog.get(batchId - 1).foreach {
    +          case lastOffsets =>
    +            committedOffsets = lastOffsets.toStreamProgress(sources)
    +            logDebug(s"Resuming with committed offsets: $committedOffsets")
             }
     
    -        assert(sources.size == storedProgress.size)
    -        sources.zip(storedProgress).foreach { case (source, offset) =>
    -          offset.foreach(streamProgress.update(source, _))
    -        }
           case None => // We are starting this stream for the first time.
    -      case _ => throw new IllegalArgumentException("Expected composite offset from sink")
    +        logInfo(s"Starting new continuous query.")
    +        currentBatchId = 0
    +        commitAndConstructNextBatch()
         }
       }
     
       /**
    -   * Checks to see if any new data is present in any of the sources. When new data is available,
    -   * a batch is executed and passed to the sink, updating the currentOffsets.
    +   * Returns true if there is any new data available to be processed.
        */
    -  private def attemptBatch(): Unit = {
    +  private def dataAvailable: Boolean = {
    +    availableOffsets.exists {
    +      case (source, available) =>
    +        committedOffsets
    +            .get(source)
    +            .map(committed => committed < available)
    +            .getOrElse(true)
    +    }
    +  }
    +
    +  /**
    +   * Queries all of the sources to see if any new data is available. When there is new data the
    +   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   *
    +   * Note that committing the offsets for a new batch implicitly marks the previous batch as
    +   * finished and thus this method should only be called when all currently available data
    +   * has been written to the sink.
    +   */
    +  private def commitAndConstructNextBatch(): Boolean = {
    +    // Update committed offsets.
    +    committedOffsets ++= availableOffsets
    +
    +    // Check to see what new data is available.
    +    val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
    +    availableOffsets ++= newData
    +
    +    if (dataAvailable) {
    +      logInfo(s"Commiting offsets for batch $currentBatchId.")
    +      assert(
    +        offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
    +        "Concurrent update to the log.  Multiple streaming jobs detected.")
    +      currentBatchId += 1
    --- End diff --
    
    nit: isnt it more useful to do a `logInfo` after committing, to confirm that it has comitted


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-198484446
  
    **[Test build #53548 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53548/consoleFull)** for PR 11804 at commit [`97503f1`](https://github.com/apache/spark/commit/97503f1b45d45c4e4524cf43853e9faab43d0032).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199677696
  
    **[Test build #2658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2658/consoleFull)** for PR 11804 at commit [`3fc2fe2`](https://github.com/apache/spark/commit/3fc2fe21c5b61508a6038f60b1c5e6399a1585c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-13985][SQL] Deterministic batches with ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11804#issuecomment-199680824
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53748/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org