Posted to commits@spark.apache.org by td...@apache.org on 2016/10/17 23:56:45 UTC
[2/2] spark git commit: [SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0
**This PR adds the same metrics to branch-2.0 that were added to master in #15307.**
The differences compared to #15307 are:
- The new configuration is added only in the `SQLConf` object (i.e. `SQLConf.STREAMING_METRICS_ENABLED`) and not in the `SQLConf` class (i.e. no `SQLConf.isStreamingMetricsEnabled`). Spark master has all the streaming configurations exposed as actual fields in the `SQLConf` class (e.g. [streamingPollingDelay](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L642)), but [not in Spark 2.0](https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L608). So I didn't add it in this 2.0 PR. (A sketch of enabling this conf from user code follows this list.)
- In the previous master PR, the above configuration was read in `StreamExecution` as `sparkSession.sessionState.conf.isStreamingMetricsEnabled`. In this 2.0 PR, I am instead reading it as `sparkSession.conf.get(STREAMING_METRICS_ENABLED)` (i.e. no `sessionState`) to keep it consistent with how other confs are read in `StreamExecution` (e.g. [STREAMING_POLLING_DELAY](https://github.com/tdas/spark/blob/ee8e899e4c274c363a8b4d13e8bf57b0b467a50e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L62)).
- Different Mima exclusions
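For reference, a minimal sketch of enabling the new metrics from user code (the conf key and the Codahale source name `StructuredStreaming.<queryName>` come from this patch; the local session setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object EnableStreamingMetricsDemo extends App {
  val spark = SparkSession.builder()
    .master("local[2]")   // illustrative local setup
    .appName("metrics-demo")
    .getOrCreate()

  // Conf added by this patch (defaults to false). Once enabled, each active
  // streaming query registers a Codahale source named
  // "StructuredStreaming.<queryName>" with the Spark metrics system.
  spark.conf.set("spark.sql.streaming.metricsEnabled", "true")

  spark.stop()
}
```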
------
## What changes were proposed in this pull request?
Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing
Specifically, this PR makes the following public API changes (a usage sketch follows this list).
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later)
- `StreamingQueryStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by all the sources
  - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources
  - ~~outputRate~~ - *Does not work with whole-stage codegen*
  - latency - Current average latency between the data being available in a source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
    - latencies - getOffset, getBatch, full trigger, WAL writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations
- `SourceStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by the source
  - processingRate - Current rate (rows/sec) at which the query is processing data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
- Python API for `StreamingQuery.status()`
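As a rough usage sketch of the new `status` API (assuming this patch is applied; `MemoryStream` is the test source also touched by this PR, and the memory sink is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

object StatusPollingSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("status-demo").getOrCreate()
  import spark.implicits._
  implicit val sqlCtx = spark.sqlContext

  val input = MemoryStream[Int]
  val query = input.toDF().writeStream
    .format("memory")
    .queryName("demo")
    .outputMode("append")
    .start()

  input.addData(1, 2, 3)
  query.processAllAvailable()

  val status = query.status                 // StreamingQueryStatus
  println(s"input rate:      ${status.inputRate} rows/sec")
  println(s"processing rate: ${status.processingRate} rows/sec")
  println(status.triggerDetails)            // e.g. numRows.input.total, latency.fullTrigger
  query.stop()
  spark.stop()
}
```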
### Breaking changes to existing APIs
**Existing direct public facing APIs**
- Deprecated the direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus` (a migration sketch follows at the end of this section).
- Branch 2.0 should have them deprecated; master should have them removed.
**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus`
- Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status)
- Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` renamed to `queryStatus`, with return type `StreamingQueryStatus`.
- Field `offsetDesc` in `SourceStatus` changed from `Option[String]` to `String`.
- For `SourceStatus` and `SinkStatus`, made the constructors private (instead of `private[sql]`) to make them more Java-safe. Instead, added `private[sql]` factory methods on the companion objects (`SourceStatus.apply()`/`SinkStatus.apply()`), which are harder to accidentally use from Java.
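A minimal migration sketch for the deprecated accessors (any active `StreamingQuery` works; the helper name is illustrative):

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// The old accessors still work in 2.0.x but are deprecated; the same
// information now hangs off StreamingQuery.status.
def printStatuses(query: StreamingQuery): Unit = {
  query.status.sourceStatuses.foreach(println)  // was: query.sourceStatuses
  println(query.status.sinkStatus)              // was: query.sinkStatus
}
```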
## How was this patch tested?
Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are in source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore, etc.
Metrics were also manually tested using the Ganglia sink.
Author: Tathagata Das <ta...@gmail.com>
Closes #15472 from tdas/SPARK-17731-branch-2.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/881e0eb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/881e0eb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/881e0eb0
Branch: refs/heads/branch-2.0
Commit: 881e0eb05782ea74cf92a62954466b14ea9e05b6
Parents: a0d9015
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Oct 17 16:56:40 2016 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Oct 17 16:56:40 2016 -0700
----------------------------------------------------------------------
.../spark/sql/kafka010/KafkaSourceSuite.scala | 27 ++
project/MimaExcludes.scala | 10 +
python/pyspark/sql/streaming.py | 301 ++++++++++++++++++
.../spark/sql/catalyst/trees/TreeNode.scala | 7 +
.../sql/execution/LocalTableScanExec.scala | 5 +-
.../execution/streaming/StatefulAggregate.scala | 31 +-
.../execution/streaming/StreamExecution.scala | 307 ++++++++++++++-----
.../sql/execution/streaming/StreamMetrics.scala | 242 +++++++++++++++
.../spark/sql/execution/streaming/memory.scala | 7 +
.../state/HDFSBackedStateStoreProvider.scala | 2 +
.../execution/streaming/state/StateStore.scala | 3 +
.../org/apache/spark/sql/internal/SQLConf.scala | 6 +
.../apache/spark/sql/streaming/SinkStatus.scala | 28 +-
.../spark/sql/streaming/SourceStatus.scala | 54 +++-
.../spark/sql/streaming/StreamingQuery.scala | 13 +-
.../sql/streaming/StreamingQueryInfo.scala | 37 ---
.../sql/streaming/StreamingQueryListener.scala | 8 +-
.../sql/streaming/StreamingQueryStatus.scala | 139 +++++++++
.../sql/execution/metric/SQLMetricsSuite.scala | 17 +
.../streaming/StreamMetricsSuite.scala | 213 +++++++++++++
.../streaming/TextSocketStreamSuite.scala | 24 ++
.../streaming/state/StateStoreSuite.scala | 5 +
.../sql/streaming/FileStreamSourceSuite.scala | 14 +
.../apache/spark/sql/streaming/StreamTest.scala | 72 +++++
.../streaming/StreamingAggregationSuite.scala | 54 ++++
.../streaming/StreamingQueryListenerSuite.scala | 220 +++++--------
.../sql/streaming/StreamingQuerySuite.scala | 180 ++++++++++-
27 files changed, 1753 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index c640b93..8b5296e 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -264,6 +264,33 @@ class KafkaSourceSuite extends KafkaSourceTest {
testUnsupportedConfig("kafka.auto.offset.reset", "latest")
}
+ test("input row metrics") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("subscribe", topic)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ AssertOnLastQueryStatus { status =>
+ assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
+ assert(status.sourceStatuses(0).processingRate > 0.0)
+ }
+ )
+ }
+
private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 423cbd4..ddf53bb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -787,6 +787,16 @@ object MimaExcludes {
) ++ Seq(
// SPARK-16240: ML persistence backward compatibility for LDA
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
+ ) ++ Seq(
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status")
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 118a02b..0df63a7 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -189,6 +189,304 @@ class StreamingQueryManager(object):
self._jsqm.resetTerminated()
+class StreamingQueryStatus(object):
+ """A class used to report information about the progress of a StreamingQuery.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jsqs):
+ self._jsqs = jsqs
+
+ def __str__(self):
+ """
+ Pretty string of this query status.
+
+ >>> print(sqs)
+ StreamingQueryStatus:
+ Query name: query
+ Query id: 1
+ Status timestamp: 123
+ Input rate: 15.5 rows/sec
+ Processing rate 23.5 rows/sec
+ Latency: 345.0 ms
+ Trigger details:
+ isDataPresentInTrigger: true
+ isTriggerActive: true
+ latency.getBatch.total: 20
+ latency.getOffset.total: 10
+ numRows.input.total: 100
+ triggerId: 5
+ Source statuses [1 source]:
+ Source 1: MySource1
+ Available offset: #0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ Sink status: MySink
+ Committed offsets: [#1, -]
+ """
+ return self._jsqs.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def name(self):
+ """
+ Name of the query. This name is unique across all active queries.
+
+ >>> sqs.name
+ u'query'
+ """
+ return self._jsqs.name()
+
+ @property
+ @since(2.1)
+ def id(self):
+ """
+ Id of the query. This id is unique across all queries that have been started in
+ the current process.
+
+ >>> int(sqs.id)
+ 1
+ """
+ return self._jsqs.id()
+
+ @property
+ @since(2.1)
+ def timestamp(self):
+ """
+ Timestamp (ms) of when this query was generated.
+
+ >>> int(sqs.timestamp)
+ 123
+ """
+ return self._jsqs.timestamp()
+
+ @property
+ @since(2.1)
+ def inputRate(self):
+ """
+ Current total rate (rows/sec) at which data is being generated by all the sources.
+
+ >>> sqs.inputRate
+ 15.5
+ """
+ return self._jsqs.inputRate()
+
+ @property
+ @since(2.1)
+ def processingRate(self):
+ """
+ Current rate (rows/sec) at which the query is processing data from all the sources.
+
+ >>> sqs.processingRate
+ 23.5
+ """
+ return self._jsqs.processingRate()
+
+ @property
+ @since(2.1)
+ def latency(self):
+ """
+ Current average latency between the data being available in source and the sink
+ writing the corresponding output.
+
+ >>> sqs.latency
+ 345.0
+ """
+ if (self._jsqs.latency().nonEmpty()):
+ return self._jsqs.latency().get()
+ else:
+ return None
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def sourceStatuses(self):
+ """
+ Current statuses of the sources as a list.
+
+ >>> len(sqs.sourceStatuses)
+ 1
+ >>> sqs.sourceStatuses[0].description
+ u'MySource1'
+ """
+ return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def sinkStatus(self):
+ """
+ Current status of the sink.
+
+ >>> sqs.sinkStatus.description
+ u'MySink'
+ """
+ return SinkStatus(self._jsqs.sinkStatus())
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def triggerDetails(self):
+ """
+ Low-level details of the currently active trigger (e.g. number of rows processed
+ in trigger, latency of intermediate steps, etc.).
+
+ If no trigger is currently active, then it will have details of the last completed trigger.
+
+ >>> sqs.triggerDetails
+ {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
+ u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
+ u'isDataPresentInTrigger': u'true'}
+ """
+ return self._jsqs.triggerDetails()
+
+
+class SourceStatus(object):
+ """
+ Status and metrics of a streaming Source.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jss):
+ self._jss = jss
+
+ def __str__(self):
+ """
+ Pretty string of this source status.
+
+ >>> print(sqs.sourceStatuses[0])
+ SourceStatus: MySource1
+ Available offset: #0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ """
+ return self._jss.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def description(self):
+ """
+ Description of the source corresponding to this status.
+
+ >>> sqs.sourceStatuses[0].description
+ u'MySource1'
+ """
+ return self._jss.description()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def offsetDesc(self):
+ """
+ Description of the current offset if known.
+
+ >>> sqs.sourceStatuses[0].offsetDesc
+ u'#0'
+ """
+ return self._jss.offsetDesc()
+
+ @property
+ @since(2.1)
+ def inputRate(self):
+ """
+ Current rate (rows/sec) at which data is being generated by the source.
+
+ >>> sqs.sourceStatuses[0].inputRate
+ 15.5
+ """
+ return self._jss.inputRate()
+
+ @property
+ @since(2.1)
+ def processingRate(self):
+ """
+ Current rate (rows/sec) at which the query is processing data from the source.
+
+ >>> sqs.sourceStatuses[0].processingRate
+ 23.5
+ """
+ return self._jss.processingRate()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def triggerDetails(self):
+ """
+ Low-level details of the currently active trigger (e.g. number of rows processed
+ in trigger, latency of intermediate steps, etc.).
+
+ If no trigger is currently active, then it will have details of the last completed trigger.
+
+ >>> sqs.sourceStatuses[0].triggerDetails
+ {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
+ u'latency.getBatch.source': u'20'}
+ """
+ return self._jss.triggerDetails()
+
+
+class SinkStatus(object):
+ """
+ Status and metrics of a streaming Sink.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jss):
+ self._jss = jss
+
+ def __str__(self):
+ """
+ Pretty string of this sink status.
+
+ >>> print(sqs.sinkStatus)
+ SinkStatus: MySink
+ Committed offsets: [#1, -]
+ """
+ return self._jss.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def description(self):
+ """
+ Description of the sink corresponding to this status.
+
+ >>> sqs.sinkStatus.description
+ u'MySink'
+ """
+ return self._jss.description()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def offsetDesc(self):
+ """
+ Description of the current offsets up to which data has been written by the sink.
+
+ >>> sqs.sinkStatus.offsetDesc
+ u'[#1, -]'
+ """
+ return self._jss.offsetDesc()
+
+
class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.
@@ -751,11 +1049,14 @@ def _test():
globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
globs['df'] = \
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
+ globs['sqs'] = StreamingQueryStatus(
+ spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
(failure_count, test_count) = doctest.testmod(
pyspark.sql.streaming, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
globs['spark'].stop()
+
if failure_count:
exit(-1)
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index eeccba7..931d14d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -159,6 +159,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
/**
+ * Returns a Seq containing the leaves in this tree.
+ */
+ def collectLeaves(): Seq[BaseType] = {
+ this.collect { case p if p.children.isEmpty => p }
+ }
+
+ /**
* Finds and returns the first [[TreeNode]] of the tree for which the given partial function
* is defined (pre-order), and applies the partial function to it.
*/
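A small self-contained sketch of the `collectLeaves` semantics added above (a toy tree, not Spark's `TreeNode`): collect, in pre-order, every node that has no children.

```scala
case class Node(name: String, children: Seq[Node] = Nil) {
  // Pre-order collect, mirroring TreeNode.collect
  def collect[B](pf: PartialFunction[Node, B]): Seq[B] =
    pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))

  def collectLeaves(): Seq[Node] =
    collect { case n if n.children.isEmpty => n }
}

object CollectLeavesDemo extends App {
  val plan = Node("join", Seq(Node("scanA"), Node("filter", Seq(Node("scanB")))))
  println(plan.collectLeaves().map(_.name)) // List(scanA, scanB)
}
```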
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 9f53a99..c998e04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -57,10 +57,13 @@ case class LocalTableScanExec(
}
override def executeCollect(): Array[InternalRow] = {
+ longMetric("numOutputRows").add(unsafeRows.size)
unsafeRows
}
override def executeTake(limit: Int): Array[InternalRow] = {
- unsafeRows.take(limit)
+ val taken = unsafeRows.take(limit)
+ longMetric("numOutputRows").add(taken.size)
+ taken
}
}
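The point of this change is that `numOutputRows` counts only the rows actually returned to the caller (note that `executeTake` adds `taken.size`, not the full table size). A toy sketch of the pattern, with a plain variable standing in for Spark's `SQLMetric`:

```scala
object CountReturnedRowsDemo extends App {
  val rows = Seq("r1", "r2", "r3", "r4", "r5")
  var numOutputRows = 0L // stand-in for longMetric("numOutputRows")

  def executeCollect(): Seq[String] = {
    numOutputRows += rows.size
    rows
  }

  def executeTake(limit: Int): Seq[String] = {
    val taken = rows.take(limit)
    numOutputRows += taken.size // count only what is actually returned
    taken
  }

  executeTake(2)
  println(numOutputRows) // 2
}
```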
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 4d0283f..587ea7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.SparkPlan
@@ -56,7 +57,12 @@ case class StateStoreRestoreExec(
child: SparkPlan)
extends execution.UnaryExecNode with StatefulOperator {
+ override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
override protected def doExecute(): RDD[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
@@ -69,6 +75,7 @@ case class StateStoreRestoreExec(
iter.flatMap { row =>
val key = getKey(row)
val savedState = store.get(key)
+ numOutputRows += 1
row +: savedState.toSeq
}
}
@@ -86,7 +93,13 @@ case class StateStoreSaveExec(
child: SparkPlan)
extends execution.UnaryExecNode with StatefulOperator {
+ override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
+ "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"))
+
override protected def doExecute(): RDD[InternalRow] = {
+ metrics // force lazy init at driver
assert(returnAllStates.nonEmpty,
"Incorrect planning in IncrementalExecution, returnAllStates have not been set")
val saveAndReturnFunc = if (returnAllStates.get) saveAndReturnAll _ else saveAndReturnUpdated _
@@ -111,6 +124,10 @@ case class StateStoreSaveExec(
private def saveAndReturnUpdated(
store: StateStore,
iter: Iterator[InternalRow]): Iterator[InternalRow] = {
+ val numOutputRows = longMetric("numOutputRows")
+ val numTotalStateRows = longMetric("numTotalStateRows")
+ val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
new Iterator[InternalRow] {
private[this] val baseIterator = iter
private[this] val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
@@ -118,6 +135,7 @@ case class StateStoreSaveExec(
override def hasNext: Boolean = {
if (!baseIterator.hasNext) {
store.commit()
+ numTotalStateRows += store.numKeys()
false
} else {
true
@@ -128,6 +146,8 @@ case class StateStoreSaveExec(
val row = baseIterator.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
store.put(key.copy(), row.copy())
+ numOutputRows += 1
+ numUpdatedStateRows += 1
row
}
}
@@ -142,12 +162,21 @@ case class StateStoreSaveExec(
store: StateStore,
iter: Iterator[InternalRow]): Iterator[InternalRow] = {
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
+ val numOutputRows = longMetric("numOutputRows")
+ val numTotalStateRows = longMetric("numTotalStateRows")
+ val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+
while (iter.hasNext) {
val row = iter.next().asInstanceOf[UnsafeRow]
val key = getKey(row)
store.put(key.copy(), row.copy())
+ numUpdatedStateRows += 1
}
store.commit()
- store.iterator().map(_._2.asInstanceOf[InternalRow])
+ numTotalStateRows += store.numKeys()
+ store.iterator().map { case (k, v) =>
+ numOutputRows += 1
+ v.asInstanceOf[InternalRow]
+ }
}
}
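To make the two state metrics concrete: `numUpdatedStateRows` counts every `put()` in the trigger, while `numTotalStateRows` is the store's key count after `commit()`. A toy sketch, with a mutable map standing in for the `StateStore`:

```scala
object StateRowMetricsDemo extends App {
  val store = scala.collection.mutable.Map[String, Int]() // stand-in for StateStore
  var numUpdatedStateRows = 0L
  var numTotalStateRows = 0L

  val batch = Seq("a" -> 1, "b" -> 2, "a" -> 3) // key "a" is updated twice
  batch.foreach { case (k, v) =>
    store.put(k, v)
    numUpdatedStateRows += 1 // one per put, so re-updated keys count repeatedly
  }
  numTotalStateRows = store.size // distinct keys after "commit" (cf. store.numKeys())

  println((numUpdatedStateRows, numTotalStateRows)) // (3,2)
}
```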
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8e0688d..6330e0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
@@ -57,6 +57,7 @@ class StreamExecution(
extends StreamingQuery with Logging {
import org.apache.spark.sql.streaming.StreamingQueryListener._
+ import StreamMetrics._
private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
@@ -105,11 +106,22 @@ class StreamExecution(
var lastExecution: QueryExecution = null
@volatile
- var streamDeathCause: StreamingQueryException = null
+ private var streamDeathCause: StreamingQueryException = null
/* Get the call site in the caller thread; will pass this into the micro batch thread */
private val callSite = Utils.getCallSite()
+ /** Metrics for this query */
+ private val streamMetrics =
+ new StreamMetrics(uniqueSources.toSet, triggerClock, s"StructuredStreaming.$name")
+
+ @volatile
+ private var currentStatus: StreamingQueryStatus = null
+
+ /** Flag that signals whether any error with input metrics has already been logged */
+ @volatile
+ private var metricWarningLogged: Boolean = false
+
/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
@@ -136,16 +148,14 @@ class StreamExecution(
/** Whether the query is currently active or not */
override def isActive: Boolean = state == ACTIVE
+ /** Returns the current status of the query. */
+ override def status: StreamingQueryStatus = currentStatus
+
/** Returns current status of all the sources. */
- override def sourceStatuses: Array[SourceStatus] = {
- val localAvailableOffsets = availableOffsets
- sources.map(s =>
- new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
- }
+ override def sourceStatuses: Array[SourceStatus] = currentStatus.sourceStatuses.toArray
/** Returns current status of the sink. */
- override def sinkStatus: SinkStatus =
- new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString)
+ override def sinkStatus: SinkStatus = currentStatus.sinkStatus
/** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
@@ -176,7 +186,11 @@ class StreamExecution(
// Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
// so must mark this as ACTIVE first.
state = ACTIVE
- postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.
+ if (sparkSession.conf.get(SQLConf.STREAMING_METRICS_ENABLED)) {
+ sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
+ }
+ updateStatus()
+ postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception.
// Unblock starting thread
startLatch.countDown()
@@ -185,25 +199,41 @@ class StreamExecution(
SparkSession.setActiveSession(sparkSession)
triggerExecutor.execute(() => {
- if (isActive) {
- if (currentBatchId < 0) {
- // We'll do this initialization only once
- populateStartOffsets()
- logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+ streamMetrics.reportTriggerStarted(currentBatchId)
+ streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Finding new data from sources")
+ updateStatus()
+ val isTerminated = reportTimeTaken(TRIGGER_LATENCY) {
+ if (isActive) {
+ if (currentBatchId < 0) {
+ // We'll do this initialization only once
+ populateStartOffsets()
+ logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+ } else {
+ constructNextBatch()
+ }
+ if (dataAvailable) {
+ streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, true)
+ streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Processing new data")
+ updateStatus()
+ runBatch()
+ // We'll increase currentBatchId after we complete processing current batch's data
+ currentBatchId += 1
+ } else {
+ streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, false)
+ streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "No new data")
+ updateStatus()
+ Thread.sleep(pollingDelayMs)
+ }
+ true
} else {
- constructNextBatch()
+ false
}
- if (dataAvailable) {
- runBatch()
- // We'll increase currentBatchId after we complete processing current batch's data
- currentBatchId += 1
- } else {
- Thread.sleep(pollingDelayMs)
- }
- true
- } else {
- false
}
+ // Update metrics and notify others
+ streamMetrics.reportTriggerFinished()
+ updateStatus()
+ postEvent(new QueryProgress(currentStatus))
+ isTerminated
})
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
@@ -221,8 +251,16 @@ class StreamExecution(
}
} finally {
state = TERMINATED
+
+ // Update metrics and status
+ streamMetrics.stop()
+ sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
+ updateStatus()
+
+ // Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
- postEvent(new QueryTerminated(this.toInfo, exception.map(_.cause).map(Utils.exceptionString)))
+ postEvent(
+ new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
@@ -248,7 +286,6 @@ class StreamExecution(
committedOffsets = lastOffsets.toStreamProgress(sources)
logDebug(s"Resuming with committed offsets: $committedOffsets")
}
-
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
@@ -278,8 +315,14 @@ class StreamExecution(
val hasNewData = {
awaitBatchLock.lock()
try {
- val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
- availableOffsets ++= newData
+ reportTimeTaken(GET_OFFSET_LATENCY) {
+ val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
+ reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) {
+ (s, s.getOffset)
+ }
+ }.toMap
+ availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
+ }
if (dataAvailable) {
true
@@ -292,16 +335,19 @@ class StreamExecution(
}
}
if (hasNewData) {
- assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
- s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
- logInfo(s"Committed offsets for batch $currentBatchId.")
-
- // Now that we have logged the new batch, no further processing will happen for
- // the previous batch, and it is safe to discard the old metadata.
- // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
- // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
- // flight at the same time), this cleanup logic will need to change.
- offsetLog.purge(currentBatchId)
+ reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
+ assert(
+ offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+ s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
+ logInfo(s"Committed offsets for batch $currentBatchId.")
+
+ // Now that we have logged the new batch, no further processing will happen for
+ // the previous batch, and it is safe to discard the old metadata.
+ // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
+ // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
+ // flight at the same time), this cleanup logic will need to change.
+ offsetLog.purge(currentBatchId)
+ }
} else {
awaitBatchLock.lock()
try {
@@ -311,26 +357,30 @@ class StreamExecution(
awaitBatchLock.unlock()
}
}
+ reportTimestamp(GET_OFFSET_TIMESTAMP)
}
/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
*/
private def runBatch(): Unit = {
- val startTime = System.nanoTime()
-
// TODO: Move this to IncrementalExecution.
// Request unprocessed data from all sources.
- val newData = availableOffsets.flatMap {
- case (source, available)
+ val newData = reportTimeTaken(GET_BATCH_LATENCY) {
+ availableOffsets.flatMap {
+ case (source, available)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
- val current = committedOffsets.get(source)
- val batch = source.getBatch(current, available)
- logDebug(s"Retrieving data from $source: $current -> $available")
- Some(source -> batch)
- case _ => None
- }.toMap
+ val current = committedOffsets.get(source)
+ val batch = reportTimeTaken(source, SOURCE_GET_BATCH_LATENCY) {
+ source.getBatch(current, available)
+ }
+ logDebug(s"Retrieving data from $source: $current -> $available")
+ Some(source -> batch)
+ case _ => None
+ }
+ }
+ reportTimestamp(GET_BATCH_TIMESTAMP)
// A list of attributes that will need to be updated.
var replacements = new ArrayBuffer[(Attribute, Attribute)]
@@ -351,25 +401,24 @@ class StreamExecution(
// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
- val newPlan = withNewSources transformAllExpressions {
+ val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
}
- val optimizerStart = System.nanoTime()
- lastExecution = new IncrementalExecution(
- sparkSession,
- newPlan,
- outputMode,
- checkpointFile("state"),
- currentBatchId)
-
- lastExecution.executedPlan
- val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
- logDebug(s"Optimized batch in ${optimizerTime}ms")
+ val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) {
+ lastExecution = new IncrementalExecution(
+ sparkSession,
+ triggerLogicalPlan,
+ outputMode,
+ checkpointFile("state"),
+ currentBatchId)
+ lastExecution.executedPlan // Force the lazy generation of execution plan
+ }
val nextBatch =
new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
sink.addBatch(currentBatchId, nextBatch)
+ reportNumRows(executedPlan, triggerLogicalPlan, newData)
awaitBatchLock.lock()
try {
@@ -379,11 +428,8 @@ class StreamExecution(
awaitBatchLock.unlock()
}
- val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
- logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
// Update committed offsets.
committedOffsets ++= availableOffsets
- postEvent(new QueryProgress(this.toInfo))
}
private def postEvent(event: StreamingQueryListener.Event) {
@@ -516,12 +562,131 @@ class StreamExecution(
""".stripMargin
}
- private def toInfo: StreamingQueryInfo = {
- new StreamingQueryInfo(
- this.name,
- this.id,
- this.sourceStatuses,
- this.sinkStatus)
+ /**
+ * Report row metrics of the executed trigger
+ * @param triggerExecutionPlan Execution plan of the trigger
+ * @param triggerLogicalPlan Logical plan of the trigger, generated from the query logical plan
+ * @param sourceToDF Source to DataFrame returned by the source.getBatch in this trigger
+ */
+ private def reportNumRows(
+ triggerExecutionPlan: SparkPlan,
+ triggerLogicalPlan: LogicalPlan,
+ sourceToDF: Map[Source, DataFrame]): Unit = {
+ // We want to associate execution plan leaves to sources that generate them, so that we match
+ // their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
+ // Consider the translation from the streaming logical plan to the final executed plan.
+ //
+ // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+ //
+ // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
+ // - Each logical plan leaf will be associated with a single streaming source.
+ // - There can be multiple logical plan leaves associated with a streaming source.
+ // - There can be leaves not associated with any streaming source, because they were
+ // generated from a batch source (e.g. stream-batch joins)
+ //
+ // 2. Assuming that the executed plan has the same number of leaves in the same order as that of
+ // the trigger logical plan, we associate executed plan leaves with corresponding
+ // streaming sources.
+ //
+ // 3. For each source, we sum the metrics of the associated execution plan leaves.
+ //
+ val logicalPlanLeafToSource = sourceToDF.flatMap { case (source, df) =>
+ df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+ }
+ val allLogicalPlanLeaves = triggerLogicalPlan.collectLeaves() // includes non-streaming sources
+ val allExecPlanLeaves = triggerExecutionPlan.collectLeaves()
+ val sourceToNumInputRows: Map[Source, Long] =
+ if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
+ val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
+ case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
+ }
+ val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
+ val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+ source -> numRows
+ }
+ sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
+ } else {
+ if (!metricWarningLogged) {
+ def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
+ logWarning(
+ "Could not report metrics as number leaves in trigger logical plan did not match that" +
+ s" of the execution plan:\n" +
+ s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
+ s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
+ metricWarningLogged = true
+ }
+ Map.empty
+ }
+ val numOutputRows = triggerExecutionPlan.metrics.get("numOutputRows").map(_.value)
+ val stateNodes = triggerExecutionPlan.collect {
+ case p if p.isInstanceOf[StateStoreSaveExec] => p
+ }
+
+ streamMetrics.reportNumInputRows(sourceToNumInputRows)
+ stateNodes.zipWithIndex.foreach { case (s, i) =>
+ streamMetrics.reportTriggerDetail(
+ NUM_TOTAL_STATE_ROWS(i + 1),
+ s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L))
+ streamMetrics.reportTriggerDetail(
+ NUM_UPDATED_STATE_ROWS(i + 1),
+ s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
+ }
+ updateStatus()
+ }
+
+ private def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
+ val startTime = triggerClock.getTimeMillis()
+ val result = body
+ val endTime = triggerClock.getTimeMillis()
+ val timeTaken = math.max(endTime - startTime, 0)
+ streamMetrics.reportTriggerDetail(triggerDetailKey, timeTaken)
+ updateStatus()
+ if (triggerDetailKey == TRIGGER_LATENCY) {
+ logInfo(s"Completed up to $availableOffsets in $timeTaken ms")
+ }
+ result
+ }
+
+ private def reportTimeTaken[T](source: Source, triggerDetailKey: String)(body: => T): T = {
+ val startTime = triggerClock.getTimeMillis()
+ val result = body
+ val endTime = triggerClock.getTimeMillis()
+ streamMetrics.reportSourceTriggerDetail(
+ source, triggerDetailKey, math.max(endTime - startTime, 0))
+ updateStatus()
+ result
+ }
+
+ private def reportTimestamp(triggerDetailKey: String): Unit = {
+ streamMetrics.reportTriggerDetail(triggerDetailKey, triggerClock.getTimeMillis)
+ updateStatus()
+ }
+
+ private def updateStatus(): Unit = {
+ val localAvailableOffsets = availableOffsets
+ val sourceStatuses = sources.map { s =>
+ SourceStatus(
+ s.toString,
+ localAvailableOffsets.get(s).map(_.toString).getOrElse("-"), // TODO: use json if available
+ streamMetrics.currentSourceInputRate(s),
+ streamMetrics.currentSourceProcessingRate(s),
+ streamMetrics.currentSourceTriggerDetails(s))
+ }.toArray
+ val sinkStatus = SinkStatus(
+ sink.toString,
+ committedOffsets.toCompositeOffset(sources).toString)
+
+ currentStatus =
+ StreamingQueryStatus(
+ name = name,
+ id = id,
+ timestamp = triggerClock.getTimeMillis(),
+ inputRate = streamMetrics.currentInputRate(),
+ processingRate = streamMetrics.currentProcessingRate(),
+ latency = streamMetrics.currentLatency(),
+ sourceStatuses = sourceStatuses,
+ sinkStatus = sinkStatus,
+ triggerDetails = streamMetrics.currentTriggerDetails())
}
trait State
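The core of `reportNumRows` above is the positional zip between logical-plan leaves and executed-plan leaves. A self-contained toy of just that matching step (toy types; the real code walks `LogicalPlan`/`SparkPlan` leaves and reads the `numOutputRows` SQLMetric):

```scala
object LeafMatchingDemo extends App {
  case class LogicalLeaf(id: Int)
  case class ExecLeaf(numOutputRows: Long)

  val logicalLeaves = Seq(LogicalLeaf(1), LogicalLeaf(2), LogicalLeaf(3))
  val execLeaves    = Seq(ExecLeaf(10), ExecLeaf(20), ExecLeaf(5))
  // Leaf 3 came from a batch relation, so it maps to no streaming source.
  val leafToSource  = Map(LogicalLeaf(1) -> "kafka", LogicalLeaf(2) -> "kafka")

  val sourceToNumInputRows: Map[String, Long] =
    if (logicalLeaves.size == execLeaves.size) {
      logicalLeaves.zip(execLeaves)
        .flatMap { case (lp, ep) => leafToSource.get(lp).map(_ -> ep.numOutputRows) }
        .groupBy(_._1)
        .mapValues(_.map(_._2).sum)
        .toMap
    } else {
      Map.empty // sizes differ: warn once and skip, rather than misattribute rows
    }

  println(sourceToNumInputRows) // Map(kafka -> 30)
}
```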
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
new file mode 100644
index 0000000..e98d188
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.{util => ju}
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+/**
+ * Class that manages all the metrics related to a StreamingQuery. It does the following.
+ * - Calculates metrics (rates, latencies, etc.) based on information reported by StreamExecution.
+ * - Allows the current metric values to be queried
+ * - Serves some of the metrics through Codahale/DropWizard metrics
+ *
+ * @param sources Unique set of sources in a query
+ * @param triggerClock Clock used for triggering in StreamExecution
+ * @param codahaleSourceName Root name for all the Codahale metrics
+ */
+class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceName: String)
+ extends CodahaleSource with Logging {
+
+ import StreamMetrics._
+
+ // Trigger infos
+ private val triggerDetails = new mutable.HashMap[String, String]
+ private val sourceTriggerDetails = new mutable.HashMap[Source, mutable.HashMap[String, String]]
+
+ // Rate estimators for sources and sinks
+ private val inputRates = new mutable.HashMap[Source, RateCalculator]
+ private val processingRates = new mutable.HashMap[Source, RateCalculator]
+
+ // Number of input rows in the current trigger
+ private val numInputRows = new mutable.HashMap[Source, Long]
+ private var currentTriggerStartTimestamp: Long = -1
+ private var previousTriggerStartTimestamp: Long = -1
+ private var latency: Option[Double] = None
+
+ override val sourceName: String = codahaleSourceName
+ override val metricRegistry: MetricRegistry = new MetricRegistry
+
+ // =========== Initialization ===========
+
+ // Metric names should not have . in them, so that all the metrics of a query are identified
+ // together in Ganglia as a single metric group
+ registerGauge("inputRate-total", currentInputRate)
+ registerGauge("processingRate-total", () => currentProcessingRate)
+ registerGauge("latency", () => currentLatency().getOrElse(-1.0))
+
+ sources.foreach { s =>
+ inputRates.put(s, new RateCalculator)
+ processingRates.put(s, new RateCalculator)
+ sourceTriggerDetails.put(s, new mutable.HashMap[String, String])
+
+ registerGauge(s"inputRate-${s.toString}", () => currentSourceInputRate(s))
+ registerGauge(s"processingRate-${s.toString}", () => currentSourceProcessingRate(s))
+ }
+
+ // =========== Setter methods ===========
+
+ def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+ numInputRows.clear()
+ triggerDetails.clear()
+ sourceTriggerDetails.values.foreach(_.clear())
+
+ reportTriggerDetail(TRIGGER_ID, triggerId)
+ sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
+ reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
+ currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+ reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
+ }
+
+ def reportTriggerDetail[T](key: String, value: T): Unit = synchronized {
+ triggerDetails.put(key, value.toString)
+ }
+
+ def reportSourceTriggerDetail[T](source: Source, key: String, value: T): Unit = synchronized {
+ sourceTriggerDetails(source).put(key, value.toString)
+ }
+
+ def reportNumInputRows(inputRows: Map[Source, Long]): Unit = synchronized {
+ numInputRows ++= inputRows
+ }
+
+ def reportTriggerFinished(): Unit = synchronized {
+ require(currentTriggerStartTimestamp >= 0)
+ val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
+ reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
+ triggerDetails.remove(STATUS_MESSAGE)
+ reportTriggerDetail(IS_TRIGGER_ACTIVE, false)
+
+ // Report number of rows
+ val totalNumInputRows = numInputRows.values.sum
+ reportTriggerDetail(NUM_INPUT_ROWS, totalNumInputRows)
+ numInputRows.foreach { case (s, r) =>
+ reportSourceTriggerDetail(s, NUM_SOURCE_INPUT_ROWS, r)
+ }
+
+ val currentTriggerDuration = currentTriggerFinishTimestamp - currentTriggerStartTimestamp
+ val previousInputIntervalOption = if (previousTriggerStartTimestamp >= 0) {
+ Some(currentTriggerStartTimestamp - previousTriggerStartTimestamp)
+ } else None
+
+ // Update input rate = num rows received by each source during the previous trigger interval
+ // Interval is measured as the interval between start times of previous and current trigger.
+ //
+ // TODO: Instead of trigger start, we should use time when getOffset was called on each source
+ // as this may be different for each source if there are many sources in the query plan
+ // and getOffset is called serially on them.
+ if (previousInputIntervalOption.nonEmpty) {
+ sources.foreach { s =>
+ inputRates(s).update(numInputRows.getOrElse(s, 0), previousInputIntervalOption.get)
+ }
+ }
+
+ // Update processing rate = num rows processed for each source in current trigger duration
+ sources.foreach { s =>
+ processingRates(s).update(numInputRows.getOrElse(s, 0), currentTriggerDuration)
+ }
+
+ // Update latency = if data present, 0.5 * previous trigger interval + current trigger duration
+ if (previousInputIntervalOption.nonEmpty && totalNumInputRows > 0) {
+ latency = Some((previousInputIntervalOption.get.toDouble / 2) + currentTriggerDuration)
+ } else {
+ latency = None
+ }
+
+ previousTriggerStartTimestamp = currentTriggerStartTimestamp
+ currentTriggerStartTimestamp = -1
+ }
+
+ // =========== Getter methods ===========
+
+ def currentInputRate(): Double = synchronized {
+ // Since we are calculating source input rates using the same time interval for all sources
+ // it is fine to calculate total input rate as the sum of per source input rate.
+ inputRates.map(_._2.currentRate).sum
+ }
+
+ def currentSourceInputRate(source: Source): Double = synchronized {
+ inputRates(source).currentRate
+ }
+
+ def currentProcessingRate(): Double = synchronized {
+ // Since we are calculating source processing rates using the same time interval for all sources
+ // it is fine to calculate total processing rate as the sum of per source processing rate.
+ processingRates.map(_._2.currentRate).sum
+ }
+
+ def currentSourceProcessingRate(source: Source): Double = synchronized {
+ processingRates(source).currentRate
+ }
+
+ def currentLatency(): Option[Double] = synchronized { latency }
+
+ def currentTriggerDetails(): Map[String, String] = synchronized { triggerDetails.toMap }
+
+ def currentSourceTriggerDetails(source: Source): Map[String, String] = synchronized {
+ sourceTriggerDetails(source).toMap
+ }
+
+ // =========== Other methods ===========
+
+ private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+ synchronized {
+ metricRegistry.register(name, new Gauge[T] {
+ override def getValue: T = f()
+ })
+ }
+ }
+
+ def stop(): Unit = synchronized {
+ triggerDetails.clear()
+ inputRates.valuesIterator.foreach { _.stop() }
+ processingRates.valuesIterator.foreach { _.stop() }
+ latency = None
+ }
+}
+
+object StreamMetrics extends Logging {
+ /** Simple utility class to calculate rate while avoiding DivideByZero */
+ class RateCalculator {
+ @volatile private var rate: Option[Double] = None
+
+ def update(numRows: Long, timeGapMs: Long): Unit = {
+ if (timeGapMs > 0) {
+ rate = Some(numRows.toDouble * 1000 / timeGapMs)
+ } else {
+ rate = None
+ logDebug(s"Rate updates cannot with zero or negative time gap $timeGapMs")
+ }
+ }
+
+ def currentRate: Double = rate.getOrElse(0.0)
+
+ def stop(): Unit = { rate = None }
+ }
+
+
+ val TRIGGER_ID = "triggerId"
+ val IS_TRIGGER_ACTIVE = "isTriggerActive"
+ val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
+ val STATUS_MESSAGE = "statusMessage"
+
+ val START_TIMESTAMP = "timestamp.triggerStart"
+ val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset"
+ val GET_BATCH_TIMESTAMP = "timestamp.afterGetBatch"
+ val FINISH_TIMESTAMP = "timestamp.triggerFinish"
+
+ val GET_OFFSET_LATENCY = "latency.getOffset.total"
+ val GET_BATCH_LATENCY = "latency.getBatch.total"
+ val OFFSET_WAL_WRITE_LATENCY = "latency.offsetLogWrite"
+ val OPTIMIZER_LATENCY = "latency.optimizer"
+ val TRIGGER_LATENCY = "latency.fullTrigger"
+ val SOURCE_GET_OFFSET_LATENCY = "latency.getOffset.source"
+ val SOURCE_GET_BATCH_LATENCY = "latency.getBatch.source"
+
+ val NUM_INPUT_ROWS = "numRows.input.total"
+ val NUM_SOURCE_INPUT_ROWS = "numRows.input.source"
+ def NUM_TOTAL_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.total"
+ def NUM_UPDATED_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.updated"
+}
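A worked example of the rate and latency arithmetic in `reportTriggerFinished` above, under the simplifying assumption of a single source:

```scala
object RateMathDemo extends App {
  // Mirrors RateCalculator.update: rows per second, guarding against divide-by-zero.
  def rate(numRows: Long, timeGapMs: Long): Option[Double] =
    if (timeGapMs > 0) Some(numRows.toDouble * 1000 / timeGapMs) else None

  val previousTriggerIntervalMs = 1000L // start-to-start gap of the last two triggers
  val currentTriggerDurationMs  = 200L  // duration of the current trigger
  val rowsInTrigger             = 100L

  // Input rate: rows that arrived over the previous trigger interval.
  println(rate(rowsInTrigger, previousTriggerIntervalMs)) // Some(100.0) rows/sec
  // Processing rate: rows processed over the current trigger duration.
  println(rate(rowsInTrigger, currentTriggerDurationMs))  // Some(500.0) rows/sec
  // Latency: average wait (half the previous interval) plus processing time.
  println(previousTriggerIntervalMs / 2.0 + currentTriggerDurationMs) // 700.0 ms
}
```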
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index e37f0c7..53eebae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -112,6 +112,11 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
override def stop() {}
+
+ def reset(): Unit = synchronized {
+ batches.clear()
+ currentOffset = new LongOffset(-1)
+ }
}
/**
@@ -165,6 +170,8 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
logDebug(s"Skipping already committed batch: $batchId")
}
}
+
+ override def toString(): String = "MemorySink"
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3335755..dce5349 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -197,6 +197,8 @@ private[state] class HDFSBackedStateStoreProvider(
allUpdates.values().asScala.toIterator
}
+ override def numKeys(): Long = mapToUpdate.size()
+
/**
* Whether all updates have been committed
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index a67fdce..7132e28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -77,6 +77,9 @@ trait StateStore {
*/
def updates(): Iterator[StoreUpdate]
+ /** Number of keys in the state store */
+ def numKeys(): Long
+
/**
* Whether all updates have been committed
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 22f29c7..452eeed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -582,6 +582,12 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(10L)
+ val STREAMING_METRICS_ENABLED =
+ SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
+ .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index de1efe9..c991166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -18,17 +18,33 @@
package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
/**
* :: Experimental ::
- * Status and metrics of a streaming [[Sink]].
+ * Status and metrics of a streaming sink.
*
- * @param description Description of the source corresponding to this status
- * @param offsetDesc Description of the current offset up to which data has been written by the sink
+ * @param description Description of the sink corresponding to this status.
+ * @param offsetDesc Description of the current offsets up to which data has been written
+ * by the sink.
* @since 2.0.0
*/
@Experimental
-class SinkStatus private[sql](
+class SinkStatus private(
val description: String,
- val offsetDesc: String)
+ val offsetDesc: String) {
+
+ override def toString: String =
+ "SinkStatus:" + indent(prettyString)
+
+ private[sql] def prettyString: String = {
+ s"""$description
+ |Committed offsets: $offsetDesc
+ |""".stripMargin
+ }
+}
+
+/** Companion object, primarily for creating SinkStatus instances internally */
+private[sql] object SinkStatus {
+ def apply(desc: String, offsetDesc: String): SinkStatus = new SinkStatus(desc, offsetDesc)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
index bd0c848..6ace483 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -17,18 +17,60 @@
package org.apache.spark.sql.streaming
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.Source
+import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
/**
* :: Experimental ::
- * Status and metrics of a streaming [[Source]].
+ * Status and metrics of a streaming Source.
*
- * @param description Description of the source corresponding to this status
- * @param offsetDesc Description of the current [[Source]] offset if known
+ * @param description Description of the source corresponding to this status.
+ * @param offsetDesc Description of the current offset if known.
+ * @param inputRate Current rate (rows/sec) at which data is being generated by the source.
+ * @param processingRate Current rate (rows/sec) at which the query is processing data from
+ * the source.
+ * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
+ * rows processed in trigger, latency of intermediate steps, etc.).
+ * If no trigger is active, then it will have details of the last completed
+ * trigger.
* @since 2.0.0
*/
@Experimental
-class SourceStatus private[sql] (
+class SourceStatus private(
val description: String,
- val offsetDesc: Option[String])
+ val offsetDesc: String,
+ val inputRate: Double,
+ val processingRate: Double,
+ val triggerDetails: ju.Map[String, String]) {
+
+ override def toString: String =
+ "SourceStatus:" + indent(prettyString)
+
+ private[sql] def prettyString: String = {
+ val triggerDetailsLines =
+ triggerDetails.asScala.map { case (k, v) => s"$k: $v" }
+ s"""$description
+ |Available offset: $offsetDesc
+ |Input rate: $inputRate rows/sec
+ |Processing rate: $processingRate rows/sec
+ |Trigger details:
+ |""".stripMargin + indent(triggerDetailsLines)
+
+ }
+}
+
+/** Companion object, primarily for creating SourceStatus instances internally */
+private[sql] object SourceStatus {
+ def apply(
+ desc: String,
+ offsetDesc: String,
+ inputRate: Double,
+ processingRate: Double,
+ triggerDetails: Map[String, String]): SourceStatus = {
+ new SourceStatus(desc, offsetDesc, inputRate, processingRate, triggerDetails.asJava)
+ }
+}
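
A quick sketch of consuming the enriched per-source fields (again assuming an active query val):

    query.status.sourceStatuses.foreach { s =>
      println(s"${s.description}: offset=${s.offsetDesc}, " +
        s"input=${s.inputRate} rows/sec, processing=${s.processingRate} rows/sec")
    }
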
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 91f0a1e..0a85414 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -63,12 +63,23 @@ trait StreamingQuery {
def exception: Option[StreamingQueryException]
/**
+ * Returns the current status of the query.
+ * @since 2.0.2
+ */
+ def status: StreamingQueryStatus
+
+ /**
* Returns current status of all the sources.
* @since 2.0.0
*/
+ @deprecated("use status.sourceStatuses", "2.0.2")
def sourceStatuses: Array[SourceStatus]
- /** Returns current status of the sink. */
+ /**
+ * Returns current status of the sink.
+ * @since 2.0.0
+ */
+ @deprecated("use status.sinkStatus", "2.0.2")
def sinkStatus: SinkStatus
/**
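
The migration path implied by the deprecations above, as a sketch:

    // Before (deprecated in 2.0.2):
    val oldSources = query.sourceStatuses
    val oldSink    = query.sinkStatus

    // After: everything hangs off the single status object.
    val status  = query.status
    val sources = status.sourceStatuses
    val sink    = status.sinkStatus
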
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
deleted file mode 100644
index 1af2668..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * :: Experimental ::
- * A class used to report information about the progress of a [[StreamingQuery]].
- *
- * @param name The [[StreamingQuery]] name. This name is unique across all active queries.
- * @param id The [[StreamingQuery]] id. This id is unique across
- * all queries that have been started in the current process.
- * @param sourceStatuses The current statuses of the [[StreamingQuery]]'s sources.
- * @param sinkStatus The current status of the [[StreamingQuery]]'s sink.
- */
-@Experimental
-class StreamingQueryInfo private[sql](
- val name: String,
- val id: Long,
- val sourceStatuses: Seq[SourceStatus],
- val sinkStatus: SinkStatus)
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 8a8855d..69790e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -84,7 +84,7 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
- class QueryStarted private[sql](val queryInfo: StreamingQueryInfo) extends Event
+ class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event
/**
* :: Experimental ::
@@ -92,19 +92,19 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
- class QueryProgress private[sql](val queryInfo: StreamingQueryInfo) extends Event
+ class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event
/**
* :: Experimental ::
* Event representing the termination of a query
*
- * @param queryInfo Information about the status of the query.
+ * @param queryStatus Information about the status of the query.
* @param exception The exception message of the [[StreamingQuery]] if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @since 2.0.0
*/
@Experimental
class QueryTerminated private[sql](
- val queryInfo: StreamingQueryInfo,
+ val queryStatus: StreamingQueryStatus,
val exception: Option[String]) extends Event
}
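
A minimal listener sketch against the renamed queryStatus field (registering via spark.streams.addListener is assumed):

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val listener = new StreamingQueryListener {
      override def onQueryStarted(event: QueryStarted): Unit =
        println(s"started: ${event.queryStatus.name}")
      override def onQueryProgress(event: QueryProgress): Unit =
        println(s"input rate: ${event.queryStatus.inputRate} rows/sec")
      override def onQueryTerminated(event: QueryTerminated): Unit =
        println(s"terminated: ${event.queryStatus.name}, exception=${event.exception}")
    }
    spark.streams.addListener(listener)
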
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
new file mode 100644
index 0000000..4768992
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}
+
+/**
+ * :: Experimental ::
+ * A class used to report information about the progress of a [[StreamingQuery]].
+ *
+ * @param name Name of the query. This name is unique across all active queries.
+ * @param id Id of the query. This id is unique across
+ * all queries that have been started in the current process.
+ * @param timestamp Timestamp (ms) of when this query was generated.
+ * @param inputRate Current rate (rows/sec) at which data is being generated by all the sources.
+ * @param processingRate Current rate (rows/sec) at which the query is processing data from
+ * all the sources.
+ * @param latency Current average latency between the data being available in source and the sink
+ * writing the corresponding output.
+ * @param sourceStatuses Current statuses of the sources.
+ * @param sinkStatus Current status of the sink.
+ * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
+ * rows processed in trigger, latency of intermediate steps, etc.).
+ * If no trigger is active, then it will have details of the last completed
+ * trigger.
+ * @since 2.0.0
+ */
+@Experimental
+class StreamingQueryStatus private(
+ val name: String,
+ val id: Long,
+ val timestamp: Long,
+ val inputRate: Double,
+ val processingRate: Double,
+ val latency: Option[Double],
+ val sourceStatuses: Array[SourceStatus],
+ val sinkStatus: SinkStatus,
+ val triggerDetails: ju.Map[String, String]) {
+
+ import StreamingQueryStatus._
+
+ override def toString: String = {
+ val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
+ s"Source ${i + 1}:" + indent(s.prettyString)
+ }
+ val sinkStatusLines = sinkStatus.prettyString
+ val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted
+ val numSources = sourceStatuses.length
+ val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" }
+
+ val allLines = s"""
+ |Query name: $name
+ |Query id: $id
+ |Status timestamp: $timestamp
+ |Input rate: $inputRate rows/sec
+ |Processing rate: $processingRate rows/sec
+ |Latency: ${latency.getOrElse("-")} ms
+ |Trigger details:
+ |${indent(triggerDetailsLines)}
+ |Source statuses [$numSourcesString]:
+ |${indent(sourceStatusLines)}
+ |Sink status: ${indent(sinkStatusLines)}""".stripMargin
+
+ s"StreamingQueryStatus:${indent(allLines)}"
+ }
+}
+
+/** Companion object, primarily for creating StreamingQueryStatus instances internally */
+private[sql] object StreamingQueryStatus {
+ def apply(
+ name: String,
+ id: Long,
+ timestamp: Long,
+ inputRate: Double,
+ processingRate: Double,
+ latency: Option[Double],
+ sourceStatuses: Array[SourceStatus],
+ sinkStatus: SinkStatus,
+ triggerDetails: Map[String, String]): StreamingQueryStatus = {
+ new StreamingQueryStatus(name, id, timestamp, inputRate, processingRate,
+ latency, sourceStatuses, sinkStatus, triggerDetails.asJava)
+ }
+
+ def indent(strings: Iterable[String]): String = strings.map(indent).mkString("\n")
+ def indent(string: String): String = string.split("\n").map(" " + _).mkString("\n")
+
+ /** Create an instance of status for python testing */
+ def testStatus(): StreamingQueryStatus = {
+ import org.apache.spark.sql.execution.streaming.StreamMetrics._
+ StreamingQueryStatus(
+ name = "query",
+ id = 1,
+ timestamp = 123,
+ inputRate = 15.5,
+ processingRate = 23.5,
+ latency = Some(345),
+ sourceStatuses = Array(
+ SourceStatus(
+ desc = "MySource1",
+ offsetDesc = LongOffset(0).toString,
+ inputRate = 15.5,
+ processingRate = 23.5,
+ triggerDetails = Map(
+ NUM_SOURCE_INPUT_ROWS -> "100",
+ SOURCE_GET_OFFSET_LATENCY -> "10",
+ SOURCE_GET_BATCH_LATENCY -> "20"))),
+ sinkStatus = SinkStatus(
+ desc = "MySink",
+ offsetDesc = CompositeOffset(Some(LongOffset(1)) :: None :: Nil).toString),
+ triggerDetails = Map(
+ TRIGGER_ID -> "5",
+ IS_TRIGGER_ACTIVE -> "true",
+ IS_DATA_PRESENT_IN_TRIGGER -> "true",
+ GET_OFFSET_LATENCY -> "10",
+ GET_BATCH_LATENCY -> "20",
+ NUM_INPUT_ROWS -> "100"
+ ))
+ }
+}
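
Since triggerDetails is exposed as a java.util.Map, Scala callers need a JavaConverters import to iterate it; a small sketch (an active query val is assumed):

    import scala.collection.JavaConverters._

    println(query.status)  // renders the nested report built by toString above
    query.status.triggerDetails.asScala.toSeq.sorted.foreach { case (k, v) =>
      println(s"$k = $v")
    }
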
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index bba40c6..229d881 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.metric
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
@@ -85,6 +86,22 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ test("LocalTableScanExec computes metrics in collect and take") {
+ val df1 = spark.createDataset(Seq(1, 2, 3))
+ val logical = df1.queryExecution.logical
+ require(logical.isInstanceOf[LocalRelation])
+ df1.collect()
+ val metrics1 = df1.queryExecution.executedPlan.collectLeaves().head.metrics
+ assert(metrics1.contains("numOutputRows"))
+ assert(metrics1("numOutputRows").value === 3)
+
+ val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2)
+ df2.collect()
+ val metrics2 = df2.queryExecution.executedPlan.collectLeaves().head.metrics
+ assert(metrics2.contains("numOutputRows"))
+ assert(metrics2("numOutputRows").value === 2)
+ }
+
test("Filter metrics") {
// Assume the execution plan is
// PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
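
The pattern the new test uses, collecting the leaf node's metrics after an action, also works interactively; a sketch (assumes a SparkSession named spark):

    import spark.implicits._

    val ds = spark.createDataset(Seq(1, 2, 3))
    ds.collect()
    val leafMetrics = ds.queryExecution.executedPlan.collectLeaves().head.metrics
    println(leafMetrics("numOutputRows").value)  // 3, as asserted in the test above
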
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
new file mode 100644
index 0000000..938423d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.scalactic.TolerantNumerics
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.ManualClock
+
+class StreamMetricsSuite extends SparkFunSuite {
+ import StreamMetrics._
+
+ // To make === between double tolerate inexact values
+ implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
+
+ test("rates, latencies, trigger details - basic life cycle") {
+ val sm = newStreamMetrics(source)
+ assert(sm.currentInputRate() === 0.0)
+ assert(sm.currentProcessingRate() === 0.0)
+ assert(sm.currentSourceInputRate(source) === 0.0)
+ assert(sm.currentSourceProcessingRate(source) === 0.0)
+ assert(sm.currentLatency() === None)
+ assert(sm.currentTriggerDetails().isEmpty)
+
+ // When a trigger has started, the rates should not change, but
+ // currentTriggerDetails() should return the details reported so far
+ sm.reportTriggerStarted(1)
+ sm.reportTriggerDetail("key", "value")
+ sm.reportSourceTriggerDetail(source, "key2", "value2")
+ assert(sm.currentInputRate() === 0.0)
+ assert(sm.currentProcessingRate() === 0.0)
+ assert(sm.currentSourceInputRate(source) === 0.0)
+ assert(sm.currentSourceProcessingRate(source) === 0.0)
+ assert(sm.currentLatency() === None)
+ assert(sm.currentTriggerDetails() ===
+ Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
+ START_TIMESTAMP -> "0", "key" -> "value"))
+ assert(sm.currentSourceTriggerDetails(source) ===
+ Map(TRIGGER_ID -> "1", "key2" -> "value2"))
+
+ // Finishing the trigger should calculate the rates, except the input rate,
+ // which needs another trigger interval
+ sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows
+ clock.advance(1000)
+ sm.reportTriggerFinished()
+ assert(sm.currentInputRate() === 0.0)
+ assert(sm.currentProcessingRate() === 100.0) // 100 input rows processed in 1 sec
+ assert(sm.currentSourceInputRate(source) === 0.0)
+ assert(sm.currentSourceProcessingRate(source) === 100.0)
+ assert(sm.currentLatency() === None)
+ assert(sm.currentTriggerDetails() ===
+ Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
+ START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
+ NUM_INPUT_ROWS -> "100", "key" -> "value"))
+ assert(sm.currentSourceTriggerDetails(source) ===
+ Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
+
+ // After another trigger starts, the rates and latencies should not change until
+ // new rows are reported
+ clock.advance(1000)
+ sm.reportTriggerStarted(2)
+ assert(sm.currentInputRate() === 0.0)
+ assert(sm.currentProcessingRate() === 100.0)
+ assert(sm.currentSourceInputRate(source) === 0.0)
+ assert(sm.currentSourceProcessingRate(source) === 100.0)
+ assert(sm.currentLatency() === None)
+
+ // Reporting new rows should update the rates and latencies
+ sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
+ clock.advance(500)
+ sm.reportTriggerFinished()
+ assert(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
+ assert(sm.currentProcessingRate() === 400.0) // 200 input rows processed in 0.5 sec
+ assert(sm.currentSourceInputRate(source) === 100.0)
+ assert(sm.currentSourceProcessingRate(source) === 400.0)
+ assert(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
+
+ // Rates should be set to 0 after stop
+ sm.stop()
+ assert(sm.currentInputRate() === 0.0)
+ assert(sm.currentProcessingRate() === 0.0)
+ assert(sm.currentSourceInputRate(source) === 0.0)
+ assert(sm.currentSourceProcessingRate(source) === 0.0)
+ assert(sm.currentLatency() === None)
+ assert(sm.currentTriggerDetails().isEmpty)
+ }
+
+ test("rates and latencies - after trigger with no data") {
+ val sm = newStreamMetrics(source)
+ // Trigger 1 with data
+ sm.reportTriggerStarted(1)
+ sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows
+ clock.advance(1000)
+ sm.reportTriggerFinished()
+
+ // Trigger 2 with data
+ clock.advance(1000)
+ sm.reportTriggerStarted(2)
+ sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
+ clock.advance(500)
+ sm.reportTriggerFinished()
+
+ // Make sure that all rates are set
+ require(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
+ require(sm.currentProcessingRate() === 400.0) // 200 input rows processed in 0.5 sec
+ require(sm.currentSourceInputRate(source) === 100.0)
+ require(sm.currentSourceProcessingRate(source) === 400.0)
+ require(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
+
+ // Trigger 3 with no data
+ clock.advance(500)
+ sm.reportTriggerStarted(3)
+ clock.advance(500)
+ sm.reportTriggerFinished()
+
+ // Rates are set to zero and latency is set to None
+ assert(sm.currentInputRate() === 0.0)
+ assert(sm.currentProcessingRate() === 0.0)
+ assert(sm.currentSourceInputRate(source) === 0.0)
+ assert(sm.currentSourceProcessingRate(source) === 0.0)
+ assert(sm.currentLatency() === None)
+ sm.stop()
+ }
+
+ test("rates - after trigger with multiple sources, and one source having no info") {
+ val source1 = TestSource(1)
+ val source2 = TestSource(2)
+ val sm = newStreamMetrics(source1, source2)
+ // Trigger 1 with data
+ sm.reportTriggerStarted(1)
+ sm.reportNumInputRows(Map(source1 -> 100L, source2 -> 100L))
+ clock.advance(1000)
+ sm.reportTriggerFinished()
+
+ // Trigger 2 with data
+ clock.advance(1000)
+ sm.reportTriggerStarted(2)
+ sm.reportNumInputRows(Map(source1 -> 200L, source2 -> 200L))
+ clock.advance(500)
+ sm.reportTriggerFinished()
+
+ // Make sure that all rates are set
+ assert(sm.currentInputRate() === 200.0) // 200*2 input rows generated in 2 seconds b/w starts
+ assert(sm.currentProcessingRate() === 800.0) // 200*2 output rows processed in 0.5 sec
+ assert(sm.currentSourceInputRate(source1) === 100.0)
+ assert(sm.currentSourceInputRate(source2) === 100.0)
+ assert(sm.currentSourceProcessingRate(source1) === 400.0)
+ assert(sm.currentSourceProcessingRate(source2) === 400.0)
+
+ // Trigger 3 with data from source1 only
+ clock.advance(500)
+ sm.reportTriggerStarted(3)
+ clock.advance(500)
+ sm.reportNumInputRows(Map(source1 -> 200L))
+ sm.reportTriggerFinished()
+
+ // Rates are updated for source1, which reported data; source2 reported
+ // no data, so its rates drop to zero
+ assert(sm.currentInputRate() === 200.0)
+ assert(sm.currentProcessingRate() === 400.0)
+ assert(sm.currentSourceInputRate(source1) === 200.0)
+ assert(sm.currentSourceInputRate(source2) === 0.0)
+ assert(sm.currentSourceProcessingRate(source1) === 400.0)
+ assert(sm.currentSourceProcessingRate(source2) === 0.0)
+ sm.stop()
+ }
+
+ test("registered Codahale metrics") {
+ import scala.collection.JavaConverters._
+ val sm = newStreamMetrics(source)
+ val gaugeNames = sm.metricRegistry.getGauges().keySet().asScala
+
+ // so that all metrics are considered as a single metric group in Ganglia
+ assert(!gaugeNames.exists(_.contains(".")))
+ assert(gaugeNames === Set(
+ "inputRate-total",
+ "inputRate-source0",
+ "processingRate-total",
+ "processingRate-source0",
+ "latency"))
+ }
+
+ private def newStreamMetrics(sources: Source*): StreamMetrics = {
+ new StreamMetrics(sources.toSet, clock, "test")
+ }
+
+ private val clock = new ManualClock()
+ private val source = TestSource(0)
+
+ case class TestSource(id: Int) extends Source {
+ override def schema: StructType = StructType(Array.empty[StructField])
+ override def getOffset: Option[Offset] = Some(new LongOffset(0))
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = { null }
+ override def stop() {}
+ override def toString(): String = s"source$id"
+ }
+}
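
The rate and latency numbers asserted throughout this suite fall directly out of the ManualClock timeline; spelled out for the two-trigger case:

    // 200 rows arrived in the 2 s between the two trigger starts:
    val inputRate = 200.0 / 2.0        // = 100.0 rows/sec
    // the same 200 rows were processed inside a 0.5 s trigger:
    val processingRate = 200.0 / 0.5   // = 400.0 rows/sec
    // latency = average wait in the source (half the 2000 ms inter-trigger
    // gap) plus the 500 ms processing time:
    val latencyMs = 2000.0 / 2 + 500   // = 1500.0 ms
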
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
index 6b0ba7a..5174a04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
@@ -156,6 +156,30 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
}
}
+ test("input row metrics") {
+ serverThread = new ServerThread()
+ serverThread.start()
+
+ val provider = new TextSocketSourceProvider
+ val parameters = Map("host" -> "localhost", "port" -> serverThread.port.toString)
+ source = provider.createSource(sqlContext, "", None, "", parameters)
+
+ failAfter(streamingTimeout) {
+ serverThread.enqueue("hello")
+ while (source.getOffset.isEmpty) {
+ Thread.sleep(10)
+ }
+ val batch = source.getBatch(None, source.getOffset.get).as[String]
+ batch.collect()
+ val numRowsMetric =
+ batch.queryExecution.executedPlan.collectLeaves().head.metrics.get("numOutputRows")
+ assert(numRowsMetric.nonEmpty)
+ assert(numRowsMetric.get.value === 1)
+ source.stop()
+ source = null
+ }
+ }
+
private class ServerThread extends Thread with Logging {
private val serverSocket = new ServerSocket(0)
private val messageQueue = new LinkedBlockingQueue[String]()