Posted to reviews@spark.apache.org by arunmahadevan <gi...@git.apache.org> on 2018/05/01 01:03:09 UTC

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185148972
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
    + * to read from the remote source, and polls that queue for incoming rows.
    + *
    + * Note that continuous processing calls compute() multiple times, and the same
    + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
    + */
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  // When computing the same partition multiple times, we need to use the same data reader to
    +  // do so for continuity in offsets.
    +  @GuardedBy("dataReaders")
    +  private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] =
    +    mutable.Map[Partition, ContinuousQueuedDataReader]()
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readerFactories.zipWithIndex.map {
    +      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    // If attempt number isn't 0, this is a task retry, which we don't support.
    +    if (context.attemptNumber() != 0) {
    +      throw new ContinuousTaskRetryException()
    +    }
    +
    +    val readerForPartition = dataReaders.synchronized {
    +      if (!dataReaders.contains(split)) {
    +        dataReaders.put(
    +          split,
    +          new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs))
    +      }
    +
    +      dataReaders(split)
    +    }
    +
    +    val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
    +    val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private val POLL_TIMEOUT_MS = 1000
    +
    +      private var currentEntry: (UnsafeRow, PartitionOffset) = _
    +
    +      override def hasNext(): Boolean = {
    +        while (currentEntry == null) {
    +          if (context.isInterrupted() || context.isCompleted()) {
    +            currentEntry = (null, null)
    +          }
    +          if (readerForPartition.dataReaderFailed.get()) {
    +            throw new SparkException(
    +              "data read failed", readerForPartition.dataReaderThread.failureReason)
    +          }
    +          if (readerForPartition.epochPollFailed.get()) {
    +            throw new SparkException(
    +              "epoch poll failed", readerForPartition.epochPollRunnable.failureReason)
    +          }
    +          currentEntry = readerForPartition.queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    +        }
    +
    +        currentEntry match {
    +          // epoch boundary marker
    +          case (null, null) =>
    +            epochEndpoint.send(ReportPartitionOffset(
    +              context.partitionId(),
    +              readerForPartition.currentEpoch,
    +              readerForPartition.currentOffset))
    +            readerForPartition.currentEpoch += 1
    +            currentEntry = null
    +            false
    --- End diff --
    
    Not sure whether the iterator hack will lead to more hacks once there are multiple stages, since something would also be needed to keep the intermediate stages from terminating. Is there a plan to change this approach later (say, something like an unbounded RDD that never terminates but passes the epoch markers along with the data through the pipeline)?
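    
    For illustration only, a rough sketch of that alternative (hypothetical names such as StreamEntry, DataRecord and EpochMarker; not code from this PR): an unbounded iterator where the epoch markers travel inline with the data, so downstream stages pattern match on the entry type instead of relying on hasNext() returning false at an epoch boundary.
    
        // Hypothetical sketch, not from the PR: one unbounded stream where epoch
        // markers flow through as ordinary entries instead of ending the iterator.
        import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
    
        sealed trait StreamEntry
        case class DataRecord(row: String) extends StreamEntry   // stand-in for UnsafeRow
        case class EpochMarker(epoch: Long) extends StreamEntry  // boundary travels with the data
    
        // hasNext never returns false; epoch boundaries are just another entry type.
        class UnboundedEpochIterator(queue: BlockingQueue[StreamEntry])
          extends Iterator[StreamEntry] {
    
          private var buffered: StreamEntry = _
    
          override def hasNext: Boolean = {
            while (buffered == null) {
              // A real implementation would also check for task interruption and
              // reader failures here, like ContinuousDataSourceRDD does.
              buffered = queue.poll(1000, TimeUnit.MILLISECONDS)
            }
            true
          }
    
          override def next(): StreamEntry = {
            if (!hasNext) throw new NoSuchElementException
            val entry = buffered
            buffered = null
            entry
          }
        }
    
        object UnboundedEpochIteratorExample {
          def main(args: Array[String]): Unit = {
            val queue = new LinkedBlockingQueue[StreamEntry]()
            queue.put(DataRecord("a"))
            queue.put(DataRecord("b"))
            queue.put(EpochMarker(0))   // downstream commits the epoch but keeps consuming
            queue.put(DataRecord("c"))
    
            val it = new UnboundedEpochIterator(queue)
            for (_ <- 1 to 4) {
              it.next() match {
                case DataRecord(r)  => println(s"process $r")
                case EpochMarker(e) => println(s"report offset / commit epoch $e and continue")
              }
            }
          }
        }
    
    With something along these lines, an intermediate stage would never see its input iterator end, so no per-stage termination hack should be needed; the trade-off would be that every operator in the pipeline has to understand and forward the marker type.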


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org