Posted to reviews@spark.apache.org by joseph-torres <gi...@git.apache.org> on 2017/12/15 01:34:34 UTC
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
GitHub user joseph-torres opened a pull request:
https://github.com/apache/spark/pull/19984
[SPARK-22789] Map-only continuous processing execution
## What changes were proposed in this pull request?
Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC.
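For illustration, a minimal sketch (not part of the patch) of the kind of map-only continuous query this enables. The `rate` source and `Trigger.Continuous` follow the PR's test suite; the memory sink, query name, and option values are assumptions.
```scala
import org.apache.spark.sql.streaming.Trigger

// Stateless (map/filter/project) pipeline over the continuous-capable rate source.
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .selectExpr("value * 2 AS doubled")       // map-style projection
  .where("doubled % 4 = 0")                 // filter
  .writeStream
  .format("memory")                         // illustrative sink only
  .queryName("continuous_sketch")
  .trigger(Trigger.Continuous("1 second"))  // epochs advance roughly once per second
  .start()
```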
## How was this patch tested?
New unit-ish tests exercising execution end to end.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/joseph-torres/spark continuous-impl
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19984.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #19984
----
commit d6bea84447d910e79d5926972d87a80bc5dc2e2e
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-07T22:08:28Z
Refactor StreamExecution into a parent class so continuous processing can extend it
commit df6b8861173d1e7853952c8f3ffe504975efe204
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T19:31:28Z
address fmt
commit 6f0ce6b1cf1abf602c2b02ce6d31f46f8fa71b7c
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-13T00:09:48Z
slight changes
commit 2b360ab49bcab3c73ea85ce62202e40e950931ef
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-13T00:10:34Z
rm spurious space
commit 1b19f1ce4444f17e7324997649ad8c5f97887912
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-13T00:35:30Z
fix compile
commit 96eba13be9764e63f3d1375d7b51dbfd0675aa98
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T20:48:20Z
harness
commit 2d5efadb9e7662363e3e4a3c66e0f5f73e4935ef
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T21:18:25Z
awaitEpoch impl
commit 578bbb7eb0725b795ac65d1beda436515f4f4eba
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T21:46:09Z
move local[10] to only continuous suite
commit 9051eff6c88838ac61ab45763ed84d593e2d4837
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T21:49:55Z
repeatedly restart
commit 60fa4477591cc264b9ea253f64065d762ce3f96f
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T22:02:52Z
fix some simple TODOs
commit ea8e76ec75752d134433730ee1a007cce1fdcfe8
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T22:11:18Z
use runId instead of queryId for endpoint name
commit d0f3cc7701d9eb3e7df571561e751f03c0537f3a
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T22:19:03Z
more simple todos
commit ba9dbaa1be2f54827a42f3177669082e7d1f99e2
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T22:27:12Z
remove old state
commit 2cd005f4685e492ae78d6b9c579c80c2370d2f14
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T22:35:51Z
remove clean shutdown workaround in StreamTest
commit a7fa31fb5375074d888bd0a94e317ad3f1692e5a
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T22:50:09Z
update ContinuousExecution docs
commit f687432a58acf7337885edfc01adc94188d174d8
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-11T22:59:14Z
add comments to EpochCoordinator
commit 987b011ee78292c3379559910ebe101daf4f9450
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T00:02:54Z
change offset semantic to end of previous epoch
commit 5494fc50ef99b3e584c287b03eaa32b30657a5ce
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T00:18:40Z
document EpochCoordinator
commit d6ef404b85fa6977b5f38a853dca11de5189b3f9
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T02:06:44Z
simplify epoch handling
commit 647bd2745c1c0842002d4f71b61aa34beb0f8b29
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T19:17:58Z
stress tests
commit 053a9f349a4829433a495aa5989f1ca1c8a3256e
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T20:17:22Z
add minBatchesToRetain
commit 7072d21444388fe167fa7e3475b3e95ec9923d5e
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T20:43:33Z
add confs
commit 4083a8f5c6b6ef298726234d54f23a90e971e77e
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-12T21:10:33Z
latency suite not meaningful here
commit 41d391f2027a4e8b3730d15cea7b7fbcdcec27de
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-13T00:04:07Z
more stress::q
commit 402cfa3b10dfb0f37ce8d94336be3b3c01fe9f90
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-13T18:55:23Z
use temp dir
commit e4a1bc19db9ea0233879d270e725ed58d95a34ad
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-14T19:37:36Z
fix against rebase
commit 8887b3c92afe8bb1659f600785af5d97f085f2bb
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-14T21:32:16Z
fix ser/deser
commit 60bf0e33f20134af296d85b5c52729c4063ef2e1
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-14T22:41:21Z
fix rebase compile
commit 749bddc6303321118407ff5c2664528f7160ff65
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-14T23:35:46Z
stop using ProcessingTime in executor
commit 5a15ed5b30cc70e70d199d82e617c67426562ba2
Author: Jose Torres <jo...@databricks.com>
Date: 2017-12-14T23:54:52Z
add tests for supported ops
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85092/
Test FAILed.
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85079 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85079/testReport)** for PR 19984 at commit [`2af9b40`](https://github.com/apache/spark/commit/2af9b40d60027397fd6e915413b88f12249245e5).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85332 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85332/testReport)** for PR 19984 at commit [`b4f7976`](https://github.com/apache/spark/commit/b4f79762c083735011bf98250c39c263876c8cc8).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158158855
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ sqlContext: SQLContext,
+ @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+ private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+ override protected def getPartitions: Array[Partition] = {
+ readTasks.asScala.zipWithIndex.map {
+ case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+ }.toArray
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+ val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+ // This queue contains two types of messages:
+ // * (null, null) representing an epoch boundary.
+ // * (row, off) containing a data row and its corresponding PartitionOffset.
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ s"epoch-poll--${runId}--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+ epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ // Important sequencing - we must get start offset before the data reader thread begins
+ val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+ val dataReaderFailed = new AtomicBoolean(false)
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
+ reader.close()
+ dataReaderThread.interrupt()
+ epochPollExecutor.shutdown()
+ })
+
+ val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+ new Iterator[UnsafeRow] {
+ private var currentRow: UnsafeRow = _
+ private var currentOffset: PartitionOffset = startOffset
+ private var currentEpoch =
+ context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def hasNext(): Boolean = {
+ if (dataReaderFailed.get()) {
+ throw new SparkException("data read failed", dataReaderThread.failureReason)
+ }
+ if (epochPollFailed.get()) {
+ throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
+ }
+
+ queue.take() match {
+ // epoch boundary marker
+ case (null, null) =>
+ epochEndpoint.send(ReportPartitionOffset(
+ context.partitionId(),
+ currentEpoch,
+ currentOffset))
+ currentEpoch += 1
+ false
+ // real row
+ case (row, offset) =>
+ currentRow = row
+ currentOffset = offset
+ true
+ }
+ }
+
+ override def next(): UnsafeRow = {
--- End diff --
NextIterator won't quite work, because we need to be able to start going again after the iterator is "finished". I'll clean it up a bit to comply with the contract.
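For readers following the thread, a toy model (names hypothetical, not the PR's code) of the constraint: `hasNext` returns false at each epoch boundary, yet the same iterator must later resume producing rows, which `NextIterator`'s terminal `finished` flag cannot express.
```scala
import java.util.concurrent.BlockingQueue

// Toy "restartable" iterator: None marks an epoch boundary. Iteration looks
// finished, but a later hasNext() call simply blocks for the next epoch's rows.
class EpochIterator[T](queue: BlockingQueue[Option[T]]) extends Iterator[T] {
  private var current: Option[T] = None

  override def hasNext: Boolean = queue.take() match {
    case None => false                  // epoch boundary: "done" for now
    case some => current = some; true
  }

  override def next(): T = {
    val t = current.get
    current = None
    t
  }
}
```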
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85089 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85089/testReport)** for PR 19984 at commit [`359ebdd`](https://github.com/apache/spark/commit/359ebdd8bdac0b93aa6b88beab0212393f1e2577).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85092 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85092/testReport)** for PR 19984 at commit [`19f08a9`](https://github.com/apache/spark/commit/19f08a9c875c6e52bb75c82d196fb3a310311ffe).
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158135301
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ sqlContext: SQLContext,
+ @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+ private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+ override protected def getPartitions: Array[Partition] = {
+ readTasks.asScala.zipWithIndex.map {
+ case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+ }.toArray
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+ val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+ // This queue contains two types of messages:
+ // * (null, null) representing an epoch boundary.
+ // * (row, off) containing a data row and its corresponding PartitionOffset.
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ s"epoch-poll--${runId}--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+ epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ // Important sequencing - we must get start offset before the data reader thread begins
+ val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+ val dataReaderFailed = new AtomicBoolean(false)
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
+ reader.close()
+ dataReaderThread.interrupt()
+ epochPollExecutor.shutdown()
+ })
+
+ val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+ new Iterator[UnsafeRow] {
+ private var currentRow: UnsafeRow = _
+ private var currentOffset: PartitionOffset = startOffset
+ private var currentEpoch =
+ context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def hasNext(): Boolean = {
+ if (dataReaderFailed.get()) {
+ throw new SparkException("data read failed", dataReaderThread.failureReason)
+ }
+ if (epochPollFailed.get()) {
+ throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
+ }
+
+ queue.take() match {
+ // epoch boundary marker
+ case (null, null) =>
+ epochEndpoint.send(ReportPartitionOffset(
+ context.partitionId(),
+ currentEpoch,
+ currentOffset))
+ currentEpoch += 1
+ false
+ // real row
+ case (row, offset) =>
+ currentRow = row
+ currentOffset = offset
+ true
+ }
+ }
+
+ override def next(): UnsafeRow = {
+ val r = currentRow
+ currentRow = null
+ r
+ }
+ }
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
+ }
+}
+
+case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
+
+class EpochPollRunnable(
+ queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+ context: TaskContext,
+ failedFlag: AtomicBoolean)
+ extends Thread with Logging {
+ private[continuous] var failureReason: Throwable = _
+
+ private val epochEndpoint = EpochCoordinatorRef.get(
+ context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
+ private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def run(): Unit = {
+ try {
+ val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
+ for (i <- currentEpoch to newEpoch - 1) {
+ queue.put((null, null))
+ logDebug(s"Sent marker to start epoch ${i + 1}")
+ }
+ currentEpoch = newEpoch
+ } catch {
+ case t: Throwable =>
+ failedFlag.set(true)
+ failureReason = t
+ throw t
+ }
+ }
+}
+
+class DataReaderThread(
+ reader: DataReader[UnsafeRow],
+ queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+ context: TaskContext,
+ failedFlag: AtomicBoolean) extends Thread {
+ private[continuous] var failureReason: Throwable = _
+
+ override def run(): Unit = {
+ val baseReader = ContinuousDataSourceRDD.getBaseReader(reader)
+ try {
+ while (!context.isInterrupted && !context.isCompleted()) {
+ if (!reader.next()) {
+ // Check again, since reader.next() might have blocked through an incoming interrupt.
+ if (!context.isInterrupted && !context.isCompleted()) {
+ throw new IllegalStateException(
+ "Continuous reader reported no elements! Reader should have blocked waiting.")
+ } else {
+ return
+ }
+ }
+
+ queue.put((reader.get().copy(), baseReader.getOffset))
+ }
+ } catch {
+ case _: InterruptedException if context.isInterrupted() =>
+ // Continuous shutdown always involves an interrupt; shut down quietly.
+ return
+
+ case t: Throwable =>
+ failedFlag.set(true)
+ failureReason = t
--- End diff --
ditto
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test PASSed.
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r157124394
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,22 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
+ buildConf("spark.sql.streaming.continuous.executorQueueSize")
+ .internal()
+ .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
+ " buffer the results of a ContinuousDataReader.")
+ .intConf
--- End diff --
`longConf`?
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84981 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84981/testReport)** for PR 19984 at commit [`f50488c`](https://github.com/apache/spark/commit/f50488cf94ab015019e99d187b54ab922e4ca6c2).
* This patch **fails SparkR unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84975 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84975/testReport)** for PR 19984 at commit [`63f78d2`](https://github.com/apache/spark/commit/63f78d266f3fd4ac5fc2fec53c04c0029d1a5e68).
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158399908
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: InterruptedException if state.get().equals(RECONFIGURING) =>
+ // swallow exception and run again
+ state.set(ACTIVE)
+ }
+ } while (state.get() == ACTIVE)
+ }
+
+ /**
+ * Populate the start offsets to start the execution at the current offsets stored in the sink
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * The basic structure of this method is as follows:
+ *
+ * Identify (from the commit log) the latest epoch that has committed
+ * IF last epoch exists THEN
+ * Get end offsets for the epoch
+ * Set those offsets as the current commit progress
+ * Set the next epoch ID as the last + 1
+ * Return the end offsets of the last epoch as start for the next one
+ * DONE
+ * ELSE
+ * Start a new query log
+ * DONE
+ */
+ private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
+ // Note that this will need a slight modification for exactly once. If ending offsets were
+ // reported but not committed for any epochs, we must replay exactly to those offsets.
+ // For at least once, we can just ignore those reports and risk duplicates.
+ commitLog.getLatest() match {
+ case Some((latestEpochId, _)) =>
+ val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
+ throw new IllegalStateException(
+ s"Batch $latestEpochId was committed without end epoch offsets!")
+ }
+ committedOffsets = nextOffsets.toStreamProgress(sources)
+
+ // Forcibly align commit and offset logs by slicing off any spurious offset logs from
+ // a previous run. We can't allow commits to an epoch that a previous run reached but
+ // this run has not.
+ offsetLog.purgeAfter(latestEpochId)
+
+ currentBatchId = latestEpochId + 1
+ logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
+ nextOffsets
+ case None =>
+ // We are starting this stream for the first time. Offsets are all None.
+ logInfo(s"Starting new streaming query.")
+ currentBatchId = 0
+ OffsetSeq.fill(continuousSources.map(_ => null): _*)
+ }
+ }
+
+ /**
+ * Do a continuous run.
+ * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
+ */
+ private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
+ // A list of attributes that will need to be updated.
+ val replacements = new ArrayBuffer[(Attribute, Attribute)]
+ // Translate from continuous relation to the underlying data source.
+ var nextSourceId = 0
+ continuousSources = logicalPlan.collect {
+ case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ nextSourceId += 1
+
+ dataSource.createContinuousReader(
+ java.util.Optional.empty[StructType](),
+ metadataPath,
+ new DataSourceV2Options(extraReaderOptions.asJava))
+ }
+ uniqueSources = continuousSources.distinct
+
+ val offsets = getStartOffsets(sparkSessionForQuery)
+
+ var insertedSourceId = 0
+ val withNewSources = logicalPlan transform {
+ case ContinuousExecutionRelation(_, _, output) =>
+ val reader = continuousSources(insertedSourceId)
+ insertedSourceId += 1
+ val newOutput = reader.readSchema().toAttributes
+
+ assert(output.size == newOutput.size,
+ s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
+ s"${Utils.truncatedString(newOutput, ",")}")
+ replacements ++= output.zip(newOutput)
+
+ val loggedOffset = offsets.offsets(0)
+ val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
+ reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+ DataSourceV2Relation(newOutput, reader)
+ }
+
+ // Rewire the plan to use the new attributes that were returned by the source.
+ val replacementMap = AttributeMap(replacements)
+ val triggerLogicalPlan = withNewSources transformAllExpressions {
+ case a: Attribute if replacementMap.contains(a) =>
+ replacementMap(a).withMetadata(a.metadata)
+ case (_: CurrentTimestamp | _: CurrentDate) =>
+ throw new IllegalStateException(
+ "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+ }
+
+ val writer = sink.createContinuousWriter(
+ s"$runId",
+ triggerLogicalPlan.schema,
+ outputMode,
+ new DataSourceV2Options(extraOptions.asJava))
+ val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
+
+ val reader = withSink.collect {
+ case DataSourceV2Relation(_, r: ContinuousReader) => r
+ }.head
+
+ reportTimeTaken("queryPlanning") {
+ lastExecution = new IncrementalExecution(
+ sparkSessionForQuery,
+ withSink,
+ outputMode,
+ checkpointFile("state"),
+ runId,
+ currentBatchId,
+ offsetSeqMetadata)
+ lastExecution.executedPlan // Force the lazy generation of execution plan
+ }
+
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.RUN_ID_KEY, runId.toString)
+
+ // Use the parent Spark session for the endpoint since it's where this query ID is registered.
+ val epochEndpoint =
+ EpochCoordinatorRef.create(
+ writer.get(), reader, currentBatchId,
+ id.toString, runId.toString, sparkSession, SparkEnv.get)
+ val epochUpdateThread = new Thread(new Runnable {
+ override def run: Unit = {
+ try {
+ triggerExecutor.execute(() => {
+ startTrigger()
+
+ if (reader.needsReconfiguration()) {
+ stopSources()
+ state.set(RECONFIGURING)
--- End diff --
Stopping a source may cause an exception, e.g., it closes a socket while queryExecutionThread is reading from it.
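A hedged sketch of one way to tolerate that race, extending the `RECONFIGURING` handling already present in `runActivatedStream`; the `IOException` case is an assumption, not the PR's code:
```scala
try {
  runContinuous(sparkSessionForStream)
} catch {
  case _: InterruptedException | _: java.io.IOException
      if state.get() == RECONFIGURING =>
    // The source was stopped on purpose; swallow and loop into the next run.
    state.set(ACTIVE)
}
```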
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158397203
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
+
+import scala.reflect.ClassTag
+import scala.util.control.ControlThrowable
+
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.StreamSourceProvider
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.test.TestSparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+class ContinuousSuiteBase extends StreamTest {
+ // We need more than the default local[2] to be able to schedule all partitions simultaneously.
+ override protected def createSparkSession = new TestSparkSession(
+ new SparkContext(
+ "local[10]",
+ "continuous-stream-test-sql-context",
+ sparkConf.set("spark.sql.testkey", "true")))
+
+ protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
+ query match {
+ case s: ContinuousExecution =>
+ assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
+ val reader = s.lastExecution.executedPlan.collectFirst {
+ case DataSourceV2ScanExec(_, r: ContinuousRateStreamReader) => r
+ }.get
+
+ val deltaMs = numTriggers * 1000 + 300
+ while (System.currentTimeMillis < reader.creationTime + deltaMs) {
+ Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
+ }
+ }
+ }
+
+ // A continuous trigger that will only fire the initial time for the duration of a test.
+ // This allows clean testing with manual epoch advancement.
+ protected val longContinuousTrigger = Trigger.Continuous("1 hour")
+}
+
+class ContinuousSuite extends ContinuousSuiteBase {
+ import testImplicits._
+
+ test("basic rate source") {
+ val df = spark.readStream
+ .format("rate")
+ .option("numPartitions", "5")
+ .option("rowsPerSecond", "5")
+ .load()
+ .select('value)
+
+ testStream(df, useV2Sink = true)(
+ StartStream(longContinuousTrigger),
+ AwaitEpoch(0),
+ Execute(waitForRateSourceTriggers(_, 2)),
+ IncrementEpoch(),
+ CheckAnswer(scala.Range(0, 10): _*),
--- End diff --
let's not do an exact match check here.
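A sketch of the intent (the `sink` accessor is an assumption about what's reachable from the test): with the rate source still producing, the sink may hold more than rows 0..9, so a containment check is safer than an exact match.
```scala
// Assert the first ten rate-source rows arrived without requiring that
// nothing else has been committed yet.
val committed = sink.allData.map(_.getLong(0)).toSet
assert(scala.Range(0, 10).forall(committed.contains))
```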
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84975/
Test FAILed.
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158122336
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
--- End diff --
I would use `ThreadUtils.newDaemonSingleThreadScheduledExecutor` rather than `ProcessingTimeExecutor`. `ProcessingTimeExecutor` is designed for `ProcessingTimeTrigger`, so it's odd to use it here; in addition, we may make changes to `ProcessingTimeExecutor` in the future that could break continuous execution.
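A minimal sketch of the suggested alternative (the interval variable and the names in scope are assumptions): drive epoch advancement from a dedicated scheduled executor instead of reusing `ProcessingTimeExecutor`.
```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

val epochTimer = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
  s"continuous-epoch-trigger--$runId")
epochTimer.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // Ask the coordinator to advance the epoch, as the trigger body does today.
    epochEndpoint.askSync[Long](IncrementAndGetEpoch())
  }
}, 0, intervalMs, TimeUnit.MILLISECONDS)
```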
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158116796
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
+import org.apache.spark.util.RpcUtils
+
+private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
+
+// Driver epoch trigger message
+/**
+ * Atomically increment the current epoch and get the new value.
+ */
+private[sql] case class IncrementAndGetEpoch() extends EpochCoordinatorMessage
+
+// Init messages
+/**
+ * Set the reader and writer partition counts. Tasks may not be started until the coordinator
+ * has acknowledged these messages.
+ */
+private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
+case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage
+
+// Partition task messages
+/**
+ * Get the current epoch.
+ */
+private[sql] case class GetCurrentEpoch() extends EpochCoordinatorMessage
--- End diff --
nit: `case class` -> `case object`
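The nit applied, as a sketch: a parameterless message carries no per-instance state, so an object avoids allocating a fresh instance on every ask.
```scala
private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage
```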
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158135254
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ sqlContext: SQLContext,
+ @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+ private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+ override protected def getPartitions: Array[Partition] = {
+ readTasks.asScala.zipWithIndex.map {
+ case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+ }.toArray
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+ val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+ // This queue contains two types of messages:
+ // * (null, null) representing an epoch boundary.
+ // * (row, off) containing a data row and its corresponding PartitionOffset.
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ s"epoch-poll--${runId}--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+ epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ // Important sequencing - we must get start offset before the data reader thread begins
+ val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+ val dataReaderFailed = new AtomicBoolean(false)
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
+ reader.close()
+ dataReaderThread.interrupt()
+ epochPollExecutor.shutdown()
+ })
+
+ val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+ new Iterator[UnsafeRow] {
+ private var currentRow: UnsafeRow = _
+ private var currentOffset: PartitionOffset = startOffset
+ private var currentEpoch =
+ context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def hasNext(): Boolean = {
+ if (dataReaderFailed.get()) {
+ throw new SparkException("data read failed", dataReaderThread.failureReason)
+ }
+ if (epochPollFailed.get()) {
+ throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
+ }
+
+ queue.take() match {
+ // epoch boundary marker
+ case (null, null) =>
+ epochEndpoint.send(ReportPartitionOffset(
+ context.partitionId(),
+ currentEpoch,
+ currentOffset))
+ currentEpoch += 1
+ false
+ // real row
+ case (row, offset) =>
+ currentRow = row
+ currentOffset = offset
+ true
+ }
+ }
+
+ override def next(): UnsafeRow = {
+ val r = currentRow
+ currentRow = null
+ r
+ }
+ }
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
+ }
+}
+
+case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
+
+class EpochPollRunnable(
+ queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+ context: TaskContext,
+ failedFlag: AtomicBoolean)
+ extends Thread with Logging {
+ private[continuous] var failureReason: Throwable = _
+
+ private val epochEndpoint = EpochCoordinatorRef.get(
+ context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
+ private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def run(): Unit = {
+ try {
+ val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
+ for (i <- currentEpoch to newEpoch - 1) {
+ queue.put((null, null))
+ logDebug(s"Sent marker to start epoch ${i + 1}")
+ }
+ currentEpoch = newEpoch
+ } catch {
+ case t: Throwable =>
+ failedFlag.set(true)
+ failureReason = t
--- End diff --
switch the order so that the other thread can definitely see `failureReason` if `failedFlag` is `true`.
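The ordering being requested, sketched (`pollOnce` is a hypothetical stand-in for the body above): write `failureReason` first, then publish via the `AtomicBoolean`. `AtomicBoolean.set` is a volatile write, so a reader that observes `failedFlag == true` is guaranteed to also see the cause.
```scala
try {
  pollOnce()
} catch {
  case t: Throwable =>
    failureReason = t      // write the cause first
    failedFlag.set(true)   // volatile publish; happens-before the reader's check
    throw t
}
```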
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158137370
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ sqlContext: SQLContext,
+ @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+ private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+ override protected def getPartitions: Array[Partition] = {
+ readTasks.asScala.zipWithIndex.map {
+ case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+ }.toArray
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+ val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+ // This queue contains two types of messages:
+ // * (null, null) representing an epoch boundary.
+ // * (row, off) containing a data row and its corresponding PartitionOffset.
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ s"epoch-poll--${runId}--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+ epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ // Important sequencing - we must get start offset before the data reader thread begins
+ val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+ val dataReaderFailed = new AtomicBoolean(false)
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
+ reader.close()
+ dataReaderThread.interrupt()
+ epochPollExecutor.shutdown()
+ })
+
+ val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+ new Iterator[UnsafeRow] {
+ private var currentRow: UnsafeRow = _
+ private var currentOffset: PartitionOffset = startOffset
+ private var currentEpoch =
+ context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def hasNext(): Boolean = {
+ if (dataReaderFailed.get()) {
+ throw new SparkException("data read failed", dataReaderThread.failureReason)
+ }
+ if (epochPollFailed.get()) {
+ throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
+ }
+
+ queue.take() match {
+ // epoch boundary marker
+ case (null, null) =>
+ epochEndpoint.send(ReportPartitionOffset(
+ context.partitionId(),
+ currentEpoch,
+ currentOffset))
+ currentEpoch += 1
+ false
+ // real row
+ case (row, offset) =>
+ currentRow = row
+ currentOffset = offset
+ true
+ }
+ }
+
+ override def next(): UnsafeRow = {
+ val r = currentRow
+ currentRow = null
+ r
+ }
+ }
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
+ }
+}
+
+case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
+
+class EpochPollRunnable(
+ queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+ context: TaskContext,
+ failedFlag: AtomicBoolean)
+ extends Thread with Logging {
+ private[continuous] var failureReason: Throwable = _
+
+ private val epochEndpoint = EpochCoordinatorRef.get(
+ context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
+ private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def run(): Unit = {
+ try {
+ val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
+ for (i <- currentEpoch to newEpoch - 1) {
+ queue.put((null, null))
+ logDebug(s"Sent marker to start epoch ${i + 1}")
+ }
+ currentEpoch = newEpoch
+ } catch {
+ case t: Throwable =>
+ failedFlag.set(true)
+ failureReason = t
+ throw t
+ }
+ }
+}
+
+class DataReaderThread(
+ reader: DataReader[UnsafeRow],
+ queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+ context: TaskContext,
+ failedFlag: AtomicBoolean) extends Thread {
--- End diff --
Set a descriptive thread name for this thread.
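For illustration, one way to follow this is to set the name where the thread is constructed in `compute()`, mirroring the `epoch-poll--$runId--$partitionId` scheme already used for the scheduled executor (the exact name string here is an assumption):

    val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    // Name the thread so it is identifiable in thread dumps and executor logs.
    dataReaderThread.setName(s"continuous-data-reader--${runId}--${context.partitionId()}")
    dataReaderThread.setDaemon(true)
    dataReaderThread.start()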
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85287 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85287/testReport)** for PR 19984 at commit [`07a9e06`](https://github.com/apache/spark/commit/07a9e0654df61ad52f7db28ee663a380dee3c2a8).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85332/
Test PASSed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85218 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85218/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84981/
Test FAILed.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r157124515
--- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
public static Trigger Once() {
return OneTimeTrigger$.MODULE$;
}
+
+ /**
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * @since 2.3.0
+ */
+ public static Trigger Continuous(long intervalMs) {
+ return ContinuousTrigger.apply(intervalMs);
+ }
+
+ /**
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * {{{
+ * import java.util.concurrent.TimeUnit
+ * df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.3.0
+ */
+ public static Trigger Continuous(long interval, TimeUnit timeUnit) {
+ return ContinuousTrigger.create(interval, timeUnit);
+ }
+
+ /**
+ * (Scala-friendly)
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ * df.writeStream.trigger(Trigger.Continuous(10.seconds))
+ * }}}
+ * @since 2.2.0
+ */
+ public static Trigger Continuous(Duration interval) {
+ return ContinuousTrigger.apply(interval);
+ }
+
+ /**
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * {{{
+ * df.writeStream.trigger(Trigger.Continuous("10 seconds"))
+ * }}}
+ * @since 2.2.0
--- End diff --
2.3.0?
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r165055237
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala ---
@@ -75,6 +76,52 @@ case class StreamingExecutionRelation(
)
}
+// We have to pack in the V1 data source as a shim, for the case when a source implements
+// continuous processing (which is always V2) but only has V1 microbatch support. We don't
+// know at read time whether the query is continuous or not, so we need to be able to
--- End diff --
can't we know it from the specified trigger?
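For reference, the suggestion would amount to a dispatch on the trigger once it is known at start time, roughly as sketched below; the difficulty raised in the quoted comment is that the relation is built at read time, before `.trigger(...)` has been called. Names here are illustrative only.

    // Illustrative only: if the trigger were known when the plan is built,
    // the V1 shim could be chosen (or skipped) up front.
    def chooseRelation(trigger: Trigger): Unit = trigger match {
      case _: ContinuousTrigger => // use the V2 continuous reader directly
      case _                    => // fall back to the packed-in V1 microbatch source
    }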
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/19984
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158138974
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
[... license header and diff context identical to the quotation of this file above; elided ...]
+class DataReaderThread(
+ reader: DataReader[UnsafeRow],
+ queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+ context: TaskContext,
+ failedFlag: AtomicBoolean) extends Thread {
+ private[continuous] var failureReason: Throwable = _
+
+ override def run(): Unit = {
+ val baseReader = ContinuousDataSourceRDD.getBaseReader(reader)
+ try {
+ while (!context.isInterrupted && !context.isCompleted()) {
+ if (!reader.next()) {
+ // Check again, since reader.next() might have blocked through an incoming interrupt.
+ if (!context.isInterrupted && !context.isCompleted()) {
+ throw new IllegalStateException(
+ "Continuous reader reported no elements! Reader should have blocked waiting.")
+ } else {
+ return
+ }
+ }
+
+ queue.put((reader.get().copy(), baseReader.getOffset))
+ }
+ } catch {
+ case _: InterruptedException if context.isInterrupted() =>
+ // Continuous shutdown always involves an interrupt; shut down quietly.
+ return
+
+ case t: Throwable =>
+ failedFlag.set(true)
+ failureReason = t
+ throw t
--- End diff --
We cannot throw `t` here, as it would kill the executor; see `org.apache.spark.util.SparkUncaughtExceptionHandler`.
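A sketch of the pattern this comment implies, with hypothetical names: record the failure in shared state and let `run()` return, so nothing escapes to the uncaught-exception handler; the task thread then rethrows on its side, as `hasNext()` already does in this diff.

    import java.util.concurrent.atomic.AtomicBoolean

    class QuietWorker(failedFlag: AtomicBoolean) extends Thread {
      @volatile var failureReason: Throwable = _

      override def run(): Unit = {
        try {
          doWork()
        } catch {
          case t: Throwable =>
            failureReason = t     // record first, then publish via the flag
            failedFlag.set(true)
            // Deliberately no rethrow: an exception escaping a non-task thread
            // reaches SparkUncaughtExceptionHandler and kills the whole executor.
        }
      }

      private def doWork(): Unit = () // stand-in for the real blocking read loop
    }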
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158121081
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
[... standard Apache license header, identical to the one quoted above; elided ...]
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --
`_: Throwable` => `NonFatal(e)`
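For reference, the suggested pattern: `scala.util.control.NonFatal` matches ordinary exceptions but lets fatal JVM errors (`OutOfMemoryError`, `LinkageError`, ...) propagate instead of being swallowed by the retry loop. A sketch reusing the quoted names:

    import scala.util.control.NonFatal

    do {
      try {
        runContinuous(sparkSessionForStream)
      } catch {
        // A fatal error, or any failure outside reconfiguration, still propagates.
        case NonFatal(e) if state.get().equals(RECONFIGURING) =>
          // swallow the reconfiguration interrupt and loop to run again
      }
    } while (state.get() == ACTIVE)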
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158119082
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
@@ -266,6 +266,21 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}
+ /**
+ * Removes all log entries later than thresholdBatchId (exclusive).
+ */
+ def purgeAfter(thresholdBatchId: Long): Unit = {
+ val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+ .map(f => pathToBatchId(f.getPath))
+
+ for (batchId <- batchIds if batchId > thresholdBatchId) {
+ print(s"AAAAA purging\n")
--- End diff --
nit: remove this
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158117033
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -303,7 +299,7 @@ abstract class StreamExecution(
e,
committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
- logError(s"Query $prettyIdString terminated with error", e)
+ // logError(s"Query $prettyIdString terminated with error", e)
--- End diff --
nit: restore `logError`
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85287/
Test PASSed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Build finished. Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on the issue:
https://github.com/apache/spark/pull/19984
retest this please
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84976/
Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85218/
Test FAILed.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158391581
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
[... standard Apache license header, identical to the one quoted above; elided ...]
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: InterruptedException if state.get().equals(RECONFIGURING) =>
+ // swallow exception and run again
+ state.set(ACTIVE)
+ }
+ } while (state.get() == ACTIVE)
+ }
+
+ /**
+ * Populate the start offsets to start the execution at the current offsets stored in the sink
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * The basic structure of this method is as follows:
+ *
+ * Identify (from the commit log) the latest epoch that has committed
+ * IF last epoch exists THEN
+ * Get end offsets for the epoch
+ * Set those offsets as the current commit progress
+ * Set the next epoch ID as the last + 1
+ * Return the end offsets of the last epoch as start for the next one
+ * DONE
+ * ELSE
+ * Start a new query log
+ * DONE
+ */
+ private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
+ // Note that this will need a slight modification for exactly once. If ending offsets were
+ // reported but not committed for any epochs, we must replay exactly to those offsets.
+ // For at least once, we can just ignore those reports and risk duplicates.
+ commitLog.getLatest() match {
+ case Some((latestEpochId, _)) =>
+ val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
+ throw new IllegalStateException(
+ s"Batch $latestEpochId was committed without end epoch offsets!")
+ }
+ committedOffsets = nextOffsets.toStreamProgress(sources)
+
+ // Forcibly align commit and offset logs by slicing off any spurious offset logs from
+ // a previous run. We can't allow commits to an epoch that a previous run reached but
+ // this run has not.
+ offsetLog.purgeAfter(latestEpochId)
+
+ currentBatchId = latestEpochId + 1
+ logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
+ nextOffsets
+ case None =>
+ // We are starting this stream for the first time. Offsets are all None.
+ logInfo(s"Starting new streaming query.")
+ currentBatchId = 0
+ OffsetSeq.fill(continuousSources.map(_ => null): _*)
+ }
+ }
+
+ /**
+ * Do a continuous run.
+ * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
+ */
+ private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
+ // A list of attributes that will need to be updated.
+ val replacements = new ArrayBuffer[(Attribute, Attribute)]
+ // Translate from continuous relation to the underlying data source.
+ var nextSourceId = 0
+ continuousSources = logicalPlan.collect {
+ case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ nextSourceId += 1
+
+ dataSource.createContinuousReader(
+ java.util.Optional.empty[StructType](),
+ metadataPath,
+ new DataSourceV2Options(extraReaderOptions.asJava))
+ }
+ uniqueSources = continuousSources.distinct
+
+ val offsets = getStartOffsets(sparkSessionForQuery)
+
+ var insertedSourceId = 0
+ val withNewSources = logicalPlan transform {
+ case ContinuousExecutionRelation(_, _, output) =>
+ val reader = continuousSources(insertedSourceId)
+ insertedSourceId += 1
+ val newOutput = reader.readSchema().toAttributes
+
+ assert(output.size == newOutput.size,
+ s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
+ s"${Utils.truncatedString(newOutput, ",")}")
+ replacements ++= output.zip(newOutput)
+
+ val loggedOffset = offsets.offsets(0)
+ val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
+ reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+ DataSourceV2Relation(newOutput, reader)
+ }
+
+ // Rewire the plan to use the new attributes that were returned by the source.
+ val replacementMap = AttributeMap(replacements)
+ val triggerLogicalPlan = withNewSources transformAllExpressions {
+ case a: Attribute if replacementMap.contains(a) =>
+ replacementMap(a).withMetadata(a.metadata)
+ case (_: CurrentTimestamp | _: CurrentDate) =>
+ throw new IllegalStateException(
+ "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+ }
+
+ val writer = sink.createContinuousWriter(
+ s"$runId",
+ triggerLogicalPlan.schema,
+ outputMode,
+ new DataSourceV2Options(extraOptions.asJava))
+ val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
+
+ val reader = withSink.collect {
+ case DataSourceV2Relation(_, r: ContinuousReader) => r
+ }.head
+
+ reportTimeTaken("queryPlanning") {
+ lastExecution = new IncrementalExecution(
+ sparkSessionForQuery,
+ withSink,
+ outputMode,
+ checkpointFile("state"),
+ runId,
+ currentBatchId,
+ offsetSeqMetadata)
+ lastExecution.executedPlan // Force the lazy generation of execution plan
+ }
+
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.RUN_ID_KEY, runId.toString)
+
+ // Use the parent Spark session for the endpoint since it's where this query ID is registered.
+ val epochEndpoint =
+ EpochCoordinatorRef.create(
+ writer.get(), reader, currentBatchId,
+ id.toString, runId.toString, sparkSession, SparkEnv.get)
+ val epochUpdateThread = new Thread(new Runnable {
+ override def run: Unit = {
+ try {
+ triggerExecutor.execute(() => {
+ startTrigger()
+
+ if (reader.needsReconfiguration()) {
+ stopSources()
+ state.set(RECONFIGURING)
+ if (queryExecutionThread.isAlive) {
+ sparkSession.sparkContext.cancelJobGroup(runId.toString)
+ queryExecutionThread.interrupt()
+ // No need to join - this thread is about to end anyway.
+ }
+ false
+ } else if (isActive) {
+ currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
+ logInfo(s"New epoch $currentBatchId is starting.")
+ true
+ } else {
+ false
+ }
+ })
+ } catch {
+ case _: InterruptedException =>
+ // Cleanly stop the query.
+ return
+ }
+ }
+ })
+
+ try {
+ epochUpdateThread.setDaemon(true)
+ epochUpdateThread.start()
+
+ reportTimeTaken("runContinuous") {
+ SQLExecution.withNewExecutionId(
+ sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
+ }
+ } finally {
+ SparkEnv.get.rpcEnv.stop(epochEndpoint)
+
+ epochUpdateThread.interrupt()
+ epochUpdateThread.join()
+ }
+ }
+
+ /**
+ * Report ending partition offsets for the given reader at the given epoch.
+ */
+ def addOffset(
+ epoch: Long, reader: ContinuousReader, partitionOffsets: Seq[PartitionOffset]): Unit = {
+ assert(continuousSources.length == 1, "only one continuous source supported currently")
+
+ if (partitionOffsets.contains(null)) {
+ // If any offset is null, that means the corresponding partition hasn't seen any data yet, so
+ // there's nothing meaningful to add to the offset log.
+ }
+ val globalOffset = reader.mergeOffsets(partitionOffsets.toArray)
+ synchronized {
+ offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
+ }
+ }
+
+ /**
+ * Mark the specified epoch as committed. All readers must have reported end offsets for the epoch
+ * before this is called.
+ */
+ def commit(epoch: Long): Unit = {
+ assert(continuousSources.length == 1, "only one continuous source supported currently")
+ assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
+ synchronized {
+ commitLog.add(epoch)
--- End diff --
Since this is called in the RPC thread, we should also check whether this query is still alive here.
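A sketch of that guard, reusing names from the quoted diff; whether `queryExecutionThread.isAlive` or a state check is the right liveness test is an assumption, not the PR's final code.

    def commit(epoch: Long): Unit = synchronized {
      // This runs on the RPC thread, so the query may already have stopped or
      // failed by the time the message is processed; skip the commit if so.
      if (queryExecutionThread.isAlive) {
        commitLog.add(epoch)
        // ... advance committedOffsets as in the quoted code ...
      }
    }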
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158547367
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -0,0 +1,195 @@
[... standard Apache license header, identical to the one quoted above; elided ...]
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
+import org.apache.spark.util.RpcUtils
+
+private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
+
+// Driver epoch trigger message
+/**
+ * Atomically increment the current epoch and get the new value.
+ */
+private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
+
+// Init messages
+/**
+ * Set the reader and writer partition counts. Tasks may not be started until the coordinator
+ * has acknowledged these messages.
+ */
+private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage
+case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage
+
+// Partition task messages
+/**
+ * Get the current epoch.
+ */
+private[sql] case object GetCurrentEpoch extends EpochCoordinatorMessage
+/**
+ * Commit a partition at the specified epoch with the given message.
+ */
+private[sql] case class CommitPartitionEpoch(
+ partitionId: Int,
+ epoch: Long,
+ message: WriterCommitMessage) extends EpochCoordinatorMessage
+/**
+ * Report that a partition is ending the specified epoch at the specified offset.
+ */
+private[sql] case class ReportPartitionOffset(
+ partitionId: Int,
+ epoch: Long,
+ offset: PartitionOffset) extends EpochCoordinatorMessage
+
+
+/** Helper object used to create reference to [[EpochCoordinator]]. */
+private[sql] object EpochCoordinatorRef extends Logging {
+ private def endpointName(runId: String) = s"EpochCoordinator-$runId"
+
+ /**
+ * Create a reference to a new [[EpochCoordinator]].
+ */
+ def create(
+ writer: ContinuousWriter,
+ reader: ContinuousReader,
+ startEpoch: Long,
+ queryId: String,
+ runId: String,
+ session: SparkSession,
+ env: SparkEnv): RpcEndpointRef = synchronized {
+ val coordinator = new EpochCoordinator(writer, reader, startEpoch, queryId, session, env.rpcEnv)
+ val ref = env.rpcEnv.setupEndpoint(endpointName(runId), coordinator)
+ logInfo("Registered EpochCoordinator endpoint")
+ ref
+ }
+
+ def get(runId: String, env: SparkEnv): RpcEndpointRef = synchronized {
+ val rpcEndpointRef = RpcUtils.makeDriverRef(endpointName(runId), env.conf, env.rpcEnv)
+ logDebug("Retrieved existing EpochCoordinator endpoint")
+ rpcEndpointRef
+ }
+}
+
+/**
+ * Handles three major epoch coordination tasks for continuous processing:
+ *
+ * * Maintains a local epoch counter (the "driver epoch"), incremented by IncrementAndGetEpoch
+ * and pollable from executors by GetCurrentEpoch. Note that this epoch is *not* immediately
+ * reflected anywhere in ContinuousExecution.
+ * * Collates ReportPartitionOffset messages, and forwards to ContinuousExecution when all
+ * readers have ended a given epoch.
+ * * Collates CommitPartitionEpoch messages, and forwards to ContinuousExecution when all readers
+ * have both committed and reported an end offset for a given epoch.
+ */
+private[continuous] class EpochCoordinator(
+ writer: ContinuousWriter,
+ reader: ContinuousReader,
+ startEpoch: Long,
+ queryId: String,
+ session: SparkSession,
+ override val rpcEnv: RpcEnv)
+ extends ThreadSafeRpcEndpoint with Logging {
+
+ private var numReaderPartitions: Int = _
+ private var numWriterPartitions: Int = _
+
+ private var currentDriverEpoch = startEpoch
+
+ // (epoch, partition) -> message
+ private val partitionCommits =
+ mutable.Map[(Long, Int), WriterCommitMessage]()
+ // (epoch, partition) -> offset
+ private val partitionOffsets =
+ mutable.Map[(Long, Int), PartitionOffset]()
+
+ private def resolveCommitsAtEpoch(epoch: Long) = {
+ val thisEpochCommits =
+ partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
+ val nextEpochOffsets =
+ partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
+
+ if (thisEpochCommits.size == numWriterPartitions &&
+ nextEpochOffsets.size == numReaderPartitions) {
+ logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
+ val query = session.streams.get(queryId).asInstanceOf[StreamingQueryWrapper]
--- End diff --
@jose-torres forgot to update this? Also you don't need `queryId` any more.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84981 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84981/testReport)** for PR 19984 at commit [`f50488c`](https://github.com/apache/spark/commit/f50488cf94ab015019e99d187b54ab922e4ca6c2).
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158391247
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -0,0 +1,195 @@
[... license header and diff context identical to the quotation of this file above; elided ...]
+ private def resolveCommitsAtEpoch(epoch: Long) = {
+ val thisEpochCommits =
+ partitionCommits.collect { case ((e, _), msg) if e == epoch => msg }
+ val nextEpochOffsets =
+ partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
+
+ if (thisEpochCommits.size == numWriterPartitions &&
+ nextEpochOffsets.size == numReaderPartitions) {
+ logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
+ val query = session.streams.get(queryId).asInstanceOf[StreamingQueryWrapper]
--- End diff --
Why not pass the query into `EpochCoordinator`'s constructor? Getting a query from StreamingQueryManager may have a race condition, because the query can fail before we process `CommitPartitionEpoch` messages; if so, `session.streams.get(queryId)` will return null.
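A sketch of the constructor-injection alternative described here; the names are illustrative, and the surrounding plumbing (`EpochCoordinatorRef.create`) would pass the query through instead of `queryId` and `session`.

    private[continuous] class EpochCoordinator(
        writer: ContinuousWriter,
        reader: ContinuousReader,
        query: ContinuousExecution,   // injected; replaces queryId + session lookup
        startEpoch: Long,
        override val rpcEnv: RpcEnv)
      extends ThreadSafeRpcEndpoint with Logging {

      private def resolveCommitsAtEpoch(epoch: Long): Unit = {
        // ... once all partitions have committed and reported offsets ...
        // No session.streams.get(queryId): no race with query shutdown, no null.
        query.commit(epoch)
      }
    }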
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158120930
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
[... license header and diff context identical to the quotation of this file above; elided ...]
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --
There is a race condition here: since you stop the source first, this thread may throw the exception before `state` is set to `RECONFIGURING`.
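A comment-only sketch of the interleaving being described (the method names come from the diff above; the timing is hypothetical):
```scala
// Reconfiguring thread              Query execution thread
// --------------------              -----------------------------------------
// stopSources()                     blocked inside runContinuous(...)
//                                   task fails; a Throwable propagates up here
//                                   while state is still ACTIVE, so the guard
//                                   `if state.get().equals(RECONFIGURING)`
//                                   doesn't match and the query dies
// state.set(RECONFIGURING)          (too late: the exception already escaped)
```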
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158398822
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: InterruptedException if state.get().equals(RECONFIGURING) =>
+ // swallow exception and run again
+ state.set(ACTIVE)
+ }
+ } while (state.get() == ACTIVE)
+ }
+
+ /**
+ * Populate the start offsets to start the execution at the current offsets stored in the sink
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * The basic structure of this method is as follows:
+ *
+ * Identify (from the commit log) the latest epoch that has committed
+ * IF last epoch exists THEN
+ * Get end offsets for the epoch
+ * Set those offsets as the current commit progress
+ * Set the next epoch ID as the last + 1
+ * Return the end offsets of the last epoch as start for the next one
+ * DONE
+ * ELSE
+ * Start a new query log
+ * DONE
+ */
+ private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
+ // Note that this will need a slight modification for exactly once. If ending offsets were
+ // reported but not committed for any epochs, we must replay exactly to those offsets.
+ // For at least once, we can just ignore those reports and risk duplicates.
+ commitLog.getLatest() match {
+ case Some((latestEpochId, _)) =>
+ val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
+ throw new IllegalStateException(
+ s"Batch $latestEpochId was committed without end epoch offsets!")
+ }
+ committedOffsets = nextOffsets.toStreamProgress(sources)
+
+ // Forcibly align commit and offset logs by slicing off any spurious offset logs from
+ // a previous run. We can't allow commits to an epoch that a previous run reached but
+ // this run has not.
+ offsetLog.purgeAfter(latestEpochId)
+
+ currentBatchId = latestEpochId + 1
+ logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
+ nextOffsets
+ case None =>
+ // We are starting this stream for the first time. Offsets are all None.
+ logInfo(s"Starting new streaming query.")
+ currentBatchId = 0
+ OffsetSeq.fill(continuousSources.map(_ => null): _*)
+ }
+ }
+
+ /**
+ * Do a continuous run.
+ * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
+ */
+ private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
+ // A list of attributes that will need to be updated.
+ val replacements = new ArrayBuffer[(Attribute, Attribute)]
+ // Translate from continuous relation to the underlying data source.
+ var nextSourceId = 0
+ continuousSources = logicalPlan.collect {
+ case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ nextSourceId += 1
+
+ dataSource.createContinuousReader(
+ java.util.Optional.empty[StructType](),
+ metadataPath,
+ new DataSourceV2Options(extraReaderOptions.asJava))
+ }
+ uniqueSources = continuousSources.distinct
+
+ val offsets = getStartOffsets(sparkSessionForQuery)
+
+ var insertedSourceId = 0
+ val withNewSources = logicalPlan transform {
+ case ContinuousExecutionRelation(_, _, output) =>
+ val reader = continuousSources(insertedSourceId)
+ insertedSourceId += 1
+ val newOutput = reader.readSchema().toAttributes
+
+ assert(output.size == newOutput.size,
+ s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
+ s"${Utils.truncatedString(newOutput, ",")}")
+ replacements ++= output.zip(newOutput)
+
+ val loggedOffset = offsets.offsets(0)
+ val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
+ reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+ DataSourceV2Relation(newOutput, reader)
+ }
+
+ // Rewire the plan to use the new attributes that were returned by the source.
+ val replacementMap = AttributeMap(replacements)
+ val triggerLogicalPlan = withNewSources transformAllExpressions {
+ case a: Attribute if replacementMap.contains(a) =>
+ replacementMap(a).withMetadata(a.metadata)
+ case (_: CurrentTimestamp | _: CurrentDate) =>
+ throw new IllegalStateException(
+ "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+ }
+
+ val writer = sink.createContinuousWriter(
+ s"$runId",
+ triggerLogicalPlan.schema,
+ outputMode,
+ new DataSourceV2Options(extraOptions.asJava))
+ val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
+
+ val reader = withSink.collect {
+ case DataSourceV2Relation(_, r: ContinuousReader) => r
+ }.head
+
+ reportTimeTaken("queryPlanning") {
+ lastExecution = new IncrementalExecution(
+ sparkSessionForQuery,
+ withSink,
+ outputMode,
+ checkpointFile("state"),
+ runId,
+ currentBatchId,
+ offsetSeqMetadata)
+ lastExecution.executedPlan // Force the lazy generation of execution plan
+ }
+
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.RUN_ID_KEY, runId.toString)
+
+ // Use the parent Spark session for the endpoint since it's where this query ID is registered.
+ val epochEndpoint =
+ EpochCoordinatorRef.create(
+ writer.get(), reader, currentBatchId,
+ id.toString, runId.toString, sparkSession, SparkEnv.get)
+ val epochUpdateThread = new Thread(new Runnable {
+ override def run: Unit = {
+ try {
+ triggerExecutor.execute(() => {
+ startTrigger()
+
+ if (reader.needsReconfiguration()) {
+ stopSources()
+ state.set(RECONFIGURING)
--- End diff --
I think that order is fine too, but I don't think stopSources can cause an exception in the query execution thread. `queryExecutionThread.interrupt()` is the line that transfers control flow; until that runs, the query execution thread should be sitting there waiting on the long-running Spark task.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84937/
Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85092 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85092/testReport)** for PR 19984 at commit [`19f08a9`](https://github.com/apache/spark/commit/19f08a9c875c6e52bb75c82d196fb3a310311ffe).
* This patch **fails due to an unknown error code, -9**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158134853
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ sqlContext: SQLContext,
+ @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+ private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+ override protected def getPartitions: Array[Partition] = {
+ readTasks.asScala.zipWithIndex.map {
+ case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+ }.toArray
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+ val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+ // This queue contains two types of messages:
+ // * (null, null) representing an epoch boundary.
+ // * (row, off) containing a data row and its corresponding PartitionOffset.
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ s"epoch-poll--${runId}--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+ epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ // Important sequencing - we must get start offset before the data reader thread begins
+ val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+ val dataReaderFailed = new AtomicBoolean(false)
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
+ reader.close()
+ dataReaderThread.interrupt()
+ epochPollExecutor.shutdown()
+ })
+
+ val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+ new Iterator[UnsafeRow] {
+ private var currentRow: UnsafeRow = _
+ private var currentOffset: PartitionOffset = startOffset
+ private var currentEpoch =
+ context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def hasNext(): Boolean = {
+ if (dataReaderFailed.get()) {
+ throw new SparkException("data read failed", dataReaderThread.failureReason)
+ }
+ if (epochPollFailed.get()) {
+ throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
+ }
+
+ queue.take() match {
+ // epoch boundary marker
+ case (null, null) =>
+ epochEndpoint.send(ReportPartitionOffset(
+ context.partitionId(),
+ currentEpoch,
+ currentOffset))
+ currentEpoch += 1
+ false
+ // real row
+ case (row, offset) =>
+ currentRow = row
+ currentOffset = offset
+ true
+ }
+ }
+
+ override def next(): UnsafeRow = {
--- End diff --
This method doesn't follow the Java `Iterator.next` contract:
```
NoSuchElementException if the iteration has no more elements
```
You can extend `org.apache.spark.util.NextIterator` to fix it.
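For reference, a minimal sketch of that suggestion, reusing the names from the quoted diff; this is illustrative only, not the change that actually landed (the failure-flag checks from the original `hasNext()` are omitted for brevity):
```scala
import org.apache.spark.util.NextIterator

new NextIterator[UnsafeRow] {
  private var currentOffset: PartitionOffset = startOffset
  private var currentEpoch =
    context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

  override protected def getNext(): UnsafeRow = queue.take() match {
    case (null, null) =>
      // Epoch boundary marker: report progress, then mark the iterator finished so
      // that a subsequent next() throws NoSuchElementException per the contract.
      epochEndpoint.send(ReportPartitionOffset(
        context.partitionId(), currentEpoch, currentOffset))
      currentEpoch += 1
      finished = true
      null
    case (row, offset) =>
      currentOffset = offset
      row
  }

  override protected def close(): Unit = {}
}
```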
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158156114
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --
The sequencing is:
- The pre-existing method stopSources() marks the ContinuousReader objects as stopped and cleans up any resources they may be holding. This doesn't affect query execution, and stopSources already swallows any non-fatal exception thrown by a stop() implementation.
- The reconfiguring state is set.
- The job group for this run is cancelled, fanning out to the continuous processing executors and telling them to stop.
- The query execution thread is interrupted, transferring control flow in that thread to the RECONFIGURING state check.
We can check for InterruptedException specifically instead of Throwable, though. I'll change that.
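Condensed into a sketch (the ordering is the point; `runId` and the job-group id are reused from the surrounding code, and this is a paraphrase rather than the exact PR code):
```scala
// 1. Mark the ContinuousReaders stopped and release their resources. stopSources()
//    already swallows non-fatal exceptions from stop() implementations.
stopSources()
// 2. Record the intent to reconfigure before anything can interrupt the query thread.
state.set(RECONFIGURING)
// 3. Cancel the job group, telling the continuous tasks on the executors to stop.
sparkSession.sparkContext.cancelJobGroup(runId.toString)
// 4. Interrupt the query execution thread; runActivatedStream() then sees the
//    InterruptedException with state == RECONFIGURING and loops again.
queryExecutionThread.interrupt()
```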
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85216 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85216/testReport)** for PR 19984 at commit [`c00e104`](https://github.com/apache/spark/commit/c00e104803333eea181685142aebafc8fef8f0a8).
* This patch **fails PySpark unit tests**.
* This patch **does not merge cleanly**.
* This patch adds no public classes.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158120028
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: Throwable if state.get().equals(RECONFIGURING) =>
+ // swallow exception and run again
+ state.set(ACTIVE)
+ }
+ } while (state.get() == ACTIVE)
+ }
+
+ /**
+ * Populate the start offsets to start the execution at the current offsets stored in the sink
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * The basic structure of this method is as follows:
+ *
+ * Identify (from the commit log) the latest epoch that has committed
+ * IF last epoch exists THEN
+ * Get end offsets for the epoch
+ * Set those offsets as the current commit progress
+ * Set the next epoch ID as the last + 1
+ * Return the end offsets of the last epoch as start for the next one
+ * DONE
+ * ELSE
+ * Start a new query log
+ * DONE
+ */
+ private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
+ // Note that this will need a slight modification for exactly once. If ending offsets were
+ // reported but not committed for any epochs, we must replay exactly to those offsets.
+ // For at least once, we can just ignore those reports and risk duplicates.
+ commitLog.getLatest() match {
+ case Some((latestEpochId, _)) =>
+ val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
+ throw new IllegalStateException(
+ s"Batch $latestEpochId was committed without end epoch offsets!")
+ }
+ committedOffsets = nextOffsets.toStreamProgress(sources)
+
+ // Forcibly align commit and offset logs by slicing off any spurious offset logs from
+ // a previous run. We can't allow commits to an epoch that a previous run reached but
+ // this run has not.
+ offsetLog.purgeAfter(latestEpochId)
+
+ currentBatchId = latestEpochId + 1
+ logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
+ nextOffsets
+ case None =>
+ // We are starting this stream for the first time. Offsets are all None.
+ logInfo(s"Starting new streaming query.")
+ currentBatchId = 0
+ OffsetSeq.fill(continuousSources.map(_ => null): _*)
+ }
+ }
+
+ /**
+ * Do a continuous run.
+ * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
+ */
+ private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
+ import scala.collection.JavaConverters._
--- End diff --
nit: move this to the beginning of this file
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85230 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85230/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85079 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85079/testReport)** for PR 19984 at commit [`2af9b40`](https://github.com/apache/spark/commit/2af9b40d60027397fd6e915413b88f12249245e5).
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r165117379
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala ---
@@ -75,6 +76,52 @@ case class StreamingExecutionRelation(
)
}
+// We have to pack in the V1 data source as a shim, for the case when a source implements
+// continuous processing (which is always V2) but only has V1 microbatch support. We don't
+// know at read time whether the query is continuous or not, so we need to be able to
--- End diff --
The trigger isn't specified at the point where the DataFrame is created.
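A hypothetical usage sequence makes the point (`rate` and `console` are just stand-in source and sink names):
```scala
// The relation (and any shim around a V1 source) must be built at load() time...
val df = spark.readStream.format("rate").load()

// ...but continuous vs. microbatch is only decided when the trigger is attached:
df.writeStream
  .trigger(Trigger.Continuous(1000))  // Continuous(long intervalMs), per this PR
  .format("console")
  .start()
```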
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158136868
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ sqlContext: SQLContext,
+ @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+ private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+ override protected def getPartitions: Array[Partition] = {
+ readTasks.asScala.zipWithIndex.map {
+ case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+ }.toArray
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+ val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+ // This queue contains two types of messages:
+ // * (null, null) representing an epoch boundary.
+ // * (row, off) containing a data row and its corresponding PartitionOffset.
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ s"epoch-poll--${runId}--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+ epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ // Important sequencing - we must get start offset before the data reader thread begins
+ val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+ val dataReaderFailed = new AtomicBoolean(false)
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
+ reader.close()
+ dataReaderThread.interrupt()
+ epochPollExecutor.shutdown()
+ })
+
+ val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+ new Iterator[UnsafeRow] {
+ private var currentRow: UnsafeRow = _
+ private var currentOffset: PartitionOffset = startOffset
+ private var currentEpoch =
+ context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def hasNext(): Boolean = {
+ if (dataReaderFailed.get()) {
+ throw new SparkException("data read failed", dataReaderThread.failureReason)
+ }
+ if (epochPollFailed.get()) {
+ throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
+ }
+
+ queue.take() match {
--- End diff --
What if `dataReaderFailed` is set while the thread is blocking here? It seems the task will block forever.
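One way out, sketched under the assumption that a bounded poll is acceptable; `pollTimeoutMs` is an invented knob here, not a real config:
```scala
// Re-check the failure flags periodically rather than blocking indefinitely in take().
def takeEntry(): (UnsafeRow, PartitionOffset) = {
  var entry: (UnsafeRow, PartitionOffset) = null
  while (entry == null) {
    if (dataReaderFailed.get()) {
      throw new SparkException("data read failed", dataReaderThread.failureReason)
    }
    if (epochPollFailed.get()) {
      throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
    }
    entry = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)  // pollTimeoutMs assumed
  }
  entry
}
```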
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158389177
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: InterruptedException if state.get().equals(RECONFIGURING) =>
+ // swallow exception and run again
+ state.set(ACTIVE)
+ }
+ } while (state.get() == ACTIVE)
+ }
+
+ /**
+ * Populate the start offsets to start the execution at the current offsets stored in the sink
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * The basic structure of this method is as follows:
+ *
+ * Identify (from the commit log) the latest epoch that has committed
+ * IF last epoch exists THEN
+ * Get end offsets for the epoch
+ * Set those offsets as the current commit progress
+ * Set the next epoch ID as the last + 1
+ * Return the end offsets of the last epoch as start for the next one
+ * DONE
+ * ELSE
+ * Start a new query log
+ * DONE
+ */
+ private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
+ // Note that this will need a slight modification for exactly once. If ending offsets were
+ // reported but not committed for any epochs, we must replay exactly to those offsets.
+ // For at least once, we can just ignore those reports and risk duplicates.
+ commitLog.getLatest() match {
+ case Some((latestEpochId, _)) =>
+ val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
+ throw new IllegalStateException(
+ s"Batch $latestEpochId was committed without end epoch offsets!")
+ }
+ committedOffsets = nextOffsets.toStreamProgress(sources)
+
+ // Forcibly align commit and offset logs by slicing off any spurious offset logs from
+ // a previous run. We can't allow commits to an epoch that a previous run reached but
+ // this run has not.
+ offsetLog.purgeAfter(latestEpochId)
+
+ currentBatchId = latestEpochId + 1
+ logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
+ nextOffsets
+ case None =>
+ // We are starting this stream for the first time. Offsets are all None.
+ logInfo(s"Starting new streaming query.")
+ currentBatchId = 0
+ OffsetSeq.fill(continuousSources.map(_ => null): _*)
+ }
+ }
+
+ /**
+ * Do a continuous run.
+ * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
+ */
+ private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
+ // A list of attributes that will need to be updated.
+ val replacements = new ArrayBuffer[(Attribute, Attribute)]
+ // Translate from continuous relation to the underlying data source.
+ var nextSourceId = 0
+ continuousSources = logicalPlan.collect {
+ case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ nextSourceId += 1
+
+ dataSource.createContinuousReader(
+ java.util.Optional.empty[StructType](),
+ metadataPath,
+ new DataSourceV2Options(extraReaderOptions.asJava))
+ }
+ uniqueSources = continuousSources.distinct
+
+ val offsets = getStartOffsets(sparkSessionForQuery)
+
+ var insertedSourceId = 0
+ val withNewSources = logicalPlan transform {
+ case ContinuousExecutionRelation(_, _, output) =>
+ val reader = continuousSources(insertedSourceId)
+ insertedSourceId += 1
+ val newOutput = reader.readSchema().toAttributes
+
+ assert(output.size == newOutput.size,
+ s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
+ s"${Utils.truncatedString(newOutput, ",")}")
+ replacements ++= output.zip(newOutput)
+
+ val loggedOffset = offsets.offsets(0)
+ val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
+ reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+ DataSourceV2Relation(newOutput, reader)
+ }
+
+ // Rewire the plan to use the new attributes that were returned by the source.
+ val replacementMap = AttributeMap(replacements)
+ val triggerLogicalPlan = withNewSources transformAllExpressions {
+ case a: Attribute if replacementMap.contains(a) =>
+ replacementMap(a).withMetadata(a.metadata)
+ case (_: CurrentTimestamp | _: CurrentDate) =>
+ throw new IllegalStateException(
+ "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+ }
+
+ val writer = sink.createContinuousWriter(
+ s"$runId",
+ triggerLogicalPlan.schema,
+ outputMode,
+ new DataSourceV2Options(extraOptions.asJava))
+ val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
+
+ val reader = withSink.collect {
+ case DataSourceV2Relation(_, r: ContinuousReader) => r
+ }.head
+
+ reportTimeTaken("queryPlanning") {
+ lastExecution = new IncrementalExecution(
+ sparkSessionForQuery,
+ withSink,
+ outputMode,
+ checkpointFile("state"),
+ runId,
+ currentBatchId,
+ offsetSeqMetadata)
+ lastExecution.executedPlan // Force the lazy generation of execution plan
+ }
+
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.RUN_ID_KEY, runId.toString)
+
+ // Use the parent Spark session for the endpoint since it's where this query ID is registered.
+ val epochEndpoint =
+ EpochCoordinatorRef.create(
+ writer.get(), reader, currentBatchId,
+ id.toString, runId.toString, sparkSession, SparkEnv.get)
+ val epochUpdateThread = new Thread(new Runnable {
--- End diff --
nit: `new Thread(new Runnable {` -> `new Thread(s"epoch update thread for $prettyIdString", new Runnable {`
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85332 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85332/testReport)** for PR 19984 at commit [`b4f7976`](https://github.com/apache/spark/commit/b4f79762c083735011bf98250c39c263876c8cc8).
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84976 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84976/testReport)** for PR 19984 at commit [`527cc5d`](https://github.com/apache/spark/commit/527cc5d20e13ec3b1779de6e678b71dc3071b4a7).
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158139103
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.util.{SystemClock, ThreadUtils}
+
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ sqlContext: SQLContext,
+ @transient private val readTasks: java.util.List[ReadTask[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
+ private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+ override protected def getPartitions: Array[Partition] = {
+ readTasks.asScala.zipWithIndex.map {
+ case (readTask, index) => new DataSourceRDDPartition(index, readTask)
+ }.toArray
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
+ val reader = split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
+
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+
+ // This queue contains two types of messages:
+ // * (null, null) representing an epoch boundary.
+ // * (row, off) containing a data row and its corresponding PartitionOffset.
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
+
+ val epochPollFailed = new AtomicBoolean(false)
+ val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ s"epoch-poll--${runId}--${context.partitionId()}")
+ val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
+ epochPollExecutor.scheduleWithFixedDelay(
+ epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+ // Important sequencing - we must get start offset before the data reader thread begins
+ val startOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+
+ val dataReaderFailed = new AtomicBoolean(false)
+ val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
+ dataReaderThread.setDaemon(true)
+ dataReaderThread.start()
+
+ context.addTaskCompletionListener(_ => {
+ reader.close()
+ dataReaderThread.interrupt()
+ epochPollExecutor.shutdown()
+ })
+
+ val epochEndpoint = EpochCoordinatorRef.get(runId, SparkEnv.get)
+ new Iterator[UnsafeRow] {
+ private var currentRow: UnsafeRow = _
+ private var currentOffset: PartitionOffset = startOffset
+ private var currentEpoch =
+ context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def hasNext(): Boolean = {
+ if (dataReaderFailed.get()) {
+ throw new SparkException("data read failed", dataReaderThread.failureReason)
+ }
+ if (epochPollFailed.get()) {
+ throw new SparkException("epoch poll failed", epochPollRunnable.failureReason)
+ }
+
+ queue.take() match {
+ // epoch boundary marker
+ case (null, null) =>
+ epochEndpoint.send(ReportPartitionOffset(
+ context.partitionId(),
+ currentEpoch,
+ currentOffset))
+ currentEpoch += 1
+ false
+ // real row
+ case (row, offset) =>
+ currentRow = row
+ currentOffset = offset
+ true
+ }
+ }
+
+ override def next(): UnsafeRow = {
+ val r = currentRow
+ currentRow = null
+ r
+ }
+ }
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ split.asInstanceOf[DataSourceRDDPartition].readTask.preferredLocations()
+ }
+}
+
+case class EpochPackedPartitionOffset(epoch: Long) extends PartitionOffset
+
+class EpochPollRunnable(
+ queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+ context: TaskContext,
+ failedFlag: AtomicBoolean)
+ extends Thread with Logging {
+ private[continuous] var failureReason: Throwable = _
+
+ private val epochEndpoint = EpochCoordinatorRef.get(
+ context.getLocalProperty(ContinuousExecution.RUN_ID_KEY), SparkEnv.get)
+ private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ override def run(): Unit = {
+ try {
+ val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch())
+ for (i <- currentEpoch to newEpoch - 1) {
+ queue.put((null, null))
+ logDebug(s"Sent marker to start epoch ${i + 1}")
+ }
+ currentEpoch = newEpoch
+ } catch {
+ case t: Throwable =>
+ failedFlag.set(true)
+ failureReason = t
+ throw t
--- End diff --
ditto
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84937 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84937/testReport)** for PR 19984 at commit [`25a23d1`](https://github.com/apache/spark/commit/25a23d10e35f1c4283cdf7a2657c844f190d4d15).
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85079/
Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test PASSed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84973 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84973/testReport)** for PR 19984 at commit [`70d5d7c`](https://github.com/apache/spark/commit/70d5d7caed94394248c1d28777e74f3df9e398fe).
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85218 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85218/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r157135337
--- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
public static Trigger Once() {
return OneTimeTrigger$.MODULE$;
}
+
+ /**
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * @since 2.3.0
+ */
+ public static Trigger Continuous(long intervalMs) {
+ return ContinuousTrigger.apply(intervalMs);
+ }
+
+ /**
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * {{{
+ * import java.util.concurrent.TimeUnit
+ * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
--- End diff --
`Trigger.Continuous(10, TimeUnit.SECONDS)` instead of `ProcessingTime.create(10, TimeUnit.SECONDS)`?
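For reference, the corrected usage the comment asks for would read as follows (Scala sketch; `spark`, the rate source, and the memory sink are illustrative assumptions, not part of the diff):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger

    val df = spark.readStream.format("rate").load()
    df.writeStream
      .format("memory")
      .queryName("continuous_demo")
      .trigger(Trigger.Continuous(10, TimeUnit.SECONDS))  // checkpoint every 10s
      .start()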
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85287 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85287/testReport)** for PR 19984 at commit [`07a9e06`](https://github.com/apache/spark/commit/07a9e0654df61ad52f7db28ee663a380dee3c2a8).
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84973/
Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85230 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85230/testReport)** for PR 19984 at commit [`825d437`](https://github.com/apache/spark/commit/825d437fe1e897c2047171ce78c6bb92805dc5be).
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84975 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84975/testReport)** for PR 19984 at commit [`63f78d2`](https://github.com/apache/spark/commit/63f78d266f3fd4ac5fc2fec53c04c0029d1a5e68).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158389368
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: InterruptedException if state.get().equals(RECONFIGURING) =>
+ // swallow exception and run again
+ state.set(ACTIVE)
+ }
+ } while (state.get() == ACTIVE)
+ }
+
+ /**
+ * Populate the start offsets to start the execution at the current offsets stored in the sink
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * The basic structure of this method is as follows:
+ *
+ * Identify (from the commit log) the latest epoch that has committed
+ * IF last epoch exists THEN
+ * Get end offsets for the epoch
+ * Set those offsets as the current commit progress
+ * Set the next epoch ID as the last + 1
+ * Return the end offsets of the last epoch as start for the next one
+ * DONE
+ * ELSE
+ * Start a new query log
+ * DONE
+ */
+ private def getStartOffsets(sparkSessionToRunBatches: SparkSession): OffsetSeq = {
+ // Note that this will need a slight modification for exactly once. If ending offsets were
+ // reported but not committed for any epochs, we must replay exactly to those offsets.
+ // For at least once, we can just ignore those reports and risk duplicates.
+ commitLog.getLatest() match {
+ case Some((latestEpochId, _)) =>
+ val nextOffsets = offsetLog.get(latestEpochId).getOrElse {
+ throw new IllegalStateException(
+ s"Batch $latestEpochId was committed without end epoch offsets!")
+ }
+ committedOffsets = nextOffsets.toStreamProgress(sources)
+
+ // Forcibly align commit and offset logs by slicing off any spurious offset logs from
+ // a previous run. We can't allow commits to an epoch that a previous run reached but
+ // this run has not.
+ offsetLog.purgeAfter(latestEpochId)
+
+ currentBatchId = latestEpochId + 1
+ logDebug(s"Resuming at epoch $currentBatchId with committed offsets $committedOffsets")
+ nextOffsets
+ case None =>
+ // We are starting this stream for the first time. Offsets are all None.
+ logInfo(s"Starting new streaming query.")
+ currentBatchId = 0
+ OffsetSeq.fill(continuousSources.map(_ => null): _*)
+ }
+ }
+
+ /**
+ * Do a continuous run.
+ * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
+ */
+ private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
+ // A list of attributes that will need to be updated.
+ val replacements = new ArrayBuffer[(Attribute, Attribute)]
+ // Translate from continuous relation to the underlying data source.
+ var nextSourceId = 0
+ continuousSources = logicalPlan.collect {
+ case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
+ val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+ nextSourceId += 1
+
+ dataSource.createContinuousReader(
+ java.util.Optional.empty[StructType](),
+ metadataPath,
+ new DataSourceV2Options(extraReaderOptions.asJava))
+ }
+ uniqueSources = continuousSources.distinct
+
+ val offsets = getStartOffsets(sparkSessionForQuery)
+
+ var insertedSourceId = 0
+ val withNewSources = logicalPlan transform {
+ case ContinuousExecutionRelation(_, _, output) =>
+ val reader = continuousSources(insertedSourceId)
+ insertedSourceId += 1
+ val newOutput = reader.readSchema().toAttributes
+
+ assert(output.size == newOutput.size,
+ s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
+ s"${Utils.truncatedString(newOutput, ",")}")
+ replacements ++= output.zip(newOutput)
+
+ val loggedOffset = offsets.offsets(0)
+ val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
+ reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+ DataSourceV2Relation(newOutput, reader)
+ }
+
+ // Rewire the plan to use the new attributes that were returned by the source.
+ val replacementMap = AttributeMap(replacements)
+ val triggerLogicalPlan = withNewSources transformAllExpressions {
+ case a: Attribute if replacementMap.contains(a) =>
+ replacementMap(a).withMetadata(a.metadata)
+ case (_: CurrentTimestamp | _: CurrentDate) =>
+ throw new IllegalStateException(
+ "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
+ }
+
+ val writer = sink.createContinuousWriter(
+ s"$runId",
+ triggerLogicalPlan.schema,
+ outputMode,
+ new DataSourceV2Options(extraOptions.asJava))
+ val withSink = WriteToDataSourceV2(writer.get(), triggerLogicalPlan)
+
+ val reader = withSink.collect {
+ case DataSourceV2Relation(_, r: ContinuousReader) => r
+ }.head
+
+ reportTimeTaken("queryPlanning") {
+ lastExecution = new IncrementalExecution(
+ sparkSessionForQuery,
+ withSink,
+ outputMode,
+ checkpointFile("state"),
+ runId,
+ currentBatchId,
+ offsetSeqMetadata)
+ lastExecution.executedPlan // Force the lazy generation of execution plan
+ }
+
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
+ sparkSession.sparkContext.setLocalProperty(
+ ContinuousExecution.RUN_ID_KEY, runId.toString)
+
+ // Use the parent Spark session for the endpoint since it's where this query ID is registered.
+ val epochEndpoint =
+ EpochCoordinatorRef.create(
+ writer.get(), reader, currentBatchId,
+ id.toString, runId.toString, sparkSession, SparkEnv.get)
+ val epochUpdateThread = new Thread(new Runnable {
+ override def run: Unit = {
+ try {
+ triggerExecutor.execute(() => {
+ startTrigger()
+
+ if (reader.needsReconfiguration()) {
+ stopSources()
+ state.set(RECONFIGURING)
--- End diff --
nit: move this above `stopSources()`. Otherwise, the stream thread may see the exception before the state is set to `RECONFIGURING`.
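A minimal sketch of the reordering the comment asks for, with stand-in types rather than the PR's own fields:

    import java.util.concurrent.atomic.AtomicReference

    sealed trait StreamState
    case object ACTIVE extends StreamState
    case object RECONFIGURING extends StreamState

    // Publish RECONFIGURING before stopping sources, so the stream thread
    // that catches the resulting interrupt already observes the new state.
    def reconfigure(
        state: AtomicReference[StreamState],
        stopSources: () => Unit,
        interruptQueryThread: () => Unit): Unit = {
      state.set(RECONFIGURING)
      stopSources()
      interruptQueryThread()
    }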
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r157278184
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,22 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
+ buildConf("spark.sql.streaming.continuous.executorQueueSize")
+ .internal()
+ .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
+ " buffer the results of a ContinuousDataReader.")
+ .intConf
--- End diff --
Should it be? I can't imagine anything close to MAX_INT being a reasonable value here. Will it be hard to migrate to a long if we later discover it's needed?
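For illustration, keeping the `intConf` while guarding against nonsensical values could look like this sketch (the `checkValue` bound is an assumption, not part of the diff):

    val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
      buildConf("spark.sql.streaming.continuous.executorQueueSize")
        .internal()
        .doc("The size (measured in number of rows) of the queue used in " +
          "continuous execution to buffer the results of a ContinuousDataReader.")
        .intConf
        .checkValue(_ > 0, "The executor queue size must be positive.")
        .createWithDefault(1024)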
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158121770
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+ case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
+ }
+
+ override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
+ do {
+ try {
+ runContinuous(sparkSessionForStream)
+ } catch {
+ case _: Throwable if state.get().equals(RECONFIGURING) =>
--- End diff --
On second thought, it's dangerous to swallow exceptions here. If the source has a bug in `Source.stop`, this will hide it and the query will keep running. I would expect a better mechanism for asking the source to stop without relying on thrown exceptions.
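One possible shape for such a mechanism, purely as a sketch of the suggestion and not anything in this PR: record that a stop was requested before interrupting, and only swallow the interrupt when it was expected.

    import java.util.concurrent.atomic.AtomicBoolean

    class GuardedStop {
      private val stopRequested = new AtomicBoolean(false)

      def requestStop(thread: Thread): Unit = {
        stopRequested.set(true)   // mark intent before the interrupt lands
        thread.interrupt()
      }

      def runGuarded(body: => Unit): Unit = {
        try body catch {
          case _: InterruptedException if stopRequested.get() =>
            // Expected interrupt for reconfiguration/stop: safe to swallow.
          case t: Throwable =>
            throw t  // a bug in Source.stop (or anywhere else) still surfaces
        }
      }
    }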
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158382180
--- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -19,6 +19,7 @@
import java.util.concurrent.TimeUnit;
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
--- End diff --
nit: move this below `import org.apache.spark.annotation.InterfaceStability;`
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/19984
Thanks! Merging to master!
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85089 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85089/testReport)** for PR 19984 at commit [`359ebdd`](https://github.com/apache/spark/commit/359ebdd8bdac0b93aa6b88beab0212393f1e2577).
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158361942
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -109,6 +125,42 @@ object DataWritingSparkTask extends Logging {
logError(s"Writer for partition ${context.partitionId()} aborted.")
})
}
+
+ def runContinuous(
+ writeTask: DataWriterFactory[InternalRow],
+ context: TaskContext,
+ iter: Iterator[InternalRow]): WriterCommitMessage = {
+ val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+ val runId = context.getLocalProperty(ContinuousExecution.RUN_ID_KEY)
+ val currentMsg: WriterCommitMessage = null
+ var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+ do {
+ // write the data and commit this writer.
+ Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+ try {
+ iter.foreach(dataWriter.write)
+ logInfo(s"Writer for partition ${context.partitionId()} is committing.")
+ val msg = dataWriter.commit()
+ logInfo(s"Writer for partition ${context.partitionId()} committed.")
+ EpochCoordinatorRef.get(runId, SparkEnv.get).send(
--- End diff --
nit: `EpochCoordinatorRef.get` is not cheap. You can store it outside the loop.
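Sketched, the hoisting looks like the fragment below; the loop body and the commit message contents are elided assumptions, since the diff truncates right after `.send(`:

    // Resolve the endpoint once per task rather than once per epoch.
    val epochCoordinator = EpochCoordinatorRef.get(runId, SparkEnv.get)
    var currentEpoch =
      context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    do {
      // ... write the epoch's rows and commit the data writer ...
      // epochCoordinator.send(/* commit message for (partitionId, currentEpoch) */)
      currentEpoch += 1
    } while (!context.isInterrupted())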
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85089/
Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84976 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84976/testReport)** for PR 19984 at commit [`527cc5d`](https://github.com/apache/spark/commit/527cc5d20e13ec3b1779de6e678b71dc3071b4a7).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r157124505
--- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java ---
@@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) {
public static Trigger Once() {
return OneTimeTrigger$.MODULE$;
}
+
+ /**
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * @since 2.3.0
+ */
+ public static Trigger Continuous(long intervalMs) {
+ return ContinuousTrigger.apply(intervalMs);
+ }
+
+ /**
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * {{{
+ * import java.util.concurrent.TimeUnit
+ * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.3.0
+ */
+ public static Trigger Continuous(long interval, TimeUnit timeUnit) {
+ return ContinuousTrigger.create(interval, timeUnit);
+ }
+
+ /**
+ * (Scala-friendly)
+ * A trigger that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ * df.writeStream.trigger(Trigger.Continuous(10.seconds))
+ * }}}
+ * @since 2.2.0
--- End diff --
2.3.0?
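For reference, the Scala-friendly overload shown in the diff would be used like this (sketch; the stream definition `df` is assumed):

    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.Trigger

    df.writeStream
      .trigger(Trigger.Continuous(10.seconds))
      .start()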
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158388472
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
--- End diff --
nit: unused
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84973 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84973/testReport)** for PR 19984 at commit [`70d5d7c`](https://github.com/apache/spark/commit/70d5d7caed94394248c1d28777e74f3df9e398fe).
* This patch **fails MiMa tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on the issue:
https://github.com/apache/spark/pull/19984
The result says it fails Spark unit tests, but clicking through shows a count of 0.
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by joseph-torres <gi...@git.apache.org>.
Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158159270
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Clock, Utils}
+
+class ContinuousExecution(
+ sparkSession: SparkSession,
+ name: String,
+ checkpointRoot: String,
+ analyzedPlan: LogicalPlan,
+ sink: ContinuousWriteSupport,
+ trigger: Trigger,
+ triggerClock: Clock,
+ outputMode: OutputMode,
+ extraOptions: Map[String, String],
+ deleteCheckpointOnStop: Boolean)
+ extends StreamExecution(
+ sparkSession, name, checkpointRoot, analyzedPlan, sink,
+ trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+ @volatile protected var continuousSources: Seq[ContinuousReader] = Seq.empty
+ override protected def sources: Seq[BaseStreamingSource] = continuousSources
+
+ override lazy val logicalPlan: LogicalPlan = {
+ assert(queryExecutionThread eq Thread.currentThread,
+ "logicalPlan must be initialized in StreamExecutionThread " +
+ s"but the current thread was ${Thread.currentThread}")
+ var nextSourceId = 0L
+ val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
+ analyzedPlan.transform {
+ case r @ StreamingRelationV2(
+ source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
+ toExecutionRelationMap.getOrElseUpdate(r, {
+ ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
+ })
+ case StreamingRelationV2(_, sourceName, _, _, _) =>
+ throw new AnalysisException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+ }
+
+ private val triggerExecutor = trigger match {
+ case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
--- End diff --
I think the coupling is correct here. ProcessingTime represents the rate of progress through the query's fenceposts, which applies here as well as it does in the microbatch case.
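For context, the fencepost behavior `ProcessingTimeExecutor` provides is roughly the following; this is a simplification written for illustration, and the real class also handles a trigger overrunning its interval:

    // Fire the trigger body on fixed interval boundaries: run the body, then
    // wait out the remainder of the interval before the next firing.
    def executeOnFenceposts(intervalMs: Long)(triggerHandler: () => Boolean): Unit = {
      var continue = true
      while (continue) {
        val start = System.currentTimeMillis()
        continue = triggerHandler()
        val elapsed = System.currentTimeMillis() - start
        if (elapsed < intervalMs) Thread.sleep(intervalMs - elapsed)
      }
    }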
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r158116511
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
+import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
+import org.apache.spark.util.RpcUtils
+
+private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
+
+// Driver epoch trigger message
+/**
+ * Atomically increment the current epoch and get the new value.
+ */
+private[sql] case class IncrementAndGetEpoch() extends EpochCoordinatorMessage
--- End diff --
nit: `case class` -> `case object`
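That is, field-less coordinator messages become singletons, avoiding an allocation per send (sketch):

    sealed trait EpochCoordinatorMessage extends Serializable
    case object IncrementAndGetEpoch extends EpochCoordinatorMessage
    case object GetCurrentEpoch extends EpochCoordinatorMessage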
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85216/
Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19984
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85230/
Test FAILed.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #84937 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84937/testReport)** for PR 19984 at commit [`25a23d1`](https://github.com/apache/spark/commit/25a23d10e35f1c4283cdf7a2657c844f190d4d15).
* This patch **fails MiMa tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #19984: [SPARK-22789] Map-only continuous processing execution
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19984
**[Test build #85216 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85216/testReport)** for PR 19984 at commit [`c00e104`](https://github.com/apache/spark/commit/c00e104803333eea181685142aebafc8fef8f0a8).
---
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19984#discussion_r157124263
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1035,6 +1035,22 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
+ buildConf("spark.sql.streaming.continuous.executorQueueSize")
+ .internal()
+ .doc("The size (measured in number of rows) of the queue used in continuous execution to" +
+ " buffer the results of a ContinuousDataReader.")
+ .intConf
+ .createWithDefault(1024)
+
+ val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
+ buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
+ .internal()
+ .doc("The interval at which continuous execution readers will poll to check whether" +
+ " the epoch has advanced on the driver.")
+ .intConf
--- End diff --
`timeConf`?
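With `timeConf`, the entry would accept duration strings like "100ms" or "1s" and read back as a Long in the requested unit; a sketch of the suggestion (the default here is an assumption):

    import java.util.concurrent.TimeUnit

    val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS =
      buildConf("spark.sql.streaming.continuous.executorPollIntervalMs")
        .internal()
        .doc("The interval at which continuous execution readers will poll to " +
          "check whether the epoch has advanced on the driver.")
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefault(100L)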
---