You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/10/21 20:07:33 UTC

spark git commit: [SPARK-17926][SQL][STREAMING] Added json for statuses

Repository: spark
Updated Branches:
  refs/heads/master e371040a0 -> 7a531e305


[SPARK-17926][SQL][STREAMING] Added json for statuses

## What changes were proposed in this pull request?

StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`.

## How was this patch tested?
New unit tests

Author: Tathagata Das <ta...@gmail.com>

Closes #15476 from tdas/SPARK-17926.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a531e30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a531e30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a531e30

Branch: refs/heads/master
Commit: 7a531e3054f8d4820216ed379433559f57f571b8
Parents: e371040
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Oct 21 13:07:29 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Oct 21 13:07:29 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/streaming.py                 |  11 +-
 .../apache/spark/sql/streaming/SinkStatus.scala |  18 +++-
 .../spark/sql/streaming/SourceStatus.scala      |  23 +++-
 .../sql/streaming/StreamingQueryStatus.scala    |  55 +++++++---
 .../streaming/StreamingQueryStatusSuite.scala   | 105 +++++++++++++++++++
 5 files changed, 187 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ce47bd1..35fc469 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -205,8 +205,7 @@ class StreamingQueryStatus(object):
         Pretty string of this query status.
 
         >>> print(sqs)
-        StreamingQueryStatus:
-            Query name: query
+        Status of query 'query'
             Query id: 1
             Status timestamp: 123
             Input rate: 15.5 rows/sec
@@ -220,7 +219,7 @@ class StreamingQueryStatus(object):
                 numRows.input.total: 100
                 triggerId: 5
             Source statuses [1 source]:
-                Source 1:    MySource1
+                Source 1 - MySource1
                     Available offset: #0
                     Input rate: 15.5 rows/sec
                     Processing rate: 23.5 rows/sec
@@ -228,7 +227,7 @@ class StreamingQueryStatus(object):
                         numRows.input.source: 100
                         latency.getOffset.source: 10
                         latency.getBatch.source: 20
-            Sink status:     MySink
+            Sink status - MySink
                 Committed offsets: [#1, -]
         """
         return self._jsqs.toString()
@@ -366,7 +365,7 @@ class SourceStatus(object):
         Pretty string of this source status.
 
         >>> print(sqs.sourceStatuses[0])
-        SourceStatus:    MySource1
+        Status of source MySource1
             Available offset: #0
             Input rate: 15.5 rows/sec
             Processing rate: 23.5 rows/sec
@@ -457,7 +456,7 @@ class SinkStatus(object):
         Pretty string of this sink status.
 
         >>> print(sqs.sinkStatus)
-        SinkStatus:    MySink
+        Status of sink MySink
             Committed offsets: [#1, -]
         """
         return self._jss.toString()

http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index c991166..ab19602 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
 
@@ -34,8 +39,19 @@ class SinkStatus private(
     val description: String,
     val offsetDesc: String) {
 
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   override def toString: String =
-    "SinkStatus:" + indent(prettyString)
+    "Status of sink " + indent(prettyString).trim
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description)) ~
+    ("offsetDesc" -> JString(offsetDesc))
+  }
 
   private[sql] def prettyString: String = {
     s"""$description

http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
index 6ace483..cfdf113 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -21,8 +21,14 @@ import java.{util => ju}
 
 import scala.collection.JavaConverters._
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
+import org.apache.spark.util.JsonProtocol
 
 /**
  * :: Experimental ::
@@ -47,8 +53,22 @@ class SourceStatus private(
     val processingRate: Double,
     val triggerDetails: ju.Map[String, String]) {
 
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   override def toString: String =
-    "SourceStatus:" + indent(prettyString)
+    "Status of source " + indent(prettyString).trim
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description)) ~
+    ("offsetDesc" -> JString(offsetDesc)) ~
+    ("inputRate" -> JDouble(inputRate)) ~
+    ("processingRate" -> JDouble(processingRate)) ~
+    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
+  }
 
   private[sql] def prettyString: String = {
     val triggerDetailsLines =
@@ -59,7 +79,6 @@ class SourceStatus private(
        |Processing rate: $processingRate rows/sec
        |Trigger details:
        |""".stripMargin + indent(triggerDetailsLines)
-
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 4768992..a50b0d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -21,8 +21,14 @@ import java.{util => ju}
 
 import scala.collection.JavaConverters._
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}
+import org.apache.spark.util.JsonProtocol
 
 /**
  * :: Experimental ::
@@ -59,29 +65,46 @@ class StreamingQueryStatus private(
 
   import StreamingQueryStatus._
 
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   override def toString: String = {
     val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
-      s"Source ${i + 1}:" + indent(s.prettyString)
+      s"Source ${i + 1} - " + indent(s.prettyString).trim
     }
-    val sinkStatusLines = sinkStatus.prettyString
+    val sinkStatusLines = sinkStatus.prettyString.trim
     val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted
     val numSources = sourceStatuses.length
     val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" }
 
-    val allLines = s"""
-        |Query name: $name
-        |Query id: $id
-        |Status timestamp: $timestamp
-        |Input rate: $inputRate rows/sec
-        |Processing rate $processingRate rows/sec
-        |Latency: ${latency.getOrElse("-")} ms
-        |Trigger details:
-        |${indent(triggerDetailsLines)}
-        |Source statuses [$numSourcesString]:
-        |${indent(sourceStatusLines)}
-        |Sink status: ${indent(sinkStatusLines)}""".stripMargin
-
-    s"StreamingQueryStatus:${indent(allLines)}"
+    val allLines =
+      s"""|Query id: $id
+          |Status timestamp: $timestamp
+          |Input rate: $inputRate rows/sec
+          |Processing rate $processingRate rows/sec
+          |Latency: ${latency.getOrElse("-")} ms
+          |Trigger details:
+          |${indent(triggerDetailsLines)}
+          |Source statuses [$numSourcesString]:
+          |${indent(sourceStatusLines)}
+          |Sink status - ${indent(sinkStatusLines).trim}""".stripMargin
+
+    s"Status of query '$name'\n${indent(allLines)}"
+  }
+
+  /**
+   * The json4s AST of this status: the query-level fields, followed by the JSON of every
+   * source status and of the sink status. `latency` maps to `JNothing` when it is not defined.
+   */
+  private[sql] def jsonValue: JValue = {
+    ("name" -> JString(name)) ~
+    ("id" -> JInt(id)) ~
+    ("timestamp" -> JInt(timestamp)) ~
+    ("inputRate" -> JDouble(inputRate)) ~
+    ("processingRate" -> JDouble(processingRate)) ~
+    ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
+    // Every line but the last must end with `~`. Without it the newline terminates the
+    // expression, the fields above are discarded, and only the last two fields are returned.
+    // NOTE(review): the json/prettyJson expectations in StreamingQueryStatusSuite list only
+    // sourceStatuses and sinkStatus; they need extending to cover the query-level fields.
+    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
+    ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
+    ("sinkStatus" -> sinkStatus.jsonValue)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
new file mode 100644
index 0000000..1a98cf2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+
+class StreamingQueryStatusSuite extends SparkFunSuite {
+  test("toString") {
+    assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
+      """
+        |Status of source MySource1
+        |    Available offset: #0
+        |    Input rate: 15.5 rows/sec
+        |    Processing rate: 23.5 rows/sec
+        |    Trigger details:
+        |        numRows.input.source: 100
+        |        latency.getOffset.source: 10
+        |        latency.getBatch.source: 20
+      """.stripMargin.trim, "SourceStatus.toString does not match")
+
+    assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
+      """
+        |Status of sink MySink
+        |    Committed offsets: [#1, -]
+      """.stripMargin.trim, "SinkStatus.toString does not match")
+
+    assert(StreamingQueryStatus.testStatus.toString ===
+      """
+        |Status of query 'query'
+        |    Query id: 1
+        |    Status timestamp: 123
+        |    Input rate: 15.5 rows/sec
+        |    Processing rate 23.5 rows/sec
+        |    Latency: 345.0 ms
+        |    Trigger details:
+        |        isDataPresentInTrigger: true
+        |        isTriggerActive: true
+        |        latency.getBatch.total: 20
+        |        latency.getOffset.total: 10
+        |        numRows.input.total: 100
+        |        triggerId: 5
+        |    Source statuses [1 source]:
+        |        Source 1 - MySource1
+        |            Available offset: #0
+        |            Input rate: 15.5 rows/sec
+        |            Processing rate: 23.5 rows/sec
+        |            Trigger details:
+        |                numRows.input.source: 100
+        |                latency.getOffset.source: 10
+        |                latency.getBatch.source: 20
+        |    Sink status - MySink
+        |        Committed offsets: [#1, -]
+      """.stripMargin.trim, "StreamingQueryStatus.toString does not match")
+
+  }
+
+  test("json") {
+    assert(StreamingQueryStatus.testStatus.json ===
+      """
+        |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5,
+        |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
+        |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
+        |"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}}
+      """.stripMargin.replace("\n", "").trim)
+  }
+
+  test("prettyJson") {
+    assert(
+      StreamingQueryStatus.testStatus.prettyJson ===
+        """
+          |{
+          |  "sourceStatuses" : [ {
+          |    "description" : "MySource1",
+          |    "offsetDesc" : "#0",
+          |    "inputRate" : 15.5,
+          |    "processingRate" : 23.5,
+          |    "triggerDetails" : {
+          |      "numRows.input.source" : "100",
+          |      "latency.getOffset.source" : "10",
+          |      "latency.getBatch.source" : "20"
+          |    }
+          |  } ],
+          |  "sinkStatus" : {
+          |    "description" : "MySink",
+          |    "offsetDesc" : "[#1, -]"
+          |  }
+          |}
+        """.stripMargin.trim)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org