You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/24 02:54:50 UTC
incubator-griffin git commit: [GRIFFIN-96] Add timeliness as a new
feature
Repository: incubator-griffin
Updated Branches:
refs/heads/master 71fcf93b9 -> a0b130ae0
[GRIFFIN-96] Add timeliness as a new feature
Timeliness is now supported, to measure the latency of streaming data; it currently supports only data that itself contains a start_time and an end_time.
Author: Lionel Liu <bh...@163.com>
Closes #196 from bhlx3lyx7/tmst.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a0b130ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a0b130ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a0b130ae
Branch: refs/heads/master
Commit: a0b130ae01199c1c95a8ed56a00dcd700adb6651
Parents: 71fcf93
Author: Lionel Liu <bh...@163.com>
Authored: Wed Jan 24 10:54:44 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Wed Jan 24 10:54:44 2018 +0800
----------------------------------------------------------------------
.../measure/cache/info/ZKInfoCache.scala | 2 +
.../measure/rule/adaptor/GlobalKeys.scala | 5 +
.../rule/adaptor/GriffinDslAdaptor.scala | 110 +++++++++++++++++--
.../apache/griffin/measure/utils/TimeUtil.scala | 67 +++++++----
.../resources/_timeliness-batch-griffindsl.json | 5 +-
.../_timeliness-streaming-griffindsl.json | 11 +-
.../griffin/measure/utils/TimeUtilTest.scala | 38 +++++++
7 files changed, 208 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
index ee99099..3789a05 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
@@ -117,6 +117,8 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
def clearInfo(): Unit = {
// delete("/")
+ deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil)
+ deleteInfo(TimeInfoCache.infoPath :: Nil)
println("clear info")
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
index f592709..bd27b19 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
@@ -58,7 +58,12 @@ object DistinctnessKeys {
object TimelinessKeys {
val _source = "source"
val _latency = "latency"
+ val _total = "total"
+ val _avg = "avg"
val _threshold = "threshold"
+ val _step = "step"
+ val _count = "count"
+ val _stepSize = "step.size"
}
object GlobalKeys {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index ad4a195..5655a13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -666,21 +666,21 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
// 3. timeliness metric
val metricTableName = name
+ val totalColName = details.getStringOrKey(TimelinessKeys._total)
+ val avgColName = details.getStringOrKey(TimelinessKeys._avg)
val metricSql = procType match {
case BatchProcessType => {
s"""
- |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
- |MAX(`${latencyColName}`) AS `max`,
- |MIN(`${latencyColName}`) AS `min`
+ |SELECT COUNT(*) AS `${totalColName}`,
+ |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
|FROM `${latencyTableName}`
""".stripMargin
}
case StreamingProcessType => {
s"""
|SELECT `${InternalColumns.tmst}`,
- |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
- |MAX(`${latencyColName}`) AS `max`,
- |MIN(`${latencyColName}`) AS `min`
+ |COUNT(*) AS `${totalColName}`,
+ |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
|FROM `${latencyTableName}`
|GROUP BY `${InternalColumns.tmst}`
""".stripMargin
@@ -710,9 +710,105 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
case _ => emptyRulePlan
}
+ // 5. ranges
+// val rangePlan = details.get(TimelinessKeys._rangeSplit) match {
+// case Some(arr: Seq[String]) => {
+// val ranges = splitTimeRanges(arr)
+// if (ranges.size > 0) {
+// try {
+// // 5.1. range
+// val rangeTableName = "__range"
+// val rangeColName = details.getStringOrKey(TimelinessKeys._range)
+// val caseClause = {
+// val whenClause = ranges.map { range =>
+// s"WHEN `${latencyColName}` < ${range._1} THEN '<${range._2}'"
+// }.mkString("\n")
+// s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS `${rangeColName}`"
+// }
+// val rangeSql = {
+// s"SELECT *, ${caseClause} FROM `${latencyTableName}`"
+// }
+// val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+//
+// // 5.2. range metric
+// val rangeMetricTableName = "__rangeMetric"
+// val countColName = details.getStringOrKey(TimelinessKeys._count)
+// val rangeMetricSql = procType match {
+// case BatchProcessType => {
+// s"""
+// |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}`
+// |FROM `${rangeTableName}` GROUP BY `${rangeColName}`
+// """.stripMargin
+// }
+// case StreamingProcessType => {
+// s"""
+// |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, COUNT(*) AS `${countColName}`
+// |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${rangeColName}`
+// """.stripMargin
+// }
+// }
+// val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+// val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+// val rangeMetricExports = genMetricExport(rangeMetricParam, rangeColName, rangeMetricTableName, ct, mode) :: Nil
+//
+// RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+// } catch {
+// case _: Throwable => emptyRulePlan
+// }
+// } else emptyRulePlan
+// }
+// case _ => emptyRulePlan
+// }
+
// return timeliness plan
- timePlan.merge(recordPlan)
+
+ // 5. ranges
+ val rangePlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._stepSize, "")) match {
+ case Some(stepSize) => {
+ // 5.1 range
+ val rangeTableName = "__range"
+ val stepColName = details.getStringOrKey(TimelinessKeys._step)
+ val rangeSql = {
+ s"""
+ |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}`
+ |FROM `${latencyTableName}`
+ """.stripMargin
+ }
+ val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+
+ // 5.2 range metric
+ val rangeMetricTableName = "__rangeMetric"
+ val countColName = details.getStringOrKey(TimelinessKeys._count)
+ val rangeMetricSql = procType match {
+ case BatchProcessType => {
+ s"""
+ |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
+ |FROM `${rangeTableName}` GROUP BY `${stepColName}`
+ """.stripMargin
+ }
+ case StreamingProcessType => {
+ s"""
+ |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
+ |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${stepColName}`
+ """.stripMargin
+ }
+ }
+ val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+ val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+ val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, ct, mode) :: Nil
+
+ RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+ }
+ case _ => emptyRulePlan
+ }
+
+ timePlan.merge(recordPlan).merge(rangePlan)
}
}
+ private def splitTimeRanges(tstrs: Seq[String]): List[(Long, String)] = {
+ val ts = tstrs.flatMap(TimeUtil.milliseconds(_)).sorted.toList
+ ts.map { t => (t, TimeUtil.time2String(t)) }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index 42a140f..9b4d58e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -20,11 +20,30 @@ package org.apache.griffin.measure.utils
import org.apache.griffin.measure.log.Loggable
+import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}
object TimeUtil extends Loggable {
- final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r
+ private object Units {
+ case class TimeUnit(name: String, shortName: String, ut: Long, regex: Regex) {
+ def toMs(t: Long) = t * ut
+ def fromMs(ms: Long) = ms / ut
+ def fitUnit(ms: Long) = (ms % ut == 0)
+ }
+
+ val dayUnit = TimeUnit("day", "d", 24 * 60 * 60 * 1000, """^(?i)d(?:ay)?$""".r)
+ val hourUnit = TimeUnit("hour", "h", 60 * 60 * 1000, """^(?i)h(?:our|r)?$""".r)
+ val minUnit = TimeUnit("minute", "m", 60 * 1000, """^(?i)m(?:in(?:ute)?)?$""".r)
+ val secUnit = TimeUnit("second", "s", 1000, """^(?i)s(?:ec(?:ond)?)?$""".r)
+ val msUnit = TimeUnit("millisecond", "ms", 1, """^(?i)m(?:illi)?s(?:ec(?:ond)?)?$""".r)
+
+ val timeUnits = dayUnit :: hourUnit :: minUnit :: secUnit :: msUnit :: Nil
+ }
+ import Units._
+
+// final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r
+ final val TimeRegex = """^([+\-]?\d+)([a-zA-Z]+)$""".r
final val PureTimeRegex = """^([+\-]?\d+)$""".r
def milliseconds(timeString: String): Option[Long] = {
@@ -34,17 +53,17 @@ object TimeUtil extends Loggable {
case TimeRegex(time, unit) => {
val t = time.toLong
unit match {
- case "d" => t * 24 * 60 * 60 * 1000
- case "h" => t * 60 * 60 * 1000
- case "m" => t * 60 * 1000
- case "s" => t * 1000
- case "ms" => t
+ case dayUnit.regex() => dayUnit.toMs(t)
+ case hourUnit.regex() => hourUnit.toMs(t)
+ case minUnit.regex() => minUnit.toMs(t)
+ case secUnit.regex() => secUnit.toMs(t)
+ case msUnit.regex() => msUnit.toMs(t)
case _ => throw new Exception(s"${timeString} is invalid time format")
}
}
case PureTimeRegex(time) => {
val t = time.toLong
- t
+ msUnit.toMs(t)
}
case _ => throw new Exception(s"${timeString} is invalid time format")
}
@@ -58,24 +77,34 @@ object TimeUtil extends Loggable {
def timeToUnit(ms: Long, unit: String): Long = {
unit match {
- case "ms" => ms
- case "sec" => ms / 1000
- case "min" => ms / (60 * 1000)
- case "hour" => ms / (60 * 60 * 1000)
- case "day" => ms / (24 * 60 * 60 * 1000)
- case _ => ms / (60 * 1000)
+ case dayUnit.regex() => dayUnit.fromMs(ms)
+ case hourUnit.regex() => hourUnit.fromMs(ms)
+ case minUnit.regex() => minUnit.fromMs(ms)
+ case secUnit.regex() => secUnit.fromMs(ms)
+ case msUnit.regex() => msUnit.fromMs(ms)
+ case _ => ms
}
}
def timeFromUnit(t: Long, unit: String): Long = {
unit match {
- case "ms" => t
- case "sec" => t * 1000
- case "min" => t * 60 * 1000
- case "hour" => t * 60 * 60 * 1000
- case "day" => t * 24 * 60 * 60 * 1000
- case _ => t * 60 * 1000
+ case dayUnit.regex() => dayUnit.toMs(t)
+ case hourUnit.regex() => hourUnit.toMs(t)
+ case minUnit.regex() => minUnit.toMs(t)
+ case secUnit.regex() => secUnit.toMs(t)
+ case msUnit.regex() => msUnit.toMs(t)
+ case _ => t
+ }
+ }
+
+ def time2String(t: Long): String = {
+ val matchedUnitOpt = timeUnits.foldLeft(None: Option[TimeUnit]) { (retOpt, unit) =>
+ if (retOpt.isEmpty && unit.fitUnit(t)) Some(unit) else retOpt
}
+ val unit = matchedUnitOpt.getOrElse(msUnit)
+ val unitTime = unit.fromMs(t)
+ val unitStr = unit.shortName
+ s"${unitTime}${unitStr}"
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/resources/_timeliness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
index 2af98f1..bd48401 100644
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -28,7 +28,10 @@
"details": {
"source": "source",
"latency": "latency",
- "threshold": "3m"
+ "threshold": "3m",
+ "step": "step",
+ "count": "cnt",
+ "step.size": "2m"
},
"metric": {
"name": "timeliness"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/resources/_timeliness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
index 776c3b5..fbaf8d4 100644
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -33,7 +33,7 @@
{
"dsl.type": "spark-sql",
"name": "${this}",
- "rule": "select ts, name, age from ${s1}"
+ "rule": "select ts, end_ts, name, age from ${s1}"
}
]
}
@@ -54,11 +54,16 @@
"dsl.type": "griffin-dsl",
"dq.type": "timeliness",
"name": "timeliness",
- "rule": "ts",
+ "rule": "ts, end_ts",
"details": {
"source": "source",
"latency": "latency",
- "threshold": "1h"
+ "total": "total",
+ "avg": "avg",
+ "threshold": "1h",
+ "step": "step",
+ "count": "cnt",
+ "step.size": "5m"
},
"metric": {
"name": "timeliness"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala
new file mode 100644
index 0000000..673eca0
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class TimeUtilTest extends FunSuite with Matchers with BeforeAndAfter {
+
+ test ("milliseconds") {
+ val ts = "1h"
+ val res = TimeUtil.milliseconds(ts)
+ println(res)
+
+ val t = 1200000
+ val s = TimeUtil.time2String(t)
+ println(s)
+ }
+
+}