You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/17 19:54:10 UTC

[GitHub] asfgit closed pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress

asfgit closed pull request #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
URL: https://github.com/apache/spark/pull/21919
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index d46c4139011da..07d2b8a5dc420 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -232,6 +232,27 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
     }
   }
 
+  test("streaming - sink progress is produced") {
+    /* Ensure the sink progress reports as many output rows as were added to the stream. */
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))()
+
+    try {
+      input.addData("1", "2", "3")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      assert(writer.lastProgress.sink.numOutputRows == 3L) // three rows were added above
+    } finally {
+      writer.stop()
+    }
+  }
 
   test("streaming - write data with bad schema") {
     val input = MemoryStream[String]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 9a1fe1e0a328b..d7e20eed4cbc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
  * Deprecated logical plan for writing data into data source v2. This is being replaced by more
@@ -47,6 +47,8 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl
 case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan)
   extends UnaryExecNode {
 
+  var commitProgress: Option[StreamWriterCommitProgress] = None
+
   override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
@@ -55,6 +57,7 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
     val useCommitCoordinator = writeSupport.useCommitCoordinator
     val rdd = query.execute()
     val messages = new Array[WriterCommitMessage](rdd.partitions.length)
+    val totalNumRowsAccumulator = new LongAccumulator()
 
     logInfo(s"Start processing data source write support: $writeSupport. " +
       s"The input RDD has ${messages.length} partitions.")
@@ -65,15 +68,18 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
         (context: TaskContext, iter: Iterator[InternalRow]) =>
           DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator),
         rdd.partitions.indices,
-        (index, message: WriterCommitMessage) => {
-          messages(index) = message
-          writeSupport.onDataWriterCommit(message)
+        (index, result: DataWritingSparkTaskResult) => {
+          val commitMessage = result.writerCommitMessage
+          messages(index) = commitMessage
+          totalNumRowsAccumulator.add(result.numRows)
+          writeSupport.onDataWriterCommit(commitMessage)
         }
       )
 
       logInfo(s"Data source write support $writeSupport is committing.")
       writeSupport.commit(messages)
       logInfo(s"Data source write support $writeSupport committed.")
+      commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
       case cause: Throwable =>
         logError(s"Data source write support $writeSupport is aborting.")
@@ -102,7 +108,7 @@ object DataWritingSparkTask extends Logging {
       writerFactory: DataWriterFactory,
       context: TaskContext,
       iter: Iterator[InternalRow],
-      useCommitCoordinator: Boolean): WriterCommitMessage = {
+      useCommitCoordinator: Boolean): DataWritingSparkTaskResult = {
     val stageId = context.stageId()
     val stageAttempt = context.stageAttemptNumber()
     val partId = context.partitionId()
@@ -110,9 +116,12 @@ object DataWritingSparkTask extends Logging {
     val attemptId = context.attemptNumber()
     val dataWriter = writerFactory.createWriter(partId, taskId)
 
+    var count = 0L
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       while (iter.hasNext) {
+        // Count each row handed to the writer; the total is returned on commit.
+        count += 1
         dataWriter.write(iter.next())
       }
 
@@ -139,7 +148,7 @@ object DataWritingSparkTask extends Logging {
       logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId" +
         s"stage $stageId.$stageAttempt)")
 
-      msg
+      DataWritingSparkTaskResult(count, msg)
 
     })(catchBlock = {
       // If there is an error, abort this writer
@@ -151,3 +160,12 @@ object DataWritingSparkTask extends Logging {
     })
   }
 }
+
+private[v2] case class DataWritingSparkTaskResult(
+    numRows: Long, // rows the task counted while writing its partition
+    writerCommitMessage: WriterCommitMessage) // commit message returned alongside the count
+
+/**
+ * Sink progress recorded after a write commits; numOutputRows sums the rows of all tasks.
+ */
+private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 03beefeca269b..8ad436a4ff57d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp,
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
@@ -246,6 +246,7 @@ class MicroBatchExecution(
    *  DONE
    */
   private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
+    sinkCommitProgress = None
     offsetLog.getLatest() match {
       case Some((latestBatchId, nextOffsets)) =>
         /* First assume that we are re-executing the latest known batch
@@ -537,7 +538,8 @@ class MicroBatchExecution(
     val nextBatch =
       new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
 
-    reportTimeTaken("addBatch") {
+    val batchSinkProgress: Option[StreamWriterCommitProgress] =
+      reportTimeTaken("addBatch") {
       SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
         sink match {
           case s: Sink => s.addBatch(currentBatchId, nextBatch)
@@ -545,10 +547,15 @@ class MicroBatchExecution(
             // This doesn't accumulate any data - it just forces execution of the microbatch writer.
             nextBatch.collect()
         }
+        lastExecution.executedPlan match {
+          case w: WriteToDataSourceV2Exec => w.commitProgress
+          case _ => None
+        }
       }
     }
 
     withProgressLocked {
+      sinkCommitProgress = batchSinkProgress
       watermarkTracker.updateWatermark(lastExecution.executedPlan)
       commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
       committedOffsets ++= availableOffsets
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 39ab702ee083c..d1f3f74c5e731 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamWriterCommitProgress}
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -56,6 +56,7 @@ trait ProgressReporter extends Logging {
   protected def logicalPlan: LogicalPlan
   protected def lastExecution: QueryExecution
   protected def newData: Map[BaseStreamingSource, LogicalPlan]
+  protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
   protected def sources: Seq[BaseStreamingSource]
   protected def sink: BaseStreamingSink
   protected def offsetSeqMetadata: OffsetSeqMetadata
@@ -167,7 +168,9 @@ trait ProgressReporter extends Logging {
       )
     }
 
-    val sinkProgress = new SinkProgress(sink.toString)
+    val sinkProgress = SinkProgress(
+      sink.toString,
+      sinkCommitProgress.map(_.numOutputRows))
 
     val newProgress = new StreamingQueryProgress(
       id = id,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 89b4f40c9c0b9..83824f40ab90b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -114,6 +115,9 @@ abstract class StreamExecution(
   @volatile
   var availableOffsets = new StreamProgress
 
+  @volatile
+  var sinkCommitProgress: Option[StreamWriterCommitProgress] = None
+
   /** The current batchId or -1 if execution has not yet been initialized. */
   protected var currentBatchId: Long = -1
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3cd6700efef5f..0b3945cbd1323 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -30,6 +30,7 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
 
 /**
  * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
@@ -207,11 +208,19 @@ class SourceProgress protected[sql](
  * during a trigger. See [[StreamingQueryProgress]] for more information.
  *
  * @param description Description of the sink corresponding to this status.
+ * @param numOutputRows Number of rows written to the sink or -1 for Continuous Mode (temporarily)
+ * or Sink V1 (until decommissioned).
  * @since 2.1.0
  */
 @Evolving
 class SinkProgress protected[sql](
-    val description: String) extends Serializable {
+    val description: String,
+    val numOutputRows: Long) extends Serializable {
+
+  /** SinkProgress without custom metrics. */
+  protected[sql] def this(description: String) {
+    this(description, DEFAULT_NUM_OUTPUT_ROWS)
+  }
 
   /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
@@ -222,6 +231,14 @@ class SinkProgress protected[sql](
   override def toString: String = prettyJson
 
   private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description))
+    ("description" -> JString(description)) ~
+      ("numOutputRows" -> JInt(numOutputRows))
   }
 }
+
+private[sql] object SinkProgress {
+  val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L // used when no output row count is supplied
+
+  def apply(description: String, numOutputRows: Option[Long]): SinkProgress =
+    new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS))
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 7bef687e7e43b..2f460b044b237 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -73,7 +73,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |    "inputRowsPerSecond" : 10.0
         |  } ],
         |  "sink" : {
-        |    "description" : "sink"
+        |    "description" : "sink",
+        |    "numOutputRows" : -1
         |  }
         |}
       """.stripMargin.trim)
@@ -105,7 +106,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
          |    "numInputRows" : 678
          |  } ],
          |  "sink" : {
-         |    "description" : "sink"
+         |    "description" : "sink",
+         |    "numOutputRows" : -1
          |  }
          |}
       """.stripMargin.trim)
@@ -250,7 +252,7 @@ object StreamingQueryStatusAndProgressSuite {
         processedRowsPerSecond = Double.PositiveInfinity  // should not be present in the json
       )
     ),
-    sink = new SinkProgress("sink")
+    sink = SinkProgress("sink", None)
   )
 
   val testProgress2 = new StreamingQueryProgress(
@@ -274,7 +276,7 @@ object StreamingQueryStatusAndProgressSuite {
         processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json
       )
     ),
-    sink = new SinkProgress("sink")
+    sink = SinkProgress("sink", None)
   )
 
   val testStatus = new StreamingQueryStatus("active", true, false)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org