You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/24 02:54:50 UTC

incubator-griffin git commit: [GRIFFIN-96] Add timeliness as a new feature

Repository: incubator-griffin
Updated Branches:
  refs/heads/master 71fcf93b9 -> a0b130ae0


[GRIFFIN-96] Add timeliness as a new feature

Timeliness is now supported, to measure the latency of streaming data. Only data that itself contains both a start_time and an end_time is supported.

Author: Lionel Liu <bh...@163.com>

Closes #196 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a0b130ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a0b130ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a0b130ae

Branch: refs/heads/master
Commit: a0b130ae01199c1c95a8ed56a00dcd700adb6651
Parents: 71fcf93
Author: Lionel Liu <bh...@163.com>
Authored: Wed Jan 24 10:54:44 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Wed Jan 24 10:54:44 2018 +0800

----------------------------------------------------------------------
 .../measure/cache/info/ZKInfoCache.scala        |   2 +
 .../measure/rule/adaptor/GlobalKeys.scala       |   5 +
 .../rule/adaptor/GriffinDslAdaptor.scala        | 110 +++++++++++++++++--
 .../apache/griffin/measure/utils/TimeUtil.scala |  67 +++++++----
 .../resources/_timeliness-batch-griffindsl.json |   5 +-
 .../_timeliness-streaming-griffindsl.json       |  11 +-
 .../griffin/measure/utils/TimeUtilTest.scala    |  38 +++++++
 7 files changed, 208 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
index ee99099..3789a05 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
@@ -117,6 +117,8 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
 
   def clearInfo(): Unit = {
 //    delete("/")
+    deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil)
+    deleteInfo(TimeInfoCache.infoPath :: Nil)
     println("clear info")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
index f592709..bd27b19 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
@@ -58,7 +58,12 @@ object DistinctnessKeys {
 object TimelinessKeys {
   val _source = "source"
   val _latency = "latency"
+  val _total = "total"
+  val _avg = "avg"
   val _threshold = "threshold"
+  val _step = "step"
+  val _count = "count"
+  val _stepSize = "step.size"
 }
 
 object GlobalKeys {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index ad4a195..5655a13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -666,21 +666,21 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
 
       // 3. timeliness metric
       val metricTableName = name
+      val totalColName = details.getStringOrKey(TimelinessKeys._total)
+      val avgColName = details.getStringOrKey(TimelinessKeys._avg)
       val metricSql = procType match {
         case BatchProcessType => {
           s"""
-             |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
-             |MAX(`${latencyColName}`) AS `max`,
-             |MIN(`${latencyColName}`) AS `min`
+             |SELECT COUNT(*) AS `${totalColName}`,
+             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
              |FROM `${latencyTableName}`
            """.stripMargin
         }
         case StreamingProcessType => {
           s"""
              |SELECT `${InternalColumns.tmst}`,
-             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
-             |MAX(`${latencyColName}`) AS `max`,
-             |MIN(`${latencyColName}`) AS `min`
+             |COUNT(*) AS `${totalColName}`,
+             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
              |FROM `${latencyTableName}`
              |GROUP BY `${InternalColumns.tmst}`
            """.stripMargin
@@ -710,9 +710,105 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         case _ => emptyRulePlan
       }
 
+      // 5. ranges
+//      val rangePlan = details.get(TimelinessKeys._rangeSplit) match {
+//        case Some(arr: Seq[String]) => {
+//          val ranges = splitTimeRanges(arr)
+//          if (ranges.size > 0) {
+//            try {
+//              // 5.1. range
+//              val rangeTableName = "__range"
+//              val rangeColName = details.getStringOrKey(TimelinessKeys._range)
+//              val caseClause = {
+//                val whenClause = ranges.map { range =>
+//                  s"WHEN `${latencyColName}` < ${range._1} THEN '<${range._2}'"
+//                }.mkString("\n")
+//                s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS `${rangeColName}`"
+//              }
+//              val rangeSql = {
+//                s"SELECT *, ${caseClause} FROM `${latencyTableName}`"
+//              }
+//              val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+//
+//              // 5.2. range metric
+//              val rangeMetricTableName = "__rangeMetric"
+//              val countColName = details.getStringOrKey(TimelinessKeys._count)
+//              val rangeMetricSql = procType match {
+//                case BatchProcessType => {
+//                  s"""
+//                     |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}`
+//                     |FROM `${rangeTableName}` GROUP BY `${rangeColName}`
+//                  """.stripMargin
+//                }
+//                case StreamingProcessType => {
+//                  s"""
+//                     |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, COUNT(*) AS `${countColName}`
+//                     |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${rangeColName}`
+//                  """.stripMargin
+//                }
+//              }
+//              val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+//              val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+//              val rangeMetricExports = genMetricExport(rangeMetricParam, rangeColName, rangeMetricTableName, ct, mode) :: Nil
+//
+//              RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+//            } catch {
+//              case _: Throwable => emptyRulePlan
+//            }
+//          } else emptyRulePlan
+//        }
+//        case _ => emptyRulePlan
+//      }
+
       // return timeliness plan
-      timePlan.merge(recordPlan)
+
+      // 5. ranges
+      val rangePlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._stepSize, "")) match {
+        case Some(stepSize) => {
+          // 5.1 range
+          val rangeTableName = "__range"
+          val stepColName = details.getStringOrKey(TimelinessKeys._step)
+          val rangeSql = {
+            s"""
+               |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}`
+               |FROM `${latencyTableName}`
+             """.stripMargin
+          }
+          val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+
+          // 5.2 range metric
+          val rangeMetricTableName = "__rangeMetric"
+          val countColName = details.getStringOrKey(TimelinessKeys._count)
+          val rangeMetricSql = procType match {
+            case BatchProcessType => {
+              s"""
+                 |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
+                 |FROM `${rangeTableName}` GROUP BY `${stepColName}`
+                """.stripMargin
+            }
+            case StreamingProcessType => {
+              s"""
+                 |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
+                 |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${stepColName}`
+                """.stripMargin
+            }
+          }
+          val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+          val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+          val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, ct, mode) :: Nil
+
+          RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+        }
+        case _ => emptyRulePlan
+      }
+
+      timePlan.merge(recordPlan).merge(rangePlan)
     }
   }
 
+  private def splitTimeRanges(tstrs: Seq[String]): List[(Long, String)] = {
+    val ts = tstrs.flatMap(TimeUtil.milliseconds(_)).sorted.toList
+    ts.map { t => (t, TimeUtil.time2String(t)) }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index 42a140f..9b4d58e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -20,11 +20,30 @@ package org.apache.griffin.measure.utils
 
 import org.apache.griffin.measure.log.Loggable
 
+import scala.util.matching.Regex
 import scala.util.{Failure, Success, Try}
 
 object TimeUtil extends Loggable {
 
-  final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r
+  private object Units {
+    case class TimeUnit(name: String, shortName: String, ut: Long, regex: Regex) {
+      def toMs(t: Long) = t * ut
+      def fromMs(ms: Long) = ms / ut
+      def fitUnit(ms: Long) = (ms % ut == 0)
+    }
+
+    val dayUnit = TimeUnit("day", "d", 24 * 60 * 60 * 1000, """^(?i)d(?:ay)?$""".r)
+    val hourUnit = TimeUnit("hour", "h", 60 * 60 * 1000, """^(?i)h(?:our|r)?$""".r)
+    val minUnit = TimeUnit("minute", "m", 60 * 1000, """^(?i)m(?:in(?:ute)?)?$""".r)
+    val secUnit = TimeUnit("second", "s", 1000, """^(?i)s(?:ec(?:ond)?)?$""".r)
+    val msUnit = TimeUnit("millisecond", "ms", 1, """^(?i)m(?:illi)?s(?:ec(?:ond)?)?$""".r)
+
+    val timeUnits = dayUnit :: hourUnit :: minUnit :: secUnit :: msUnit :: Nil
+  }
+  import Units._
+
+//  final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r
+  final val TimeRegex = """^([+\-]?\d+)([a-zA-Z]+)$""".r
   final val PureTimeRegex = """^([+\-]?\d+)$""".r
 
   def milliseconds(timeString: String): Option[Long] = {
@@ -34,17 +53,17 @@ object TimeUtil extends Loggable {
           case TimeRegex(time, unit) => {
             val t = time.toLong
             unit match {
-              case "d" => t * 24 * 60 * 60 * 1000
-              case "h" => t * 60 * 60 * 1000
-              case "m" => t * 60 * 1000
-              case "s" => t * 1000
-              case "ms" => t
+              case dayUnit.regex() => dayUnit.toMs(t)
+              case hourUnit.regex() => hourUnit.toMs(t)
+              case minUnit.regex() => minUnit.toMs(t)
+              case secUnit.regex() => secUnit.toMs(t)
+              case msUnit.regex() => msUnit.toMs(t)
               case _ => throw new Exception(s"${timeString} is invalid time format")
             }
           }
           case PureTimeRegex(time) => {
             val t = time.toLong
-            t
+            msUnit.toMs(t)
           }
           case _ => throw new Exception(s"${timeString} is invalid time format")
         }
@@ -58,24 +77,34 @@ object TimeUtil extends Loggable {
 
   def timeToUnit(ms: Long, unit: String): Long = {
     unit match {
-      case "ms" => ms
-      case "sec" => ms / 1000
-      case "min" => ms / (60 * 1000)
-      case "hour" => ms / (60 * 60 * 1000)
-      case "day" => ms / (24 * 60 * 60 * 1000)
-      case _ => ms / (60 * 1000)
+      case dayUnit.regex() => dayUnit.fromMs(ms)
+      case hourUnit.regex() => hourUnit.fromMs(ms)
+      case minUnit.regex() => minUnit.fromMs(ms)
+      case secUnit.regex() => secUnit.fromMs(ms)
+      case msUnit.regex() => msUnit.fromMs(ms)
+      case _ => ms
     }
   }
 
   def timeFromUnit(t: Long, unit: String): Long = {
     unit match {
-      case "ms" => t
-      case "sec" => t * 1000
-      case "min" => t * 60 * 1000
-      case "hour" => t * 60 * 60 * 1000
-      case "day" => t * 24 * 60 * 60 * 1000
-      case _ => t * 60 * 1000
+      case dayUnit.regex() => dayUnit.toMs(t)
+      case hourUnit.regex() => hourUnit.toMs(t)
+      case minUnit.regex() => minUnit.toMs(t)
+      case secUnit.regex() => secUnit.toMs(t)
+      case msUnit.regex() => msUnit.toMs(t)
+      case _ => t
+    }
+  }
+
+  def time2String(t: Long): String = {
+    val matchedUnitOpt = timeUnits.foldLeft(None: Option[TimeUnit]) { (retOpt, unit) =>
+      if (retOpt.isEmpty && unit.fitUnit(t)) Some(unit) else retOpt
     }
+    val unit = matchedUnitOpt.getOrElse(msUnit)
+    val unitTime = unit.fromMs(t)
+    val unitStr = unit.shortName
+    s"${unitTime}${unitStr}"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/resources/_timeliness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
index 2af98f1..bd48401 100644
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -28,7 +28,10 @@
         "details": {
           "source": "source",
           "latency": "latency",
-          "threshold": "3m"
+          "threshold": "3m",
+          "step": "step",
+          "count": "cnt",
+          "step.size": "2m"
         },
         "metric": {
           "name": "timeliness"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/resources/_timeliness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
index 776c3b5..fbaf8d4 100644
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -33,7 +33,7 @@
             {
               "dsl.type": "spark-sql",
               "name": "${this}",
-              "rule": "select ts, name, age from ${s1}"
+              "rule": "select ts, end_ts, name, age from ${s1}"
             }
           ]
         }
@@ -54,11 +54,16 @@
         "dsl.type": "griffin-dsl",
         "dq.type": "timeliness",
         "name": "timeliness",
-        "rule": "ts",
+        "rule": "ts, end_ts",
         "details": {
           "source": "source",
           "latency": "latency",
-          "threshold": "1h"
+          "total": "total",
+          "avg": "avg",
+          "threshold": "1h",
+          "step": "step",
+          "count": "cnt",
+          "step.size": "5m"
         },
         "metric": {
           "name": "timeliness"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala
new file mode 100644
index 0000000..673eca0
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class TimeUtilTest extends FunSuite with Matchers with BeforeAndAfter {
+
+  test ("milliseconds") {
+    val ts = "1h"
+    val res = TimeUtil.milliseconds(ts)
+    println(res)
+
+    val t = 1200000
+    val s = TimeUtil.time2String(t)
+    println(s)
+  }
+
+}