You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by joseph-torres <gi...@git.apache.org> on 2017/12/15 01:34:34 UTC

[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

GitHub user joseph-torres opened a pull request:

    https://github.com/apache/spark/pull/19984

    [SPARK-22789] Map-only continuous processing execution

    ## What changes were proposed in this pull request?
    
    Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC.
    
    ## How was this patch tested?
    
    new unit-ish tests (exercising execution end to end)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark continuous-impl

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19984.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19984
    
----
commit d6bea84447d910e79d5926972d87a80bc5dc2e2e
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-07T22:08:28Z

    Refactor StreamExecution into a parent class so continuous processing can extend it

commit df6b8861173d1e7853952c8f3ffe504975efe204
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T19:31:28Z

    address fmt

commit 6f0ce6b1cf1abf602c2b02ce6d31f46f8fa71b7c
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-13T00:09:48Z

    slight changes

commit 2b360ab49bcab3c73ea85ce62202e40e950931ef
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-13T00:10:34Z

    rm spurious space

commit 1b19f1ce4444f17e7324997649ad8c5f97887912
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-13T00:35:30Z

    fix compile

commit 96eba13be9764e63f3d1375d7b51dbfd0675aa98
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T20:48:20Z

    harness

commit 2d5efadb9e7662363e3e4a3c66e0f5f73e4935ef
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T21:18:25Z

    awaitEpoch impl

commit 578bbb7eb0725b795ac65d1beda436515f4f4eba
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T21:46:09Z

    move local[10] to only continuous suite

commit 9051eff6c88838ac61ab45763ed84d593e2d4837
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T21:49:55Z

    repeatedly restart

commit 60fa4477591cc264b9ea253f64065d762ce3f96f
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T22:02:52Z

    fix some simple TODOs

commit ea8e76ec75752d134433730ee1a007cce1fdcfe8
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T22:11:18Z

    use runId instead of queryId for endpoint name

commit d0f3cc7701d9eb3e7df571561e751f03c0537f3a
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T22:19:03Z

    more simple todos

commit ba9dbaa1be2f54827a42f3177669082e7d1f99e2
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T22:27:12Z

    remove old state

commit 2cd005f4685e492ae78d6b9c579c80c2370d2f14
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T22:35:51Z

    remove clean shutdown workaround in StreamTest

commit a7fa31fb5375074d888bd0a94e317ad3f1692e5a
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T22:50:09Z

    update ContinuousExecution docs

commit f687432a58acf7337885edfc01adc94188d174d8
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-11T22:59:14Z

    add comments to EpochCoordinator

commit 987b011ee78292c3379559910ebe101daf4f9450
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T00:02:54Z

    change offset semantic to end of previous epoch

commit 5494fc50ef99b3e584c287b03eaa32b30657a5ce
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T00:18:40Z

    document EpochCoordinator

commit d6ef404b85fa6977b5f38a853dca11de5189b3f9
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T02:06:44Z

    simplify epoch handling

commit 647bd2745c1c0842002d4f71b61aa34beb0f8b29
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T19:17:58Z

    stress tests

commit 053a9f349a4829433a495aa5989f1ca1c8a3256e
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T20:17:22Z

    add minBatchesToRetain

commit 7072d21444388fe167fa7e3475b3e95ec9923d5e
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T20:43:33Z

    add confs

commit 4083a8f5c6b6ef298726234d54f23a90e971e77e
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-12T21:10:33Z

    latency suite not meaningful here

commit 41d391f2027a4e8b3730d15cea7b7fbcdcec27de
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-13T00:04:07Z

    more stress::q

commit 402cfa3b10dfb0f37ce8d94336be3b3c01fe9f90
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-13T18:55:23Z

    use temp dir

commit e4a1bc19db9ea0233879d270e725ed58d95a34ad
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-14T19:37:36Z

    fix against rebase

commit 8887b3c92afe8bb1659f600785af5d97f085f2bb
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-14T21:32:16Z

    fix ser/deser

commit 60bf0e33f20134af296d85b5c52729c4063ef2e1
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-14T22:41:21Z

    fix rebase compile

commit 749bddc6303321118407ff5c2664528f7160ff65
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-14T23:35:46Z

    stop using ProcessingTime in executor

commit 5a15ed5b30cc70e70d199d82e617c67426562ba2
Author: Jose Torres <jo...@databricks.com>
Date:   2017-12-14T23:54:52Z

    add tests for supported ops

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85092/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85079 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85079/testReport)** for PR 19984 at commit [`2af9b40`](https://github.com/apache/spark/commit/2af9b40d60027397fd6e915413b88f12249245e5).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85332 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85332/testReport)** for PR 19984 at commit [`b4f7976`](https://github.com/apache/spark/commit/b4f79762c083735011bf98250c39c263876c8cc8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158158855
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              currentEpoch,
    +              currentOffset))
    +            currentEpoch += 1
    +            false
    +          // real row
    +          case (row, offset) =>
    +            currentRow = row
    +            currentOffset = offset
    +            true
    +        }
    +      }
    +
    +      override def next(): UnsafeRow = {
    --- End diff --
    
    NextIterator won't quite work, because we need to be able to start going again after the iterator is "finished". I'll clean it up a bit to comply with the contract.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85089 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85089/testReport)** for PR 19984 at commit [`359ebdd`](https://github.com/apache/spark/commit/359ebdd8bdac0b93aa6b88beab0212393f1e2577).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85092 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85092/testReport)** for PR 19984 at commit [`19f08a9`](https://github.com/apache/spark/commit/19f08a9c875c6e52bb75c82d196fb3a310311ffe).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158135301
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              currentEpoch,
    +              currentOffset))
    +            currentEpoch += 1
    +            false
    +          // real row
    +          case (row, offset) =>
    +            currentRow = row
    +            currentOffset = offset
    +            true
    +        }
    +      }
    +
    +      override def next(): UnsafeRow = {
    +        val r = currentRow
    +        currentRow = null
    +        r
    +      }
    +    }
    +  }
    +
    +  override def getPreferredLocations(split: Partition): Seq[String] = {
    +    split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
    +  }
    +}
    +
    +case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
    +
    +class EpochPollRunnable(
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean)
    +  extends Thread with Logging {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  private val epochEndpoint = EpochCoordinatorRef.get(
    +    context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
    +  private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  override def run(): Unit = {
    +    try {
    +      val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
    +      for (i <- currentEpoch to newEpoch - 1) {
    +        queue.put((null, null))
    +        logDebug(s"Sent marker to start epoch ${i + 1}")
    +      }
    +      currentEpoch = newEpoch
    +    } catch {
    +      case t: Throwable =>
    +        failedFlag.set(true)
    +        failureReason = t
    +        throw t
    +    }
    +  }
    +}
    +
    +class DataReaderThread(
    +    reader: DataReader[UnsafeRow],
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean) extends Thread {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  override def run(): Unit = {
    +    val baseReader = ContinuousDataSourceRDD.getBaseReader(reader)
    +    try {
    +      while (!context.isInterrupted && !context.isCompleted()) {
    +        if (!reader.next()) {
    +          // Check again, since reader.next() might have blocked through an incoming interrupt.
    +          if (!context.isInterrupted && !context.isCompleted()) {
    +            throw new IllegalStateException(
    +              "Continuous reader reported no elements! Reader should have blocked waiting.")
    +          } else {
    +            return
    +          }
    +        }
    +
    +        queue.put((reader.get().copy(), baseReader.getOffset))
    +      }
    +    } catch {
    +      case _: InterruptedException if context.isInterrupted() =>
    +        // Continuous shutdown always involves an interrupt; shut down quietly.
    +        return
    +
    +      case t: Throwable =>
    +        failedFlag.set(true)
    +        failureReason = t
    --- End diff --
    
    ditto


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r157124394
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1035,6 +1035,22 @@ object SQLConf {
         .booleanConf
         .createWithDefault(true)
     
    +  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
    +    buildConf("spark.sql.streaming.continuous.executorQueueSize")
    +    .internal()
    +    .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
    +      " buffer the results of a ContinuousDataReader.")
    +    .intConf
    --- End diff --
    
    `longConf`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84981 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84981/testReport)** for PR 19984 at commit [`f50488c`](https://github.com/apache/spark/commit/f50488cf94ab015019e99d187b54ab922e4ca6c2).
     * This patch **fails SparkR unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84975 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84975/testReport)** for PR 19984 at commit [`63f78d2`](https://github.com/apache/spark/commit/63f78d266f3fd4ac5fc2fec53c04c0029d1a5e68).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158399908
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: InterruptedException if state.get().equals(RECONFIGURING) =>
    +          // swallow exception and run again
    +          state.set(ACTIVE)
    +      }
    +    } while (state.get() == ACTIVE)
    +  }
    +
    +  /**
    +   * Populate the start offsets to start the execution at the current offsets stored in the sink
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  The basic structure of this method is as follows:
    +   *
    +   *  Identify (from the commit log) the latest epoch that has committed
    +   *  IF last epoch exists THEN
    +   *    Get end offsets for the epoch
    +   *    Set those offsets as the current commit progress
    +   *    Set the next epoch ID as the last + 1
    +   *    Return the end offsets of the last epoch as start for the next one
    +   *    DONE
    +   *  ELSE
    +   *    Start a new query log
    +   *  DONE
    +   */
    +  private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
    +    // Note that this will need a slight modification for exactly once. If ending offsets were
    +    // reported but not committed for any epochs, we must replay exactly to those offsets.
    +    // For at least once, we can just ignore those reports and risk duplicates.
    +    commitLog.getLatest() match {
    +      case Some((latestEpochId, _)) =>
    +        val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
    +          throw new IllegalStateException(
    +            s"Batch $latestEpochId was committed without end epoch offsets!")
    +        }
    +        committedOffsets = nextOffsets.toStreamProgress(sources)
    +
    +        // Forcibly align commit and offset logs by slicing off any spurious offset logs from
    +        // a previous run. We can't allow commits to an epoch that a previous run reached but
    +        // this run has not.
    +        offsetLog.purgeAfter(latestEpochId)
    +
    +        currentBatchId = latestEpochId + 1
    +        logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
    +        nextOffsets
    +      case None =>
    +        // We are starting this stream for the first time. Offsets are all None.
    +        logInfo(s"Starting new streaming query.")
    +        currentBatchId = 0
    +        OffsetSeq.fill(continuousSources.map(_ => null): _*)
    +    }
    +  }
    +
    +  /**
    +   * Do a continuous run.
    +   * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
    +   */
    +  private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
    +    // A list of attributes that will need to be updated.
    +    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Translate from continuous relation to the underlying data source.
    +    var nextSourceId = 0
    +    continuousSources = logicalPlan.collect {
    +      case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
    +        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
    +        nextSourceId += 1
    +
    +        dataSource.createContinuousReader(
    +          java.util.Optional.empty[StructType](),
    +          metadataPath,
    +          new DataSourceV2Options(extraReaderOptions.asJava))
    +    }
    +    uniqueSources = continuousSources.distinct
    +
    +    val offsets = getStartOffsets(sparkSessionForQuery)
    +
    +    var insertedSourceId = 0
    +    val withNewSources = logicalPlan transform {
    +      case ContinuousExecutionRelation(_, _, output) =>
    +        val reader = continuousSources(insertedSourceId)
    +        insertedSourceId += 1
    +        val newOutput = reader.readSchema().toAttributes
    +
    +        assert(output.size == newOutput.size,
    +          s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
    +            s"${Utils.truncatedString(newOutput, ",")}")
    +        replacements ++= output.zip(newOutput)
    +
    +        val loggedOffset = offsets.offsets(0)
    +        val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
    +        reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
    +        DataSourceV2Relation(newOutput, reader)
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val triggerLogicalPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) =>
    +        replacementMap(a).withMetadata(a.metadata)
    +      case (_: CurrentTimestamp | _: CurrentDate) =>
    +        throw new IllegalStateException(
    +          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
    +    }
    +
    +    val writer = sink.createContinuousWriter(
    +      s"$runId",
    +      triggerLogicalPlan.schema,
    +      outputMode,
    +      new DataSourceV2Options(extraOptions.asJava))
    +    val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
    +
    +    val reader = withSink.collect {
    +      case DataSourceV2Relation(_, r: ContinuousReader) => r
    +    }.head
    +
    +    reportTimeTaken("queryPlanning") {
    +      lastExecution = new IncrementalExecution(
    +        sparkSessionForQuery,
    +        withSink,
    +        outputMode,
    +        checkpointFile("state"),
    +        runId,
    +        currentBatchId,
    +        offsetSeqMetadata)
    +      lastExecution.executedPlan // Force the lazy generation of execution plan
    +    }
    +
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.RUN_ID_KEY, runId.toString)
    +
    +    // Use the parent Spark session for the endpoint since it's where this query ID is registered.
    +    val epochEndpoint =
    +      EpochCoordinatorRef.create(
    +        writer.get(), reader, currentBatchId,
    +        id.toString, runId.toString, sparkSession, SparkEnv.get)
    +    val epochUpdateThread = new Thread(new Runnable {
    +      override def run: Unit = {
    +        try {
    +          triggerExecutor.execute(() => {
    +            startTrigger()
    +
    +            if (reader.needsReconfiguration()) {
    +              stopSources()
    +              state.set(RECONFIGURING)
    --- End diff --
    
    Stopping a source may cause an exception, e.g., it closes a socket while queryExecutionThread is reading from it. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158397203
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.streaming.continuous
    +
    +import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
    +import java.nio.channels.ClosedByInterruptException
    +import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
    +
    +import scala.reflect.ClassTag
    +import scala.util.control.ControlThrowable
    +
    +import com.google.common.util.concurrent.UncheckedExecutionException
    +import org.apache.commons.io.FileUtils
    +import org.apache.hadoop.conf.Configuration
    +
    +import org.apache.spark.{SparkContext, SparkEnv}
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.plans.logical.Range
    +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
    +import org.apache.spark.sql.execution.command.ExplainCommand
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
    +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
    +import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.sources.StreamSourceProvider
    +import org.apache.spark.sql.streaming.{StreamTest, Trigger}
    +import org.apache.spark.sql.streaming.util.StreamManualClock
    +import org.apache.spark.sql.test.TestSparkSession
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Utils
    +
    +class ContinuousSuiteBase extends StreamTest {
    +  // We need more than the default local[2] to be able to schedule all partitions simultaneously.
    +  override protected def createSparkSession = new TestSparkSession(
    +    new SparkContext(
    +      "local[10]",
    +      "continuous-stream-test-sql-context",
    +      sparkConf.set("spark.sql.testkey", "true")))
    +
    +  protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
    +    query match {
    +      case s: ContinuousExecution =>
    +        assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
    +        val reader = s.lastExecution.executedPlan.collectFirst {
    +          case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r
    +        }.get
    +
    +        val deltaMs = numTriggers * 1000 + 300
    +        while (System.currentTimeMillis < reader.creationTime + deltaMs) {
    +          Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
    +        }
    +    }
    +  }
    +
    +  // A continuous trigger that will only fire the initial time for the duration of a test.
    +  // This allows clean testing with manual epoch advancement.
    +  protected val longContinuousTrigger = Trigger.Continuous("1 hour")
    +}
    +
    +class ContinuousSuite extends ContinuousSuiteBase {
    +  import testImplicits._
    +
    +  test("basic rate source") {
    +    val df = spark.readStream
    +      .format("rate")
    +      .option("numPartitions", "5")
    +      .option("rowsPerSecond", "5")
    +      .load()
    +      .select('value)
    +
    +    testStream(df, useV2Sink = true)(
    +      StartStream(longContinuousTrigger),
    +      AwaitEpoch(0),
    +      Execute(waitForRateSourceTriggers(_, 2)),
    +      IncrementEpoch(),
    +      CheckAnswer(scala.Range(0, 10): _*),
    --- End diff --
    
    let's not do an exact match check here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84975/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158122336
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    --- End diff --
    
    I would use `ThreadUtils.newDaemonSingleThreadScheduledExecutor` rather than `ProcessingTimeExecutor`. `ProcessingTimeExecutor` is designed for ProcessingTimeTrigger. It's weird to use it here, in addition, we may make some changes into ProcessingTimeExecutor in future and they may break Continuous execution.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158116796
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.atomic.AtomicLong
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
    +import org.apache.spark.util.RpcUtils
    +
    +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
    +
    +// Driver epoch trigger message
    +/**
    + * Atomically increment the current epoch and get the new value.
    + */
    +private[sql] case class IncrementAndGetEpoch() extends EpochCoordinatorMessage
    +
    +// Init messages
    +/**
    + * Set the reader and writer partition counts. Tasks may not be started until the coordinator
    + * has acknowledged these messages.
    + */
    +private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
    +case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage
    +
    +// Partition task messages
    +/**
    + * Get the current epoch.
    + */
    +private[sql] case class GetCurrentEpoch() extends EpochCoordinatorMessage
    --- End diff --
    
    nit: `case class` -> `case object`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158135254
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              currentEpoch,
    +              currentOffset))
    +            currentEpoch += 1
    +            false
    +          // real row
    +          case (row, offset) =>
    +            currentRow = row
    +            currentOffset = offset
    +            true
    +        }
    +      }
    +
    +      override def next(): UnsafeRow = {
    +        val r = currentRow
    +        currentRow = null
    +        r
    +      }
    +    }
    +  }
    +
    +  override def getPreferredLocations(split: Partition): Seq[String] = {
    +    split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
    +  }
    +}
    +
    +case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
    +
    +class EpochPollRunnable(
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean)
    +  extends Thread with Logging {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  private val epochEndpoint = EpochCoordinatorRef.get(
    +    context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
    +  private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  override def run(): Unit = {
    +    try {
    +      val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
    +      for (i <- currentEpoch to newEpoch - 1) {
    +        queue.put((null, null))
    +        logDebug(s"Sent marker to start epoch ${i + 1}")
    +      }
    +      currentEpoch = newEpoch
    +    } catch {
    +      case t: Throwable =>
    +        failedFlag.set(true)
    +        failureReason = t
    --- End diff --
    
    switch the order so that the other thread can definitely see `failureReason` if `failedFlag` is `true`. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158137370
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              currentEpoch,
    +              currentOffset))
    +            currentEpoch += 1
    +            false
    +          // real row
    +          case (row, offset) =>
    +            currentRow = row
    +            currentOffset = offset
    +            true
    +        }
    +      }
    +
    +      override def next(): UnsafeRow = {
    +        val r = currentRow
    +        currentRow = null
    +        r
    +      }
    +    }
    +  }
    +
    +  override def getPreferredLocations(split: Partition): Seq[String] = {
    +    split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
    +  }
    +}
    +
    +case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
    +
    +class EpochPollRunnable(
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean)
    +  extends Thread with Logging {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  private val epochEndpoint = EpochCoordinatorRef.get(
    +    context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
    +  private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  override def run(): Unit = {
    +    try {
    +      val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
    +      for (i <- currentEpoch to newEpoch - 1) {
    +        queue.put((null, null))
    +        logDebug(s"Sent marker to start epoch ${i + 1}")
    +      }
    +      currentEpoch = newEpoch
    +    } catch {
    +      case t: Throwable =>
    +        failedFlag.set(true)
    +        failureReason = t
    +        throw t
    +    }
    +  }
    +}
    +
    +class DataReaderThread(
    +    reader: DataReader[UnsafeRow],
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean) extends Thread {
    --- End diff --
    
    set a proper thread name for this thread


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85287 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85287/testReport)** for PR 19984 at commit [`07a9e06`](https://github.com/apache/spark/commit/07a9e0654df61ad52f7db28ee663a380dee3c2a8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85332/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85218 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85218/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84981/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r157124515
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
    @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
       public static Trigger Once() {
         return OneTimeTrigger$.MODULE$;
       }
    +
    +  /**
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * @since 2.3.0
    +   */
    +  public static Trigger Continuous(long intervalMs) {
    +    return ContinuousTrigger.apply(intervalMs);
    +  }
    +
    +  /**
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * {{{
    +   *    import java.util.concurrent.TimeUnit
    +   *    df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    +   * }}}
    +   *
    +   * @since 2.3.0
    +   */
    +  public static Trigger Continuous(long interval, TimeUnit timeUnit) {
    +    return ContinuousTrigger.create(interval, timeUnit);
    +  }
    +
    +  /**
    +   * (Scala-friendly)
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * {{{
    +   *    import scala.concurrent.duration._
    +   *    df.writeStream.trigger(Trigger.Continuous(10.seconds))
    +   * }}}
    +   * @since 2.2.0
    +   */
    +  public static Trigger Continuous(Duration interval) {
    +    return ContinuousTrigger.apply(interval);
    +  }
    +
    +  /**
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * {{{
    +   *    df.writeStream.trigger(Trigger.Continuous("10 seconds"))
    +   * }}}
    +   * @since 2.2.0
    --- End diff --
    
    2.3.0?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r165055237
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala ---
    @@ -75,6 +76,52 @@ case class StreamingExecutionRelation(
       )
     }
     
    +// We have to pack in the V1 data source as a shim, for the case when a source implements
    +// continuous processing (which is always V2) but only has V1 microbatch support. We don't
    +// know at read time whether the query is conntinuous or not, so we need to be able to
    --- End diff --
    
    can't we know it from the specified trigger?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19984


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158138974
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              currentEpoch,
    +              currentOffset))
    +            currentEpoch += 1
    +            false
    +          // real row
    +          case (row, offset) =>
    +            currentRow = row
    +            currentOffset = offset
    +            true
    +        }
    +      }
    +
    +      override def next(): UnsafeRow = {
    +        val r = currentRow
    +        currentRow = null
    +        r
    +      }
    +    }
    +  }
    +
    +  override def getPreferredLocations(split: Partition): Seq[String] = {
    +    split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
    +  }
    +}
    +
    +case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
    +
    +class EpochPollRunnable(
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean)
    +  extends Thread with Logging {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  private val epochEndpoint = EpochCoordinatorRef.get(
    +    context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
    +  private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  override def run(): Unit = {
    +    try {
    +      val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
    +      for (i <- currentEpoch to newEpoch - 1) {
    +        queue.put((null, null))
    +        logDebug(s"Sent marker to start epoch ${i + 1}")
    +      }
    +      currentEpoch = newEpoch
    +    } catch {
    +      case t: Throwable =>
    +        failedFlag.set(true)
    +        failureReason = t
    +        throw t
    +    }
    +  }
    +}
    +
    +class DataReaderThread(
    +    reader: DataReader[UnsafeRow],
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean) extends Thread {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  override def run(): Unit = {
    +    val baseReader = ContinuousDataSourceRDD.getBaseReader(reader)
    +    try {
    +      while (!context.isInterrupted && !context.isCompleted()) {
    +        if (!reader.next()) {
    +          // Check again, since reader.next() might have blocked through an incoming interrupt.
    +          if (!context.isInterrupted && !context.isCompleted()) {
    +            throw new IllegalStateException(
    +              "Continuous reader reported no elements! Reader should have blocked waiting.")
    +          } else {
    +            return
    +          }
    +        }
    +
    +        queue.put((reader.get().copy(), baseReader.getOffset))
    +      }
    +    } catch {
    +      case _: InterruptedException if context.isInterrupted() =>
    +        // Continuous shutdown always involves an interrupt; shut down quietly.
    +        return
    +
    +      case t: Throwable =>
    +        failedFlag.set(true)
    +        failureReason = t
    +        throw t
    --- End diff --
    
    We cannot throw `t` here as it will kill the executor. See org.apache.spark.util.SparkUncaughtExceptionHandler


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158121081
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: Throwable if state.get().equals(RECONFIGURING) =>
    --- End diff --
    
    `_: Throwable` => `NonFatal(e)` 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158119082
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
    @@ -266,6 +266,21 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         }
       }
     
    +  /**
    +   * Removes all log entries later than thresholdBatchId (exclusive).
    +   */
    +  def purgeAfter(thresholdBatchId: Long): Unit = {
    +    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
    +      .map(f => pathToBatchId(f.getPath))
    +
    +    for (batchId <- batchIds if batchId > thresholdBatchId) {
    +      print(s"AAAAA purging\n")
    --- End diff --
    
    nit: remove this


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158117033
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -303,7 +299,7 @@ abstract class StreamExecution(
               e,
               committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
               availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
    -        logError(s"Query $prettyIdString terminated with error", e)
    +        // logError(s"Query $prettyIdString terminated with error", e)
    --- End diff --
    
    nit: restore `logError`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85287/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84976/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85218/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158391581
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: InterruptedException if state.get().equals(RECONFIGURING) =>
    +          // swallow exception and run again
    +          state.set(ACTIVE)
    +      }
    +    } while (state.get() == ACTIVE)
    +  }
    +
    +  /**
    +   * Populate the start offsets to start the execution at the current offsets stored in the sink
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  The basic structure of this method is as follows:
    +   *
    +   *  Identify (from the commit log) the latest epoch that has committed
    +   *  IF last epoch exists THEN
    +   *    Get end offsets for the epoch
    +   *    Set those offsets as the current commit progress
    +   *    Set the next epoch ID as the last + 1
    +   *    Return the end offsets of the last epoch as start for the next one
    +   *    DONE
    +   *  ELSE
    +   *    Start a new query log
    +   *  DONE
    +   */
    +  private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
    +    // Note that this will need a slight modification for exactly once. If ending offsets were
    +    // reported but not committed for any epochs, we must replay exactly to those offsets.
    +    // For at least once, we can just ignore those reports and risk duplicates.
    +    commitLog.getLatest() match {
    +      case Some((latestEpochId, _)) =>
    +        val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
    +          throw new IllegalStateException(
    +            s"Batch $latestEpochId was committed without end epoch offsets!")
    +        }
    +        committedOffsets = nextOffsets.toStreamProgress(sources)
    +
    +        // Forcibly align commit and offset logs by slicing off any spurious offset logs from
    +        // a previous run. We can't allow commits to an epoch that a previous run reached but
    +        // this run has not.
    +        offsetLog.purgeAfter(latestEpochId)
    +
    +        currentBatchId = latestEpochId + 1
    +        logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
    +        nextOffsets
    +      case None =>
    +        // We are starting this stream for the first time. Offsets are all None.
    +        logInfo(s"Starting new streaming query.")
    +        currentBatchId = 0
    +        OffsetSeq.fill(continuousSources.map(_ => null): _*)
    +    }
    +  }
    +
    +  /**
    +   * Do a continuous run.
    +   * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
    +   */
    +  private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
    +    // A list of attributes that will need to be updated.
    +    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Translate from continuous relation to the underlying data source.
    +    var nextSourceId = 0
    +    continuousSources = logicalPlan.collect {
    +      case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
    +        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
    +        nextSourceId += 1
    +
    +        dataSource.createContinuousReader(
    +          java.util.Optional.empty[StructType](),
    +          metadataPath,
    +          new DataSourceV2Options(extraReaderOptions.asJava))
    +    }
    +    uniqueSources = continuousSources.distinct
    +
    +    val offsets = getStartOffsets(sparkSessionForQuery)
    +
    +    var insertedSourceId = 0
    +    val withNewSources = logicalPlan transform {
    +      case ContinuousExecutionRelation(_, _, output) =>
    +        val reader = continuousSources(insertedSourceId)
    +        insertedSourceId += 1
    +        val newOutput = reader.readSchema().toAttributes
    +
    +        assert(output.size == newOutput.size,
    +          s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
    +            s"${Utils.truncatedString(newOutput, ",")}")
    +        replacements ++= output.zip(newOutput)
    +
    +        val loggedOffset = offsets.offsets(0)
    +        val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
    +        reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
    +        DataSourceV2Relation(newOutput, reader)
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val triggerLogicalPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) =>
    +        replacementMap(a).withMetadata(a.metadata)
    +      case (_: CurrentTimestamp | _: CurrentDate) =>
    +        throw new IllegalStateException(
    +          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
    +    }
    +
    +    val writer = sink.createContinuousWriter(
    +      s"$runId",
    +      triggerLogicalPlan.schema,
    +      outputMode,
    +      new DataSourceV2Options(extraOptions.asJava))
    +    val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
    +
    +    val reader = withSink.collect {
    +      case DataSourceV2Relation(_, r: ContinuousReader) => r
    +    }.head
    +
    +    reportTimeTaken("queryPlanning") {
    +      lastExecution = new IncrementalExecution(
    +        sparkSessionForQuery,
    +        withSink,
    +        outputMode,
    +        checkpointFile("state"),
    +        runId,
    +        currentBatchId,
    +        offsetSeqMetadata)
    +      lastExecution.executedPlan // Force the lazy generation of execution plan
    +    }
    +
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.RUN_ID_KEY, runId.toString)
    +
    +    // Use the parent Spark session for the endpoint since it's where this query ID is registered.
    +    val epochEndpoint =
    +      EpochCoordinatorRef.create(
    +        writer.get(), reader, currentBatchId,
    +        id.toString, runId.toString, sparkSession, SparkEnv.get)
    +    val epochUpdateThread = new Thread(new Runnable {
    +      override def run: Unit = {
    +        try {
    +          triggerExecutor.execute(() => {
    +            startTrigger()
    +
    +            if (reader.needsReconfiguration()) {
    +              stopSources()
    +              state.set(RECONFIGURING)
    +              if (queryExecutionThread.isAlive) {
    +                sparkSession.sparkContext.cancelJobGroup(runId.toString)
    +                queryExecutionThread.interrupt()
    +                // No need to join - this thread is about to end anyway.
    +              }
    +              false
    +            } else if (isActive) {
    +              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
    +              logInfo(s"New epoch $currentBatchId is starting.")
    +              true
    +            } else {
    +              false
    +            }
    +          })
    +        } catch {
    +          case _: InterruptedException =>
    +            // Cleanly stop the query.
    +            return
    +        }
    +      }
    +    })
    +
    +    try {
    +      epochUpdateThread.setDaemon(true)
    +      epochUpdateThread.start()
    +
    +      reportTimeTaken("runContinuous") {
    +        SQLExecution.withNewExecutionId(
    +          sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
    +      }
    +    } finally {
    +      SparkEnv.get.rpcEnv.stop(epochEndpoint)
    +
    +      epochUpdateThread.interrupt()
    +      epochUpdateThread.join()
    +    }
    +  }
    +
    +  /**
    +   * Report ending partition offsets for the given reader at the given epoch.
    +   */
    +  def addOffset(
    +      epoch: Long, reader: ContinuousReader, partitionOffsets: Seq[PartitionOffset]): Unit = {
    +    assert(continuousSources.length == 1, "only one continuous source supported currently")
    +
    +    if (partitionOffsets.contains(null)) {
    +      // If any offset is null, that means the corresponding partition hasn't seen any data yet, so
    +      // there's nothing meaningful to add to the offset log.
    +    }
    +    val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
    +    synchronized {
    +      offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
    +    }
    +  }
    +
    +  /**
    +   * Mark the specified epoch as committed. All readers must have reported end offsets for the epoch
    +   * before this is called.
    +   */
    +  def commit(epoch: Long): Unit = {
    +    assert(continuousSources.length == 1, "only one continuous source supported currently")
    +    assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
    +    synchronized {
    +      commitLog.add(epoch)
    --- End diff --
    
    since this is called in the RPC thread, we should also check if this query is still alive here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158547367
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.atomic.AtomicLong
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
    +import org.apache.spark.util.RpcUtils
    +
    +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
    +
    +// Driver epoch trigger message
    +/**
    + * Atomically increment the current epoch and get the new value.
    + */
    +private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
    +
    +// Init messages
    +/**
    + * Set the reader and writer partition counts. Tasks may not be started until the coordinator
    + * has acknowledged these messages.
    + */
    +private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
    +case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage
    +
    +// Partition task messages
    +/**
    + * Get the current epoch.
    + */
    +private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage
    +/**
    + * Commit a partition at the specified epoch with the given message.
    + */
    +private[sql] case class CommitPartitionEpoch(
    +    partitionId: Int,
    +    epoch: Long,
    +    message: WriterCommitMessage) extends EpochCoordinatorMessage
    +/**
    + * Report that a partition is ending the specified epoch at the specified offset.
    + */
    +private[sql] case class ReportPartitionOffset(
    +    partitionId: Int,
    +    epoch: Long,
    +    offset: PartitionOffset) extends EpochCoordinatorMessage
    +
    +
    +/** Helper object used to create reference to [[EpochCoordinator]]. */
    +private[sql] object EpochCoordinatorRef extends Logging {
    +  private def endpointName(runId: String) = s"EpochCoordinator-$runId"
    +
    +  /**
    +   * Create a reference to a new [[EpochCoordinator]].
    +   */
    +  def create(
    +      writer: ContinuousWriter,
    +      reader: ContinuousReader,
    +      startEpoch: Long,
    +      queryId: String,
    +      runId: String,
    +      session: SparkSession,
    +      env: SparkEnv): RpcEndpointRef = synchronized {
    +    val coordinator = new EpochCoordinator(writer, reader, startEpoch, queryId, session, env.rpcEnv)
    +    val ref = env.rpcEnv.setupEndpoint(endpointName(runId), coordinator)
    +    logInfo("Registered EpochCoordinator endpoint")
    +    ref
    +  }
    +
    +  def get(runId: String, env: SparkEnv): RpcEndpointRef = synchronized {
    +    val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName(runId), env.conf, env.rpcEnv)
    +    logDebug("Retrieved existing EpochCoordinator endpoint")
    +    rpcEndpointRef
    +  }
    +}
    +
    +/**
    + * Handles three major epoch coordination tasks for continuous processing:
    + *
    + * * Maintains a local epoch counter (the "driver epoch"), incremented by IncrementAndGetEpoch
    + *   and pollable from executors by GetCurrentEpoch. Note that this epoch is *not* immediately
    + *   reflected anywhere in ContinuousExecution.
    + * * Collates ReportPartitionOffset messages, and forwards to ContinuousExecution when all
    + *   readers have ended a given epoch.
    + * * Collates CommitPartitionEpoch messages, and forwards to ContinuousExecution when all readers
    + *   have both committed and reported an end offset for a given epoch.
    + */
    +private[continuous] class EpochCoordinator(
    +    writer: ContinuousWriter,
    +    reader: ContinuousReader,
    +    startEpoch: Long,
    +    queryId: String,
    +    session: SparkSession,
    +    override val rpcEnv: RpcEnv)
    +  extends ThreadSafeRpcEndpoint with Logging {
    +
    +  private var numReaderPartitions: Int = _
    +  private var numWriterPartitions: Int = _
    +
    +  private var currentDriverEpoch = startEpoch
    +
    +  // (epoch, partition) -> message
    +  private val partitionCommits =
    +    mutable.Map[(Long, Int), WriterCommitMessage]()
    +  // (epoch, partition) -> offset
    +  private val partitionOffsets =
    +    mutable.Map[(Long, Int), PartitionOffset]()
    +
    +  private def resolveCommitsAtEpoch(epoch: Long) = {
    +    val thisEpochCommits =
    +      partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
    +    val nextEpochOffsets =
    +      partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
    +
    +    if (thisEpochCommits.size == numWriterPartitions &&
    +      nextEpochOffsets.size == numReaderPartitions) {
    +      logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
    +      val query = session.streams.get(queryId).asInstanceOf[StreamingQueryWrapper]
    --- End diff --
    
    @jose-torres  forgot to update this? Also you don't need `queryId` any more.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84981 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84981/testReport)** for PR 19984 at commit [`f50488c`](https://github.com/apache/spark/commit/f50488cf94ab015019e99d187b54ab922e4ca6c2).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158391247
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.atomic.AtomicLong
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
    +import org.apache.spark.util.RpcUtils
    +
    +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
    +
    +// Driver epoch trigger message
    +/**
    + * Atomically increment the current epoch and get the new value.
    + */
    +private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
    +
    +// Init messages
    +/**
    + * Set the reader and writer partition counts. Tasks may not be started until the coordinator
    + * has acknowledged these messages.
    + */
    +private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
    +case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage
    +
    +// Partition task messages
    +/**
    + * Get the current epoch.
    + */
    +private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage
    +/**
    + * Commit a partition at the specified epoch with the given message.
    + */
    +private[sql] case class CommitPartitionEpoch(
    +    partitionId: Int,
    +    epoch: Long,
    +    message: WriterCommitMessage) extends EpochCoordinatorMessage
    +/**
    + * Report that a partition is ending the specified epoch at the specified offset.
    + */
    +private[sql] case class ReportPartitionOffset(
    +    partitionId: Int,
    +    epoch: Long,
    +    offset: PartitionOffset) extends EpochCoordinatorMessage
    +
    +
    +/** Helper object used to create reference to [[EpochCoordinator]]. */
    +private[sql] object EpochCoordinatorRef extends Logging {
    +  private def endpointName(runId: String) = s"EpochCoordinator-$runId"
    +
    +  /**
    +   * Create a reference to a new [[EpochCoordinator]].
    +   */
    +  def create(
    +      writer: ContinuousWriter,
    +      reader: ContinuousReader,
    +      startEpoch: Long,
    +      queryId: String,
    +      runId: String,
    +      session: SparkSession,
    +      env: SparkEnv): RpcEndpointRef = synchronized {
    +    val coordinator = new EpochCoordinator(writer, reader, startEpoch, queryId, session, env.rpcEnv)
    +    val ref = env.rpcEnv.setupEndpoint(endpointName(runId), coordinator)
    +    logInfo("Registered EpochCoordinator endpoint")
    +    ref
    +  }
    +
    +  def get(runId: String, env: SparkEnv): RpcEndpointRef = synchronized {
    +    val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName(runId), env.conf, env.rpcEnv)
    +    logDebug("Retrieved existing EpochCoordinator endpoint")
    +    rpcEndpointRef
    +  }
    +}
    +
    +/**
    + * Handles three major epoch coordination tasks for continuous processing:
    + *
    + * * Maintains a local epoch counter (the "driver epoch"), incremented by IncrementAndGetEpoch
    + *   and pollable from executors by GetCurrentEpoch. Note that this epoch is *not* immediately
    + *   reflected anywhere in ContinuousExecution.
    + * * Collates ReportPartitionOffset messages, and forwards to ContinuousExecution when all
    + *   readers have ended a given epoch.
    + * * Collates CommitPartitionEpoch messages, and forwards to ContinuousExecution when all readers
    + *   have both committed and reported an end offset for a given epoch.
    + */
    +private[continuous] class EpochCoordinator(
    +    writer: ContinuousWriter,
    +    reader: ContinuousReader,
    +    startEpoch: Long,
    +    queryId: String,
    +    session: SparkSession,
    +    override val rpcEnv: RpcEnv)
    +  extends ThreadSafeRpcEndpoint with Logging {
    +
    +  private var numReaderPartitions: Int = _
    +  private var numWriterPartitions: Int = _
    +
    +  private var currentDriverEpoch = startEpoch
    +
    +  // (epoch, partition) -> message
    +  private val partitionCommits =
    +    mutable.Map[(Long, Int), WriterCommitMessage]()
    +  // (epoch, partition) -> offset
    +  private val partitionOffsets =
    +    mutable.Map[(Long, Int), PartitionOffset]()
    +
    +  private def resolveCommitsAtEpoch(epoch: Long) = {
    +    val thisEpochCommits =
    +      partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
    +    val nextEpochOffsets =
    +      partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
    +
    +    if (thisEpochCommits.size == numWriterPartitions &&
    +      nextEpochOffsets.size == numReaderPartitions) {
    +      logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
    +      val query = session.streams.get(queryId).asInstanceOf[StreamingQueryWrapper]
    --- End diff --
    
    why not pass the query into `EpochCoordinator`'s constructor? Getting a query from StreamingQueryManager may have a race condition because the query can fail before we process `CommitPartitionEpoch` messages. If so, `session.streams.get(queryId)` will return null.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158120930
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: Throwable if state.get().equals(RECONFIGURING) =>
    --- End diff --
    
    there is a race condition here. Since you stop the source first, this thread may throw the exception before `state` is set to `RECONFIGURING`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158398822
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: InterruptedException if state.get().equals(RECONFIGURING) =>
    +          // swallow exception and run again
    +          state.set(ACTIVE)
    +      }
    +    } while (state.get() == ACTIVE)
    +  }
    +
    +  /**
    +   * Populate the start offsets to start the execution at the current offsets stored in the sink
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  The basic structure of this method is as follows:
    +   *
    +   *  Identify (from the commit log) the latest epoch that has committed
    +   *  IF last epoch exists THEN
    +   *    Get end offsets for the epoch
    +   *    Set those offsets as the current commit progress
    +   *    Set the next epoch ID as the last + 1
    +   *    Return the end offsets of the last epoch as start for the next one
    +   *    DONE
    +   *  ELSE
    +   *    Start a new query log
    +   *  DONE
    +   */
    +  private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
    +    // Note that this will need a slight modification for exactly once. If ending offsets were
    +    // reported but not committed for any epochs, we must replay exactly to those offsets.
    +    // For at least once, we can just ignore those reports and risk duplicates.
    +    commitLog.getLatest() match {
    +      case Some((latestEpochId, _)) =>
    +        val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
    +          throw new IllegalStateException(
    +            s"Batch $latestEpochId was committed without end epoch offsets!")
    +        }
    +        committedOffsets = nextOffsets.toStreamProgress(sources)
    +
    +        // Forcibly align commit and offset logs by slicing off any spurious offset logs from
    +        // a previous run. We can't allow commits to an epoch that a previous run reached but
    +        // this run has not.
    +        offsetLog.purgeAfter(latestEpochId)
    +
    +        currentBatchId = latestEpochId + 1
    +        logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
    +        nextOffsets
    +      case None =>
    +        // We are starting this stream for the first time. Offsets are all None.
    +        logInfo(s"Starting new streaming query.")
    +        currentBatchId = 0
    +        OffsetSeq.fill(continuousSources.map(_ => null): _*)
    +    }
    +  }
    +
    +  /**
    +   * Do a continuous run.
    +   * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
    +   */
    +  private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
    +    // A list of attributes that will need to be updated.
    +    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Translate from continuous relation to the underlying data source.
    +    var nextSourceId = 0
    +    continuousSources = logicalPlan.collect {
    +      case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
    +        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
    +        nextSourceId += 1
    +
    +        dataSource.createContinuousReader(
    +          java.util.Optional.empty[StructType](),
    +          metadataPath,
    +          new DataSourceV2Options(extraReaderOptions.asJava))
    +    }
    +    uniqueSources = continuousSources.distinct
    +
    +    val offsets = getStartOffsets(sparkSessionForQuery)
    +
    +    var insertedSourceId = 0
    +    val withNewSources = logicalPlan transform {
    +      case ContinuousExecutionRelation(_, _, output) =>
    +        val reader = continuousSources(insertedSourceId)
    +        insertedSourceId += 1
    +        val newOutput = reader.readSchema().toAttributes
    +
    +        assert(output.size == newOutput.size,
    +          s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
    +            s"${Utils.truncatedString(newOutput, ",")}")
    +        replacements ++= output.zip(newOutput)
    +
    +        val loggedOffset = offsets.offsets(0)
    +        val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
    +        reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
    +        DataSourceV2Relation(newOutput, reader)
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val triggerLogicalPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) =>
    +        replacementMap(a).withMetadata(a.metadata)
    +      case (_: CurrentTimestamp | _: CurrentDate) =>
    +        throw new IllegalStateException(
    +          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
    +    }
    +
    +    val writer = sink.createContinuousWriter(
    +      s"$runId",
    +      triggerLogicalPlan.schema,
    +      outputMode,
    +      new DataSourceV2Options(extraOptions.asJava))
    +    val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
    +
    +    val reader = withSink.collect {
    +      case DataSourceV2Relation(_, r: ContinuousReader) => r
    +    }.head
    +
    +    reportTimeTaken("queryPlanning") {
    +      lastExecution = new IncrementalExecution(
    +        sparkSessionForQuery,
    +        withSink,
    +        outputMode,
    +        checkpointFile("state"),
    +        runId,
    +        currentBatchId,
    +        offsetSeqMetadata)
    +      lastExecution.executedPlan // Force the lazy generation of execution plan
    +    }
    +
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.RUN_ID_KEY, runId.toString)
    +
    +    // Use the parent Spark session for the endpoint since it's where this query ID is registered.
    +    val epochEndpoint =
    +      EpochCoordinatorRef.create(
    +        writer.get(), reader, currentBatchId,
    +        id.toString, runId.toString, sparkSession, SparkEnv.get)
    +    val epochUpdateThread = new Thread(new Runnable {
    +      override def run: Unit = {
    +        try {
    +          triggerExecutor.execute(() => {
    +            startTrigger()
    +
    +            if (reader.needsReconfiguration()) {
    +              stopSources()
    +              state.set(RECONFIGURING)
    --- End diff --
    
    I think that order is fine too, but I don't think stopSources can cause an exception in the query execution thread. queryExecutionThread.interrupt() is the line which passes control flow; until that runs the query execution thread should be sitting there waiting for the long-running spark task.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84937/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85092 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85092/testReport)** for PR 19984 at commit [`19f08a9`](https://github.com/apache/spark/commit/19f08a9c875c6e52bb75c82d196fb3a310311ffe).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158134853
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              currentEpoch,
    +              currentOffset))
    +            currentEpoch += 1
    +            false
    +          // real row
    +          case (row, offset) =>
    +            currentRow = row
    +            currentOffset = offset
    +            true
    +        }
    +      }
    +
    +      override def next(): UnsafeRow = {
    --- End diff --
    
    this method doesn't follow the java Iterator next contract:
    ```
    NoSuchElementException if the iteration has no more elements
    ```
    
    You can extend `org.apache.spark.util.NextIterator` to fix it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158156114
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: Throwable if state.get().equals(RECONFIGURING) =>
    --- End diff --
    
    The sequencing is:
    
    - The pre-existing method stopSources() marks the ContinuousReader objects as stopped and cleans up any resources they may be holding. This doesn't affect query execution, and stopSources already swallows any non-fatal exception thrown by a stop() implementation.
    - The reconfiguring state is set.
    - The job group for this run is cancelled, fanning out to the continuous processing executors and telling them to stop.
    - The query execution thread is interrupted, passing control flow in that thread to the state RECONFIGURING check.
    
    We can check for InterruptedException specifically instead of Throwable, though. I'll change that.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85216 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85216/testReport)** for PR 19984 at commit [`c00e104`](https://github.com/apache/spark/commit/c00e104803333eea181685142aebafc8fef8f0a8).
     * This patch **fails PySpark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158120028
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: Throwable if state.get().equals(RECONFIGURING) =>
    +          // swallow exception and run again
    +          state.set(ACTIVE)
    +      }
    +    } while (state.get() == ACTIVE)
    +  }
    +
    +  /**
    +   * Populate the start offsets to start the execution at the current offsets stored in the sink
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  The basic structure of this method is as follows:
    +   *
    +   *  Identify (from the commit log) the latest epoch that has committed
    +   *  IF last epoch exists THEN
    +   *    Get end offsets for the epoch
    +   *    Set those offsets as the current commit progress
    +   *    Set the next epoch ID as the last + 1
    +   *    Return the end offsets of the last epoch as start for the next one
    +   *    DONE
    +   *  ELSE
    +   *    Start a new query log
    +   *  DONE
    +   */
    +  private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
    +    // Note that this will need a slight modification for exactly once. If ending offsets were
    +    // reported but not committed for any epochs, we must replay exactly to those offsets.
    +    // For at least once, we can just ignore those reports and risk duplicates.
    +    commitLog.getLatest() match {
    +      case Some((latestEpochId, _)) =>
    +        val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
    +          throw new IllegalStateException(
    +            s"Batch $latestEpochId was committed without end epoch offsets!")
    +        }
    +        committedOffsets = nextOffsets.toStreamProgress(sources)
    +
    +        // Forcibly align commit and offset logs by slicing off any spurious offset logs from
    +        // a previous run. We can't allow commits to an epoch that a previous run reached but
    +        // this run has not.
    +        offsetLog.purgeAfter(latestEpochId)
    +
    +        currentBatchId = latestEpochId + 1
    +        logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
    +        nextOffsets
    +      case None =>
    +        // We are starting this stream for the first time. Offsets are all None.
    +        logInfo(s"Starting new streaming query.")
    +        currentBatchId = 0
    +        OffsetSeq.fill(continuousSources.map(_ => null): _*)
    +    }
    +  }
    +
    +  /**
    +   * Do a continuous run.
    +   * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
    +   */
    +  private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
    +    import scala.collection.JavaConverters._
    --- End diff --
    
    nit: move this to the beginning of this file


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85230 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85230/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85079 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85079/testReport)** for PR 19984 at commit [`2af9b40`](https://github.com/apache/spark/commit/2af9b40d60027397fd6e915413b88f12249245e5).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r165117379
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala ---
    @@ -75,6 +76,52 @@ case class StreamingExecutionRelation(
       )
     }
     
    +// We have to pack in the V1 data source as a shim, for the case when a source implements
    +// continuous processing (which is always V2) but only has V1 microbatch support. We don't
    +// know at read time whether the query is conntinuous or not, so we need to be able to
    --- End diff --
    
    The trigger isn't specified at the point where the dataframe is created.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158136868
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    --- End diff --
    
    What if `dataReaderFailed` is set when a thread is blocking here? Seems the task will block forever.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158389177
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: InterruptedException if state.get().equals(RECONFIGURING) =>
    +          // swallow exception and run again
    +          state.set(ACTIVE)
    +      }
    +    } while (state.get() == ACTIVE)
    +  }
    +
    +  /**
    +   * Populate the start offsets to start the execution at the current offsets stored in the sink
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  The basic structure of this method is as follows:
    +   *
    +   *  Identify (from the commit log) the latest epoch that has committed
    +   *  IF last epoch exists THEN
    +   *    Get end offsets for the epoch
    +   *    Set those offsets as the current commit progress
    +   *    Set the next epoch ID as the last + 1
    +   *    Return the end offsets of the last epoch as start for the next one
    +   *    DONE
    +   *  ELSE
    +   *    Start a new query log
    +   *  DONE
    +   */
    +  private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
    +    // Note that this will need a slight modification for exactly once. If ending offsets were
    +    // reported but not committed for any epochs, we must replay exactly to those offsets.
    +    // For at least once, we can just ignore those reports and risk duplicates.
    +    commitLog.getLatest() match {
    +      case Some((latestEpochId, _)) =>
    +        val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
    +          throw new IllegalStateException(
    +            s"Batch $latestEpochId was committed without end epoch offsets!")
    +        }
    +        committedOffsets = nextOffsets.toStreamProgress(sources)
    +
    +        // Forcibly align commit and offset logs by slicing off any spurious offset logs from
    +        // a previous run. We can't allow commits to an epoch that a previous run reached but
    +        // this run has not.
    +        offsetLog.purgeAfter(latestEpochId)
    +
    +        currentBatchId = latestEpochId + 1
    +        logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
    +        nextOffsets
    +      case None =>
    +        // We are starting this stream for the first time. Offsets are all None.
    +        logInfo(s"Starting new streaming query.")
    +        currentBatchId = 0
    +        OffsetSeq.fill(continuousSources.map(_ => null): _*)
    +    }
    +  }
    +
    +  /**
    +   * Do a continuous run.
    +   * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
    +   */
    +  private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
    +    // A list of attributes that will need to be updated.
    +    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Translate from continuous relation to the underlying data source.
    +    var nextSourceId = 0
    +    continuousSources = logicalPlan.collect {
    +      case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
    +        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
    +        nextSourceId += 1
    +
    +        dataSource.createContinuousReader(
    +          java.util.Optional.empty[StructType](),
    +          metadataPath,
    +          new DataSourceV2Options(extraReaderOptions.asJava))
    +    }
    +    uniqueSources = continuousSources.distinct
    +
    +    val offsets = getStartOffsets(sparkSessionForQuery)
    +
    +    var insertedSourceId = 0
    +    val withNewSources = logicalPlan transform {
    +      case ContinuousExecutionRelation(_, _, output) =>
    +        val reader = continuousSources(insertedSourceId)
    +        insertedSourceId += 1
    +        val newOutput = reader.readSchema().toAttributes
    +
    +        assert(output.size == newOutput.size,
    +          s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
    +            s"${Utils.truncatedString(newOutput, ",")}")
    +        replacements ++= output.zip(newOutput)
    +
    +        val loggedOffset = offsets.offsets(0)
    +        val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
    +        reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
    +        DataSourceV2Relation(newOutput, reader)
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val triggerLogicalPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) =>
    +        replacementMap(a).withMetadata(a.metadata)
    +      case (_: CurrentTimestamp | _: CurrentDate) =>
    +        throw new IllegalStateException(
    +          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
    +    }
    +
    +    val writer = sink.createContinuousWriter(
    +      s"$runId",
    +      triggerLogicalPlan.schema,
    +      outputMode,
    +      new DataSourceV2Options(extraOptions.asJava))
    +    val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
    +
    +    val reader = withSink.collect {
    +      case DataSourceV2Relation(_, r: ContinuousReader) => r
    +    }.head
    +
    +    reportTimeTaken("queryPlanning") {
    +      lastExecution = new IncrementalExecution(
    +        sparkSessionForQuery,
    +        withSink,
    +        outputMode,
    +        checkpointFile("state"),
    +        runId,
    +        currentBatchId,
    +        offsetSeqMetadata)
    +      lastExecution.executedPlan // Force the lazy generation of execution plan
    +    }
    +
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.RUN_ID_KEY, runId.toString)
    +
    +    // Use the parent Spark session for the endpoint since it's where this query ID is registered.
    +    val epochEndpoint =
    +      EpochCoordinatorRef.create(
    +        writer.get(), reader, currentBatchId,
    +        id.toString, runId.toString, sparkSession, SparkEnv.get)
    +    val epochUpdateThread = new Thread(new Runnable {
    --- End diff --
    
    nit: -> `new Thread(new Runnable {` -> `new Thread(s"epoch update thread for $prettyIdString", new Runnable {`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85332 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85332/testReport)** for PR 19984 at commit [`b4f7976`](https://github.com/apache/spark/commit/b4f79762c083735011bf98250c39c263876c8cc8).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84976 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84976/testReport)** for PR 19984 at commit [`527cc5d`](https://github.com/apache/spark/commit/527cc5d20e13ec3b1779de6e678b71dc3071b4a7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158139103
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.rpc.RpcEndpointRef
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.execution.streaming.continuous._
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.streaming.ProcessingTime
    +import org.apache.spark.util.{SystemClock, ThreadUtils}
    +
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readTasks.asScala.zipWithIndex.map {
    +      case (readTask, index) => new DataSourceRDDPartition(index, readTask)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
    +
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +
    +    // This queue contains two types of messages:
    +    // * (null, null) representing an epoch boundary.
    +    // * (row, off) containing a data row and its corresponding PartitionOffset.
    +    val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +    val epochPollFailed = new AtomicBoolean(false)
    +    val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +      s"epoch-poll--${runId}--${context.partitionId()}")
    +    val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +    epochPollExecutor.scheduleWithFixedDelay(
    +      epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +    // Important sequencing - we must get start offset before the data reader thread begins
    +    val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +
    +    val dataReaderFailed = new AtomicBoolean(false)
    +    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +    dataReaderThread.setDaemon(true)
    +    dataReaderThread.start()
    +
    +    context.addTaskCompletionListener(_ => {
    +      reader.close()
    +      dataReaderThread.interrupt()
    +      epochPollExecutor.shutdown()
    +    })
    +
    +    val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private var currentRow: UnsafeRow = _
    +      private var currentOffset: PartitionOffset = startOffset
    +      private var currentEpoch =
    +        context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +      override def hasNext(): Boolean = {
    +        if (dataReaderFailed.get()) {
    +          throw new SparkException("data read failed", dataReaderThread.failureReason)
    +        }
    +        if (epochPollFailed.get()) {
    +          throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    +        }
    +
    +        queue.take() match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              currentEpoch,
    +              currentOffset))
    +            currentEpoch += 1
    +            false
    +          // real row
    +          case (row, offset) =>
    +            currentRow = row
    +            currentOffset = offset
    +            true
    +        }
    +      }
    +
    +      override def next(): UnsafeRow = {
    +        val r = currentRow
    +        currentRow = null
    +        r
    +      }
    +    }
    +  }
    +
    +  override def getPreferredLocations(split: Partition): Seq[String] = {
    +    split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
    +  }
    +}
    +
    +case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
    +
    +class EpochPollRunnable(
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean)
    +  extends Thread with Logging {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  private val epochEndpoint = EpochCoordinatorRef.get(
    +    context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
    +  private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  override def run(): Unit = {
    +    try {
    +      val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
    +      for (i <- currentEpoch to newEpoch - 1) {
    +        queue.put((null, null))
    +        logDebug(s"Sent marker to start epoch ${i + 1}")
    +      }
    +      currentEpoch = newEpoch
    +    } catch {
    +      case t: Throwable =>
    +        failedFlag.set(true)
    +        failureReason = t
    +        throw t
    --- End diff --
    
    ditto


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84937 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84937/testReport)** for PR 19984 at commit [`25a23d1`](https://github.com/apache/spark/commit/25a23d10e35f1c4283cdf7a2657c844f190d4d15).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85079/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84973 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84973/testReport)** for PR 19984 at commit [`70d5d7c`](https://github.com/apache/spark/commit/70d5d7caed94394248c1d28777e74f3df9e398fe).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85218 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85218/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r157135337
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
    @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
       public static Trigger Once() {
         return OneTimeTrigger$.MODULE$;
       }
    +
    +  /**
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * @since 2.3.0
    +   */
    +  public static Trigger Continuous(long intervalMs) {
    +    return ContinuousTrigger.apply(intervalMs);
    +  }
    +
    +  /**
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * {{{
    +   *    import java.util.concurrent.TimeUnit
    +   *    df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    --- End diff --
    
    `Trigger.Continuous(10, TimeUnit.SECONDS)` instead of `ProcessingTime.create(10, TimeUnit.SECONDS)`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85287 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85287/testReport)** for PR 19984 at commit [`07a9e06`](https://github.com/apache/spark/commit/07a9e0654df61ad52f7db28ee663a380dee3c2a8).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84973/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85230 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85230/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84975 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84975/testReport)** for PR 19984 at commit [`63f78d2`](https://github.com/apache/spark/commit/63f78d266f3fd4ac5fc2fec53c04c0029d1a5e68).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158389368
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: InterruptedException if state.get().equals(RECONFIGURING) =>
    +          // swallow exception and run again
    +          state.set(ACTIVE)
    +      }
    +    } while (state.get() == ACTIVE)
    +  }
    +
    +  /**
    +   * Populate the start offsets to start the execution at the current offsets stored in the sink
    +   * (i.e. avoid reprocessing data that we have already processed). This function must be called
    +   * before any processing occurs and will populate the following fields:
    +   *  - currentBatchId
    +   *  - committedOffsets
    +   *  The basic structure of this method is as follows:
    +   *
    +   *  Identify (from the commit log) the latest epoch that has committed
    +   *  IF last epoch exists THEN
    +   *    Get end offsets for the epoch
    +   *    Set those offsets as the current commit progress
    +   *    Set the next epoch ID as the last + 1
    +   *    Return the end offsets of the last epoch as start for the next one
    +   *    DONE
    +   *  ELSE
    +   *    Start a new query log
    +   *  DONE
    +   */
    +  private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
    +    // Note that this will need a slight modification for exactly once. If ending offsets were
    +    // reported but not committed for any epochs, we must replay exactly to those offsets.
    +    // For at least once, we can just ignore those reports and risk duplicates.
    +    commitLog.getLatest() match {
    +      case Some((latestEpochId, _)) =>
    +        val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
    +          throw new IllegalStateException(
    +            s"Batch $latestEpochId was committed without end epoch offsets!")
    +        }
    +        committedOffsets = nextOffsets.toStreamProgress(sources)
    +
    +        // Forcibly align commit and offset logs by slicing off any spurious offset logs from
    +        // a previous run. We can't allow commits to an epoch that a previous run reached but
    +        // this run has not.
    +        offsetLog.purgeAfter(latestEpochId)
    +
    +        currentBatchId = latestEpochId + 1
    +        logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
    +        nextOffsets
    +      case None =>
    +        // We are starting this stream for the first time. Offsets are all None.
    +        logInfo(s"Starting new streaming query.")
    +        currentBatchId = 0
    +        OffsetSeq.fill(continuousSources.map(_ => null): _*)
    +    }
    +  }
    +
    +  /**
    +   * Do a continuous run.
    +   * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
    +   */
    +  private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
    +    // A list of attributes that will need to be updated.
    +    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    +    // Translate from continuous relation to the underlying data source.
    +    var nextSourceId = 0
    +    continuousSources = logicalPlan.collect {
    +      case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
    +        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
    +        nextSourceId += 1
    +
    +        dataSource.createContinuousReader(
    +          java.util.Optional.empty[StructType](),
    +          metadataPath,
    +          new DataSourceV2Options(extraReaderOptions.asJava))
    +    }
    +    uniqueSources = continuousSources.distinct
    +
    +    val offsets = getStartOffsets(sparkSessionForQuery)
    +
    +    var insertedSourceId = 0
    +    val withNewSources = logicalPlan transform {
    +      case ContinuousExecutionRelation(_, _, output) =>
    +        val reader = continuousSources(insertedSourceId)
    +        insertedSourceId += 1
    +        val newOutput = reader.readSchema().toAttributes
    +
    +        assert(output.size == newOutput.size,
    +          s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
    +            s"${Utils.truncatedString(newOutput, ",")}")
    +        replacements ++= output.zip(newOutput)
    +
    +        val loggedOffset = offsets.offsets(0)
    +        val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
    +        reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
    +        DataSourceV2Relation(newOutput, reader)
    +    }
    +
    +    // Rewire the plan to use the new attributes that were returned by the source.
    +    val replacementMap = AttributeMap(replacements)
    +    val triggerLogicalPlan = withNewSources transformAllExpressions {
    +      case a: Attribute if replacementMap.contains(a) =>
    +        replacementMap(a).withMetadata(a.metadata)
    +      case (_: CurrentTimestamp | _: CurrentDate) =>
    +        throw new IllegalStateException(
    +          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
    +    }
    +
    +    val writer = sink.createContinuousWriter(
    +      s"$runId",
    +      triggerLogicalPlan.schema,
    +      outputMode,
    +      new DataSourceV2Options(extraOptions.asJava))
    +    val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
    +
    +    val reader = withSink.collect {
    +      case DataSourceV2Relation(_, r: ContinuousReader) => r
    +    }.head
    +
    +    reportTimeTaken("queryPlanning") {
    +      lastExecution = new IncrementalExecution(
    +        sparkSessionForQuery,
    +        withSink,
    +        outputMode,
    +        checkpointFile("state"),
    +        runId,
    +        currentBatchId,
    +        offsetSeqMetadata)
    +      lastExecution.executedPlan // Force the lazy generation of execution plan
    +    }
    +
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
    +    sparkSession.sparkContext.setLocalProperty(
    +      ContinuousExecution.RUN_ID_KEY, runId.toString)
    +
    +    // Use the parent Spark session for the endpoint since it's where this query ID is registered.
    +    val epochEndpoint =
    +      EpochCoordinatorRef.create(
    +        writer.get(), reader, currentBatchId,
    +        id.toString, runId.toString, sparkSession, SparkEnv.get)
    +    val epochUpdateThread = new Thread(new Runnable {
    +      override def run: Unit = {
    +        try {
    +          triggerExecutor.execute(() => {
    +            startTrigger()
    +
    +            if (reader.needsReconfiguration()) {
    +              stopSources()
    +              state.set(RECONFIGURING)
    --- End diff --
    
    nit: move this above `stopSources()`. Otherwise, the stream thread may see the exception before setting state to `RECONFIGURING`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r157278184
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1035,6 +1035,22 @@ object SQLConf {
         .booleanConf
         .createWithDefault(true)
     
    +  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
    +    buildConf("spark.sql.streaming.continuous.executorQueueSize")
    +    .internal()
    +    .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
    +      " buffer the results of a ContinuousDataReader.")
    +    .intConf
    --- End diff --
    
    Should it be? I can't imagine anything close to MAX_INT being a reasonable value here. Will it be hard to migrate to a long if we later discover it's needed?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158121770
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    +    case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
    +  }
    +
    +  override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    +    do {
    +      try {
    +        runContinuous(sparkSessionForStream)
    +      } catch {
    +        case _: Throwable if state.get().equals(RECONFIGURING) =>
    --- End diff --
    
    In a second thought, it's dangerous to swallow exception here. If the source has some bugs in `Source.stop`, it will hide it and continue to run. I would expect to see a better mechanism to ask the source to stop it without throwing exceptions.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158382180
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
    @@ -19,6 +19,7 @@
     
     import java.util.concurrent.TimeUnit;
     
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
    --- End diff --
    
    nit: move this below `import org.apache.spark.annotation.InterfaceStability;`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Thanks! Merging to master!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85089 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85089/testReport)** for PR 19984 at commit [`359ebdd`](https://github.com/apache/spark/commit/359ebdd8bdac0b93aa6b88beab0212393f1e2577).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158361942
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -109,6 +125,42 @@ object DataWritingSparkTask extends Logging {
           logError(s"Writer for partition ${context.partitionId()} aborted.")
         })
       }
    +
    +  def runContinuous(
    +      writeTask: DataWriterFactory[InternalRow],
    +      context: TaskContext,
    +      iter: Iterator[InternalRow]): WriterCommitMessage = {
    +    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
    +    val currentMsg: WriterCommitMessage = null
    +    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +    do {
    +      // write the data and commit this writer.
    +      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
    +        try {
    +          iter.foreach(dataWriter.write)
    +          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    +          val msg = dataWriter.commit()
    +          logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +          EpochCoordinatorRef.get(runId, SparkEnv.get).send(
    --- End diff --
    
    nit: `EpochCoordinatorRef.get` is not cheap. You can store it outside the loop.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85089/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84976 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84976/testReport)** for PR 19984 at commit [`527cc5d`](https://github.com/apache/spark/commit/527cc5d20e13ec3b1779de6e678b71dc3071b4a7).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r157124505
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
    @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
       public static Trigger Once() {
         return OneTimeTrigger$.MODULE$;
       }
    +
    +  /**
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * @since 2.3.0
    +   */
    +  public static Trigger Continuous(long intervalMs) {
    +    return ContinuousTrigger.apply(intervalMs);
    +  }
    +
    +  /**
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * {{{
    +   *    import java.util.concurrent.TimeUnit
    +   *    df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    +   * }}}
    +   *
    +   * @since 2.3.0
    +   */
    +  public static Trigger Continuous(long interval, TimeUnit timeUnit) {
    +    return ContinuousTrigger.create(interval, timeUnit);
    +  }
    +
    +  /**
    +   * (Scala-friendly)
    +   * A trigger that continuously processes streaming data, asynchronously checkpointing at
    +   * the specified interval.
    +   *
    +   * {{{
    +   *    import scala.concurrent.duration._
    +   *    df.writeStream.trigger(Trigger.Continuous(10.seconds))
    +   * }}}
    +   * @since 2.2.0
    --- End diff --
    
    2.3.0?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158388472
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    --- End diff --
    
    nit: unused


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84973 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84973/testReport)** for PR 19984 at commit [`70d5d7c`](https://github.com/apache/spark/commit/70d5d7caed94394248c1d28777e74f3df9e398fe).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    The result says it fails Spark unit tests, but clicking through shows a count of 0.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158159270
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -0,0 +1,343 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +
    +import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.sql.{AnalysisException, SparkSession}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
    +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
    +import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.{Clock, Utils}
    +
    +class ContinuousExecution(
    +    sparkSession: SparkSession,
    +    name: String,
    +    checkpointRoot: String,
    +    analyzedPlan: LogicalPlan,
    +    sink: ContinuousWriteSupport,
    +    trigger: Trigger,
    +    triggerClock: Clock,
    +    outputMode: OutputMode,
    +    extraOptions: Map[String, String],
    +    deleteCheckpointOnStop: Boolean)
    +  extends StreamExecution(
    +    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    +    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
    +
    +  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
    +  override protected def sources: Seq[BaseStreamingSource] = continuousSources
    +
    +  override lazy val logicalPlan: LogicalPlan = {
    +    assert(queryExecutionThread eq Thread.currentThread,
    +      "logicalPlan must be initialized in StreamExecutionThread " +
    +        s"but the current thread was ${Thread.currentThread}")
    +    var nextSourceId = 0L
    +    val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
    +    analyzedPlan.transform {
    +      case r @ StreamingRelationV2(
    +          source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    +        toExecutionRelationMap.getOrElseUpdate(r, {
    +          ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    +        })
    +      case StreamingRelationV2(_, sourceName, _, _, _) =>
    +        throw new AnalysisException(
    +          s"Data source $sourceName does not support continuous processing.")
    +    }
    +  }
    +
    +  private val triggerExecutor = trigger match {
    +    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
    --- End diff --
    
    I think the coupling is correct here. ProcessingTime represents the rate of progress through the query's fenceposts, which applies here as well as it does in the microbatch case.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r158116511
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.atomic.AtomicLong
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
    +import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
    +import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
    +import org.apache.spark.util.RpcUtils
    +
    +private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
    +
    +// Driver epoch trigger message
    +/**
    + * Atomically increment the current epoch and get the new value.
    + */
    +private[sql] case class IncrementAndGetEpoch() extends EpochCoordinatorMessage
    --- End diff --
    
    nit: `case class` -> `case object`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85216/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85230/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #84937 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84937/testReport)** for PR 19984 at commit [`25a23d1`](https://github.com/apache/spark/commit/25a23d10e35f1c4283cdf7a2657c844f190d4d15).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19984
  
    **[Test build #85216 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85216/testReport)** for PR 19984 at commit [`c00e104`](https://github.com/apache/spark/commit/c00e104803333eea181685142aebafc8fef8f0a8).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19984#discussion_r157124263
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1035,6 +1035,22 @@ object SQLConf {
         .booleanConf
         .createWithDefault(true)
     
    +  val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
    +    buildConf("spark.sql.streaming.continuous.executorQueueSize")
    +    .internal()
    +    .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
    +      " buffer the results of a ContinuousDataReader.")
    +    .intConf
    +    .createWithDefault(1024)
    +
    +  val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
    +    buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
    +      .internal()
    +      .doc("The interval at which continuous execution readers will poll to check whether" +
    +        " the epoch has advanced on the driver.")
    +      .intConf
    --- End diff --
    
    `timeConf`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org