You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/12/17 19:50:38 UTC

[spark] branch master updated: [SPARK-24933][SS] Report numOutputRows in SinkProgress

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 81d377d7 [SPARK-24933][SS] Report numOutputRows in SinkProgress
81d377d7 is described below

commit 81d377d772a527d9ae3311be0480e6403769e919
Author: Vaclav Kosar <ad...@vaclavkosar.com>
AuthorDate: Mon Dec 17 11:50:24 2018 -0800

    [SPARK-24933][SS] Report numOutputRows in SinkProgress
    
    ## What changes were proposed in this pull request?
    
    SinkProgress should report similar properties like SourceProgress as long as they are available for given Sink. Count of written rows is metric availble for all Sinks. Since relevant progress information is with respect to commited rows, ideal object to carry this info is WriterCommitMessage. For brevity the implementation will focus only on Sinks with API V2 and on Micro Batch mode. Implemention for Continuous mode will be provided at later date.
    
    ### Before
    ```
    {"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317"}
    ```
    
    ### After
    ```
    {"description":"org.apache.spark.sql.kafka010.KafkaSourceProvider3c0bd317","numOutputRows":5000}
    ```
    
    ### This PR is related to:
    - https://issues.apache.org/jira/browse/SPARK-24647
    - https://issues.apache.org/jira/browse/SPARK-21313
    
    ## How was this patch tested?
    
    Existing and new unit tests.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #21919 from vackosar/feature/SPARK-24933-numOutputRows.
    
    Lead-authored-by: Vaclav Kosar <ad...@vaclavkosar.com>
    Co-authored-by: Kosar, Vaclav: Functions Transformation <Va...@barclayscapital.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala | 21 +++++++++++++++
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 30 +++++++++++++++++-----
 .../execution/streaming/MicroBatchExecution.scala  | 11 ++++++--
 .../sql/execution/streaming/ProgressReporter.scala |  7 +++--
 .../sql/execution/streaming/StreamExecution.scala  |  4 +++
 .../org/apache/spark/sql/streaming/progress.scala  | 21 +++++++++++++--
 .../StreamingQueryStatusAndProgressSuite.scala     | 10 +++++---
 7 files changed, 88 insertions(+), 16 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index d46c413..07d2b8a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -232,6 +232,27 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
     }
   }
 
+  test("streaming - sink progress is produced") {
+    /* ensure sink progress is correctly produced. */
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))()
+
+    try {
+      input.addData("1", "2", "3")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      assert(writer.lastProgress.sink.numOutputRows == 3L)
+    } finally {
+      writer.stop()
+    }
+  }
 
   test("streaming - write data with bad schema") {
     val input = MemoryStream[String]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 9a1fe1e..d7e20ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
  * Deprecated logical plan for writing data into data source v2. This is being replaced by more
@@ -47,6 +47,8 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl
 case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan)
   extends UnaryExecNode {
 
+  var commitProgress: Option[StreamWriterCommitProgress] = None
+
   override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
@@ -55,6 +57,7 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
     val useCommitCoordinator = writeSupport.useCommitCoordinator
     val rdd = query.execute()
     val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+    val totalNumRowsAccumulator = new LongAccumulator()
 
     logInfo(s"Start processing data source write support: $writeSupport. " +
       s"The input RDD has ${messages.length} partitions.")
@@ -65,15 +68,18 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
         (context: TaskContext, iter: Iterator[InternalRow]) =>
           DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator),
         rdd.partitions.indices,
-        (index, message: WriterCommitMessage) => {
-          messages(index) = message
-          writeSupport.onDataWriterCommit(message)
+        (index, result: DataWritingSparkTaskResult) => {
+          val commitMessage = result.writerCommitMessage
+          messages(index) = commitMessage
+          totalNumRowsAccumulator.add(result.numRows)
+          writeSupport.onDataWriterCommit(commitMessage)
         }
       )
 
       logInfo(s"Data source write support $writeSupport is committing.")
       writeSupport.commit(messages)
       logInfo(s"Data source write support $writeSupport committed.")
+      commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
       case cause: Throwable =>
         logError(s"Data source write support $writeSupport is aborting.")
@@ -102,7 +108,7 @@ object DataWritingSparkTask extends Logging {
       writerFactory: DataWriterFactory,
       context: TaskContext,
       iter: Iterator[InternalRow],
-      useCommitCoordinator: Boolean): WriterCommitMessage = {
+      useCommitCoordinator: Boolean): DataWritingSparkTaskResult = {
     val stageId = context.stageId()
     val stageAttempt = context.stageAttemptNumber()
     val partId = context.partitionId()
@@ -110,9 +116,12 @@ object DataWritingSparkTask extends Logging {
     val attemptId = context.attemptNumber()
     val dataWriter = writerFactory.createWriter(partId, taskId)
 
+    var count = 0L
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       while (iter.hasNext) {
+        // Count is here.
+        count += 1
         dataWriter.write(iter.next())
       }
 
@@ -139,7 +148,7 @@ object DataWritingSparkTask extends Logging {
       logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId" +
         s"stage $stageId.$stageAttempt)")
 
-      msg
+      DataWritingSparkTaskResult(count, msg)
 
     })(catchBlock = {
       // If there is an error, abort this writer
@@ -151,3 +160,12 @@ object DataWritingSparkTask extends Logging {
     })
   }
 }
+
+private[v2] case class DataWritingSparkTaskResult(
+                                                   numRows: Long,
+                                                   writerCommitMessage: WriterCommitMessage)
+
+/**
+ * Sink progress information collected after commit.
+ */
+private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 03beefe..8ad436a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp,
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
@@ -246,6 +246,7 @@ class MicroBatchExecution(
    *  DONE
    */
   private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
+    sinkCommitProgress = None
     offsetLog.getLatest() match {
       case Some((latestBatchId, nextOffsets)) =>
         /* First assume that we are re-executing the latest known batch
@@ -537,7 +538,8 @@ class MicroBatchExecution(
     val nextBatch =
       new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
 
-    reportTimeTaken("addBatch") {
+    val batchSinkProgress: Option[StreamWriterCommitProgress] =
+      reportTimeTaken("addBatch") {
       SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
         sink match {
           case s: Sink => s.addBatch(currentBatchId, nextBatch)
@@ -545,10 +547,15 @@ class MicroBatchExecution(
             // This doesn't accumulate any data - it just forces execution of the microbatch writer.
             nextBatch.collect()
         }
+        lastExecution.executedPlan match {
+          case w: WriteToDataSourceV2Exec => w.commitProgress
+          case _ => None
+        }
       }
     }
 
     withProgressLocked {
+      sinkCommitProgress = batchSinkProgress
       watermarkTracker.updateWatermark(lastExecution.executedPlan)
       commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
       committedOffsets ++= availableOffsets
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 39ab702..d1f3f74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamWriterCommitProgress}
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -56,6 +56,7 @@ trait ProgressReporter extends Logging {
   protected def logicalPlan: LogicalPlan
   protected def lastExecution: QueryExecution
   protected def newData: Map[BaseStreamingSource, LogicalPlan]
+  protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
   protected def sources: Seq[BaseStreamingSource]
   protected def sink: BaseStreamingSink
   protected def offsetSeqMetadata: OffsetSeqMetadata
@@ -167,7 +168,9 @@ trait ProgressReporter extends Logging {
       )
     }
 
-    val sinkProgress = new SinkProgress(sink.toString)
+    val sinkProgress = SinkProgress(
+      sink.toString,
+      sinkCommitProgress.map(_.numOutputRows))
 
     val newProgress = new StreamingQueryProgress(
       id = id,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 89b4f40..83824f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -114,6 +115,9 @@ abstract class StreamExecution(
   @volatile
   var availableOffsets = new StreamProgress
 
+  @volatile
+  var sinkCommitProgress: Option[StreamWriterCommitProgress] = None
+
   /** The current batchId or -1 if execution has not yet been initialized. */
   protected var currentBatchId: Long = -1
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3cd6700..0b3945c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -30,6 +30,7 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
 
 /**
  * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
@@ -207,11 +208,19 @@ class SourceProgress protected[sql](
  * during a trigger. See [[StreamingQueryProgress]] for more information.
  *
  * @param description Description of the source corresponding to this status.
+ * @param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily)
+ * or Sink V1 (until decommissioned).
  * @since 2.1.0
  */
 @Evolving
 class SinkProgress protected[sql](
-    val description: String) extends Serializable {
+    val description: String,
+    val numOutputRows: Long) extends Serializable {
+
+  /** SinkProgress without custom metrics. */
+  protected[sql] def this(description: String) {
+    this(description, DEFAULT_NUM_OUTPUT_ROWS)
+  }
 
   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
@@ -222,6 +231,14 @@ class SinkProgress protected[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description))
+    ("description" -> JString(description)) ~
+      ("numOutputRows" -> JInt(numOutputRows))
   }
 }
+
+private[sql] object SinkProgress {
+  val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L
+
+  def apply(description: String, numOutputRows: Option[Long]): SinkProgress =
+    new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS))
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 7bef687..2f460b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -73,7 +73,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |    "inputRowsPerSecond" : 10.0
         |  } ],
         |  "sink" : {
-        |    "description" : "sink"
+        |    "description" : "sink",
+        |    "numOutputRows" : -1
         |  }
         |}
       """.stripMargin.trim)
@@ -105,7 +106,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
          |    "numInputRows" : 678
          |  } ],
          |  "sink" : {
-         |    "description" : "sink"
+         |    "description" : "sink",
+         |    "numOutputRows" : -1
          |  }
          |}
       """.stripMargin.trim)
@@ -250,7 +252,7 @@ object StreamingQueryStatusAndProgressSuite {
         processedRowsPerSecond = Double.PositiveInfinity  // should not be present in the json
       )
     ),
-    sink = new SinkProgress("sink")
+    sink = SinkProgress("sink", None)
   )
 
   val testProgress2 = new StreamingQueryProgress(
@@ -274,7 +276,7 @@ object StreamingQueryStatusAndProgressSuite {
         processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json
       )
     ),
-    sink = new SinkProgress("sink")
+    sink = SinkProgress("sink", None)
   )
 
   val testStatus = new StreamingQueryStatus("active", true, false)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org