Posted to reviews@spark.apache.org by jose-torres <gi...@git.apache.org> on 2018/04/20 18:27:33 UTC
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
GitHub user jose-torres opened a pull request:
https://github.com/apache/spark/pull/21116
[SPARK-24038][SS] Refactor continuous writing to its own class
## What changes were proposed in this pull request?
Refactor continuous writing to its own class.
See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction this is going, but I think this PR is very isolated and necessary anyway.
## How was this patch tested?
existing unit tests - refactoring only
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jose-torres/spark SPARK-24038
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21116.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21116
----
commit 0e8d80da1350b950fd690ff7c762d07d0767eafd
Author: Jose Torres <to...@...>
Date: 2018-03-21T22:05:31Z
partial
commit 270f8ffe062d76e44a726753afc07c78348c3cc6
Author: Jose Torres <to...@...>
Date: 2018-03-22T06:08:52Z
make ContinuousWriteExec work
commit 0cfeaeb6c0e6a3b500341852db8c53359120a753
Author: Jose Torres <to...@...>
Date: 2018-03-23T02:44:10Z
fix docs
commit 7c375339bac0704c99ba6d87ee671dc3b7c0f531
Author: Jose Torres <to...@...>
Date: 2018-03-26T16:47:16Z
remove old path
commit 26c1eadc67ffc48bcaf877154660982455892389
Author: Jose Torres <to...@...>
Date: 2018-03-26T17:44:19Z
rm old path
commit e53707f1c2b836c38d00ad9527f1cf7b498051b7
Author: Jose Torres <to...@...>
Date: 2018-03-27T02:15:00Z
format + docs
commit 4a7a4cc4aef2e27025e019d8dde29c30026cb330
Author: Jose Torres <to...@...>
Date: 2018-03-27T02:16:28Z
rename node
commit bf1bef4a40d18fbc55f2a905dee7c01649af47ca
Author: Jose Torres <to...@...>
Date: 2018-03-30T00:59:36Z
remove inheritance altogether
commit 9303e4e5a233109f00b25e68e2dfbec6a9aa218d
Author: Jose Torres <to...@...>
Date: 2018-04-20T18:18:57Z
don't do StreamWriter changes
commit 3d8dfa415902d1d7be45a36923d2d355936eefbe
Author: Jose Torres <to...@...>
Date: 2018-04-20T18:19:25Z
Merge remote-tracking branch 'apache/master' into SPARK-24038
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21116
Can one of the admins verify this patch?
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21116
**[Test build #89654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89654/testReport)** for PR 21116 at commit [`3d8dfa4`](https://github.com/apache/spark/commit/3d8dfa415902d1d7be45a36923d2d355936eefbe).
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21116
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:
https://github.com/apache/spark/pull/21116
LGTM!
---
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21116#discussion_r183489854
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+
+    logInfo(s"Start processing data source writer: $writer. " +
+      s"The input RDD has ${messages.length} partitions.")
+    // Let the epoch coordinator know how many partitions the write RDD has.
+    EpochCoordinatorRef.get(
+        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+        sparkContext.env)
+      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
+
+    try {
+      // Force the RDD to run so continuous processing starts; no data is actually being collected
+      // to the driver, as ContinuousWriteRDD outputs nothing.
+      sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
+        rdd.partitions.indices)
+    } catch {
+      case _: InterruptedException =>
+        // Interruption is how continuous queries are ended, so accept and ignore the exception.
+      case cause: Throwable =>
+        cause match {
+          // Do not wrap interruption exceptions that will be handled by streaming specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
+    }
+
+    sparkContext.emptyRDD
+  }
+}
+
+object WriteToContinuousDataSourceExec extends Logging {
+  def run(
+      writeTask: DataWriterFactory[InternalRow],
+      context: TaskContext,
+      iter: Iterator[InternalRow]): Unit = {
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    val currentMsg: WriterCommitMessage = null
--- End diff --
Never mind. I see the message is sent back through the epochCoordinator. Then let's just remove `currentMsg`.
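The point above, that a continuous write task reports its commit message through the epoch coordinator rather than returning it, can be illustrated with a small stand-alone sketch. Everything here is a hypothetical stand-in (`CommitMessage`, `SketchEpochCoordinator`, `EpochCommitSketch`); it is not Spark's `EpochCoordinator` API, only the general shape of the pattern under the assumption that an epoch commits once every partition has reported.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a writer commit message; not a Spark class.
final case class CommitMessage(partitionId: Int, payload: String)

// Hypothetical coordinator: collects one message per partition for each epoch
// and records the epoch as committed once all partitions have reported.
final class SketchEpochCoordinator(numPartitions: Int) {
  private val pending = mutable.Map.empty[Long, mutable.Map[Int, CommitMessage]]
  private val committed = mutable.ArrayBuffer.empty[(Long, Seq[CommitMessage])]

  def commitPartitionEpoch(epoch: Long, msg: CommitMessage): Unit = synchronized {
    val forEpoch = pending.getOrElseUpdate(epoch, mutable.Map.empty[Int, CommitMessage])
    forEpoch(msg.partitionId) = msg
    if (forEpoch.size == numPartitions) {
      // All partitions reported: commit the epoch with messages ordered by partition id.
      committed += ((epoch, forEpoch.toSeq.sortBy(_._1).map(_._2)))
      pending.remove(epoch)
    }
  }

  def committedEpochs: Seq[(Long, Seq[CommitMessage])] = synchronized { committed.toList }
}

object EpochCommitSketch {
  // The task returns Unit: its commit message travels through the coordinator,
  // so no local variable is needed to carry a message back to the driver.
  def runPartition(
      coordinator: SketchEpochCoordinator,
      partitionId: Int,
      epoch: Long,
      rows: Iterator[String]): Unit = {
    val written = rows.size // consumes the iterator, standing in for the actual writes
    coordinator.commitPartitionEpoch(epoch, CommitMessage(partitionId, s"wrote $written rows"))
  }
}
```

Because the task function returns `Unit`, a task-local field such as `currentMsg` has nothing left to hold, which is consistent with the suggestion to remove it.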
---
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21116#discussion_r183488923
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+ val messages = new Array[WriterCommitMessage](rdd.partitions.length)
--- End diff --
Is this really needed? Its only use is in the logInfo that follows, and even there only its length is read, which is effectively `rdd.partitions.length`.
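A minimal sketch of the suggestion, kept free of Spark dependencies so it runs on its own. `PartitionCountSketch` and `startMessage` are hypothetical names introduced for illustration; in the PR's code the count would presumably come straight from `rdd.partitions.length`.

```scala
object PartitionCountSketch {
  // Hypothetical helper: builds the start-up log line from the partition count alone,
  // so no placeholder array has to be allocated just to read its length.
  def startMessage(writerName: String, numPartitions: Int): String =
    s"Start processing data source writer: $writerName. " +
      s"The input RDD has $numPartitions partitions."
}
```

The log text is the same as in the diff; only the source of the number changes.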
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21116
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21116
**[Test build #89747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89747/testReport)** for PR 21116 at commit [`b676dc8`](https://github.com/apache/spark/commit/b676dc85d5ab74b9d3457d45a97c062fd5a51dd3).
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21116
**[Test build #89654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89654/testReport)** for PR 21116 at commit [`3d8dfa4`](https://github.com/apache/spark/commit/3d8dfa415902d1d7be45a36923d2d355936eefbe).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:
https://github.com/apache/spark/pull/21116
@tdas
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:
https://github.com/apache/spark/pull/21116
addressed comments
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21116
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89747/
---
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21116#discussion_r183489611
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+ val currentMsg: WriterCommitMessage = null
--- End diff --
Good point. I see it's no longer used anywhere. That raises two points:
- This should be removed.
- Does this continuous code path not have to send back a WriterCommitMessage? How is that working?
---
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/21116#discussion_r183224838
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
@@ -0,0 +1,126 @@
+ val currentMsg: WriterCommitMessage = null
--- End diff --
currentMsg is no longer needed?
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21116
**[Test build #89747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89747/testReport)** for PR 21116 at commit [`b676dc8`](https://github.com/apache/spark/commit/b676dc85d5ab74b9d3457d45a97c062fd5a51dd3).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21116
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89654/
---
[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/21116
---