Posted to reviews@spark.apache.org by jose-torres <gi...@git.apache.org> on 2018/04/20 18:27:33 UTC

[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

GitHub user jose-torres opened a pull request:

    https://github.com/apache/spark/pull/21116

    [SPARK-24038][SS] Refactor continuous writing to its own class

    ## What changes were proposed in this pull request?
    
    Refactor continuous writing to its own class.
    
    See the WIP at https://github.com/jose-torres/spark/pull/13 for the overall direction this work is heading; this PR is well isolated and necessary on its own.
    
    ## How was this patch tested?
    
    Existing unit tests; this change is a refactoring only.
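
    For context, the change adds a dedicated physical node,
    `WriteToContinuousDataSourceExec` (quoted in the review comments below), for
    the continuous write path. A minimal sketch of how such a node is typically
    wired into planning (the logical node and strategy below are illustrative
    assumptions, not part of this PR's diff):

        import org.apache.spark.sql.Strategy
        import org.apache.spark.sql.catalyst.expressions.Attribute
        import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
        import org.apache.spark.sql.execution.SparkPlan
        import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec
        import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

        // Hypothetical logical counterpart of the new physical node.
        case class WriteToContinuousDataSource(writer: StreamWriter, query: LogicalPlan)
          extends LogicalPlan {
          override def children: Seq[LogicalPlan] = Seq(query)
          override def output: Seq[Attribute] = Nil
        }

        // Hypothetical strategy mapping the logical node to the physical node
        // added by this PR.
        object ContinuousWriteStrategy extends Strategy {
          override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
            case WriteToContinuousDataSource(writer, query) =>
              WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
            case _ => Nil
          }
        }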

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark SPARK-24038

Alternatively, you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21116.patch
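
For example, one way to apply that patch to a local checkout (GitHub's .patch
endpoint serves git-format-patch output, so git am can apply it):

    $ curl -L https://github.com/apache/spark/pull/21116.patch | git am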

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21116
    
----
commit 0e8d80da1350b950fd690ff7c762d07d0767eafd
Author: Jose Torres <to...@...>
Date:   2018-03-21T22:05:31Z

    partial

commit 270f8ffe062d76e44a726753afc07c78348c3cc6
Author: Jose Torres <to...@...>
Date:   2018-03-22T06:08:52Z

    make ContinuousWriteExec work

commit 0cfeaeb6c0e6a3b500341852db8c53359120a753
Author: Jose Torres <to...@...>
Date:   2018-03-23T02:44:10Z

    fix docs

commit 7c375339bac0704c99ba6d87ee671dc3b7c0f531
Author: Jose Torres <to...@...>
Date:   2018-03-26T16:47:16Z

    remove old path

commit 26c1eadc67ffc48bcaf877154660982455892389
Author: Jose Torres <to...@...>
Date:   2018-03-26T17:44:19Z

    rm old path

commit e53707f1c2b836c38d00ad9527f1cf7b498051b7
Author: Jose Torres <to...@...>
Date:   2018-03-27T02:15:00Z

    format + docs

commit 4a7a4cc4aef2e27025e019d8dde29c30026cb330
Author: Jose Torres <to...@...>
Date:   2018-03-27T02:16:28Z

    rename node

commit bf1bef4a40d18fbc55f2a905dee7c01649af47ca
Author: Jose Torres <to...@...>
Date:   2018-03-30T00:59:36Z

    remove inheritance altogether

commit 9303e4e5a233109f00b25e68e2dfbec6a9aa218d
Author: Jose Torres <to...@...>
Date:   2018-04-20T18:18:57Z

    don't do StreamWriter changes

commit 3d8dfa415902d1d7be45a36923d2d355936eefbe
Author: Jose Torres <to...@...>
Date:   2018-04-20T18:19:25Z

    Merge remote-tracking branch 'apache/master' into SPARK-24038

----


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    **[Test build #89654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89654/testReport)** for PR 21116 at commit [`3d8dfa4`](https://github.com/apache/spark/commit/3d8dfa415902d1d7be45a36923d2d355936eefbe).


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    LGTM!


---



[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21116#discussion_r183489854
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import scala.util.control.NonFatal
    +
    +import org.apache.spark.{SparkEnv, SparkException, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
    +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
    +import org.apache.spark.sql.execution.streaming.StreamExecution
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.util.Utils
    +
    +/**
    + * The physical plan for writing data into a continuous processing [[StreamWriter]].
    + */
    +case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
    +    extends SparkPlan with Logging {
    +  override def children: Seq[SparkPlan] = Seq(query)
    +  override def output: Seq[Attribute] = Nil
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    val writerFactory = writer match {
    +      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
    +      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
    +    }
    +
    +    val rdd = query.execute()
    +    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
    +
    +    logInfo(s"Start processing data source writer: $writer. " +
    +      s"The input RDD has ${messages.length} partitions.")
    +    // Let the epoch coordinator know how many partitions the write RDD has.
    +    EpochCoordinatorRef.get(
    +        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +        sparkContext.env)
    +      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
    +
    +    try {
    +      // Force the RDD to run so continuous processing starts; no data is actually being collected
    +      // to the driver, as ContinuousWriteRDD outputs nothing.
    +      sparkContext.runJob(
    +        rdd,
    +        (context: TaskContext, iter: Iterator[InternalRow]) =>
    +          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
    +        rdd.partitions.indices)
    +    } catch {
    +      case _: InterruptedException =>
    +        // Interruption is how continuous queries are ended, so accept and ignore the exception.
    +      case cause: Throwable =>
    +        cause match {
    +          // Do not wrap interruption exceptions that will be handled by streaming specially.
    +          case _ if StreamExecution.isInterruptionException(cause) => throw cause
    +          // Only wrap non fatal exceptions.
    +          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
    +          case _ => throw cause
    +        }
    +    }
    +
    +    sparkContext.emptyRDD
    +  }
    +}
    +
    +object WriteToContinuousDataSourceExec extends Logging {
    +  def run(
    +      writeTask: DataWriterFactory[InternalRow],
    +      context: TaskContext,
    +      iter: Iterator[InternalRow]): Unit = {
    +    val epochCoordinator = EpochCoordinatorRef.get(
    +      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +      SparkEnv.get)
    +    val currentMsg: WriterCommitMessage = null
    --- End diff --
    
    Never mind. I see the message being sent back using the epochCoordinator. Then let's just remove `currentMsg`.
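
    For reference, a rough sketch of that flow, continuing from the `run` method
    quoted above (`writeTask`, `context`, `iter`, and `epochCoordinator` are the
    values defined there; `CommitPartitionEpoch` and the `currentEpoch` bookkeeping
    are assumptions about surrounding code not quoted in this thread):

        // Per-partition write for one epoch (sketch).
        val dataWriter = writeTask.createDataWriter(
          context.partitionId(), context.attemptNumber())
        while (iter.hasNext) {
          dataWriter.write(iter.next())
        }
        // Commit this partition's writer for the epoch.
        val msg = dataWriter.commit()
        // Rather than returning the commit message to the driver through runJob,
        // send it to the epoch coordinator, which aggregates commits per epoch.
        epochCoordinator.send(
          CommitPartitionEpoch(context.partitionId(), currentEpoch, msg))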


---



[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21116#discussion_r183488923
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    val writerFactory = writer match {
    +      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
    +      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
    +    }
    +
    +    val rdd = query.execute()
    +    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
    --- End diff --
    
    Is this really needed? Its only use is in the logInfo just after it, and then only for its length, which is effectively `rdd.partitions.length`.
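
    A sketch of the suggested cleanup (not the committed code): drop the array and
    log the partition count directly:

        val rdd = query.execute()

        logInfo(s"Start processing data source writer: $writer. " +
          s"The input RDD has ${rdd.partitions.length} partitions.")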


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    **[Test build #89747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89747/testReport)** for PR 21116 at commit [`b676dc8`](https://github.com/apache/spark/commit/b676dc85d5ab74b9d3457d45a97c062fd5a51dd3).


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    **[Test build #89654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89654/testReport)** for PR 21116 at commit [`3d8dfa4`](https://github.com/apache/spark/commit/3d8dfa415902d1d7be45a36923d2d355936eefbe).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    @tdas 


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    Addressed comments.


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89747/


---



[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21116#discussion_r183489611
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
    +object WriteToContinuousDataSourceExec extends Logging {
    +  def run(
    +      writeTask: DataWriterFactory[InternalRow],
    +      context: TaskContext,
    +      iter: Iterator[InternalRow]): Unit = {
    +    val epochCoordinator = EpochCoordinatorRef.get(
    +      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +      SparkEnv.get)
    +    val currentMsg: WriterCommitMessage = null
    --- End diff --
    
    Good point. I see it's no longer used anywhere. That raises two points:
    - This should be removed.
    - Does this continuous code path not have to send back a WriterCommitMessage? How is that working?
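
    For reference, roughly how that works (the coordinator internals below are a
    paraphrased sketch, not code quoted in this PR): each partition sends its
    commit message to the epoch coordinator, which performs the global commit once
    every writer partition has reported for the epoch. The partition count comes
    from the `SetWriterPartitions` call in the doExecute quoted earlier in this
    thread:

        // EpochCoordinator side (sketch), on receiving a per-partition commit.
        // `partitionCommits` is assumed to be a mutable map keyed by
        // (epoch, partitionId); `numWriterPartitions` was set via
        // SetWriterPartitions.
        case CommitPartitionEpoch(partitionId, epoch, message) =>
          partitionCommits.put((epoch, partitionId), message)
          val epochCommits = partitionCommits.collect {
            case ((e, _), msg) if e == epoch => msg
          }
          if (epochCommits.size == numWriterPartitions) {
            // StreamWriter.commit(epochId, messages) performs the global commit.
            writer.commit(epoch, epochCommits.toArray)
          }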


---



[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21116#discussion_r183224838
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
    +object WriteToContinuousDataSourceExec extends Logging {
    +  def run(
    +      writeTask: DataWriterFactory[InternalRow],
    +      context: TaskContext,
    +      iter: Iterator[InternalRow]): Unit = {
    +    val epochCoordinator = EpochCoordinatorRef.get(
    +      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +      SparkEnv.get)
    +    val currentMsg: WriterCommitMessage = null
    --- End diff --
    
    Is `currentMsg` no longer needed?


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    **[Test build #89747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89747/testReport)** for PR 21116 at commit [`b676dc8`](https://github.com/apache/spark/commit/b676dc85d5ab74b9d3457d45a97c062fd5a51dd3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21116: [SPARK-24038][SS] Refactor continuous writing to its own...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21116
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89654/


---



[GitHub] spark pull request #21116: [SPARK-24038][SS] Refactor continuous writing to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21116


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org