You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@spark.apache.org by td...@apache.org on 2017/07/18 23:29:49 UTC
spark git commit: [SPARK-21462][SS] Added batchId to StreamingQueryProgress.json
Repository: spark
Updated Branches:
refs/heads/master f18b905f6 -> 84f1b25f3
[SPARK-21462][SS] Added batchId to StreamingQueryProgress.json
## What changes were proposed in this pull request?
- Added `batchId` to the JSON produced by `StreamingQueryProgress.json`, because it was missing from the generated output.
- Also removed the recently added `numPartitions` field from `StateOperatorProgress`. This value does not change over the course of a query run, and it can be obtained in other ways.
## How was this patch tested?
Updated the existing unit tests.
Author: Tathagata Das <ta...@gmail.com>
Closes #18675 from tdas/SPARK-21462.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84f1b25f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84f1b25f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84f1b25f
Branch: refs/heads/master
Commit: 84f1b25f316a42ce4d3b69a3e136d0db41c9aec2
Parents: f18b905
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Jul 18 16:29:45 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Jul 18 16:29:45 2017 -0700
----------------------------------------------------------------------
.../sql/execution/streaming/statefulOperators.scala | 3 +--
.../scala/org/apache/spark/sql/streaming/progress.scala | 9 ++++-----
.../StreamingQueryStatusAndProgressSuite.scala | 12 ++++++------
3 files changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/84f1b25f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 3ca7f4b..6addab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -87,8 +87,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
new StateOperatorProgress(
numRowsTotal = longMetric("numTotalStateRows").value,
numRowsUpdated = longMetric("numUpdatedStateRows").value,
- memoryUsedBytes = longMetric("stateMemory").value,
- numPartitions = this.sqlContext.conf.numShufflePartitions)
+ memoryUsedBytes = longMetric("stateMemory").value)
}
/** Records the duration of running `body` for the next query progress update. */
http://git-wip-us.apache.org/repos/asf/spark/blob/84f1b25f/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 81a2387..3000c42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,8 +38,7 @@ import org.apache.spark.annotation.InterfaceStability
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long,
- val memoryUsedBytes: Long,
- val numPartitions: Long
+ val memoryUsedBytes: Long
) extends Serializable {
/** The compact JSON representation of this progress. */
@@ -49,13 +48,12 @@ class StateOperatorProgress private[sql](
def prettyJson: String = pretty(render(jsonValue))
private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
- new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numPartitions)
+ new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
- ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
- ("numPartitions" -> JInt(numPartitions))
+ ("memoryUsedBytes" -> JInt(memoryUsedBytes))
}
}
@@ -131,6 +129,7 @@ class StreamingQueryProgress private[sql](
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JString(timestamp)) ~
+ ("batchId" -> JInt(batchId)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
http://git-wip-us.apache.org/repos/asf/spark/blob/84f1b25f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d3cafac..79bb827 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -43,6 +43,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "runId" : "${testProgress1.runId.toString}",
| "name" : "myName",
| "timestamp" : "2016-12-05T20:54:20.827Z",
+ | "batchId" : 2,
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0,
| "durationMs" : {
@@ -57,8 +58,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
- | "memoryUsedBytes" : 2,
- | "numPartitions" : 4
+ | "memoryUsedBytes" : 2
| } ],
| "sources" : [ {
| "description" : "source",
@@ -83,6 +83,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "runId" : "${testProgress2.runId.toString}",
| "name" : null,
| "timestamp" : "2016-12-05T20:54:20.827Z",
+ | "batchId" : 2,
| "numInputRows" : 678,
| "durationMs" : {
| "total" : 0
@@ -90,8 +91,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "stateOperators" : [ {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
- | "memoryUsedBytes" : 2,
- | "numPartitions" : 4
+ | "memoryUsedBytes" : 2
| } ],
| "sources" : [ {
| "description" : "source",
@@ -230,7 +230,7 @@ object StreamingQueryStatusAndProgressSuite {
"avg" -> "2016-12-05T20:54:20.827Z",
"watermark" -> "2016-12-05T20:54:20.827Z").asJava),
stateOperators = Array(new StateOperatorProgress(
- numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numPartitions = 4)),
+ numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
sources = Array(
new SourceProgress(
description = "source",
@@ -254,7 +254,7 @@ object StreamingQueryStatusAndProgressSuite {
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
stateOperators = Array(new StateOperatorProgress(
- numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numPartitions = 4)),
+ numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
sources = Array(
new SourceProgress(
description = "source",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org