You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/04/10 05:01:13 UTC
incubator-griffin git commit: [GRIFFIN-135] support completeness measurement for batch and streaming mode
Repository: incubator-griffin
Updated Branches:
refs/heads/master 8633476ea -> c1f089815
[GRIFFIN-135] support completeness measurement for batch and streaming mode
Author: Lionel Liu <bh...@163.com>
Closes #254 from bhlx3lyx7/tmst.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/c1f08981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/c1f08981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/c1f08981
Branch: refs/heads/master
Commit: c1f0898156b178407ad252969054ff994791ab6e
Parents: 8633476
Author: Lionel Liu <bh...@163.com>
Authored: Tue Apr 10 13:01:03 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Apr 10 13:01:03 2018 +0800
----------------------------------------------------------------------
.../measure/process/BatchDqProcess.scala | 2 +-
.../measure/process/temp/TimeRange.scala | 2 +-
.../griffin/measure/rule/dsl/DqType.scala | 7 +-
.../dsl/analyzer/CompletenessAnalyzer.scala | 46 ++++++
.../rule/dsl/expr/ClauseExpression.scala | 8 +
.../rule/dsl/parser/GriffinDslParser.scala | 9 ++
.../rule/trans/AccuracyRulePlanTrans.scala | 4 +-
.../rule/trans/CompletenessRulePlanTrans.scala | 145 +++++++++++++++++++
.../measure/rule/trans/RulePlanTrans.scala | 1 +
.../rule/trans/TimelinessRulePlanTrans.scala | 16 +-
.../_completeness-batch-griffindsl.json | 36 +++++
.../_completeness-streaming-griffindsl.json | 64 ++++++++
12 files changed, 327 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 8c95a39..2770de8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -166,7 +166,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
val timeRangesStr = timeRanges.map { pair =>
val (name, timeRange) = pair
- s"${name} -> [${timeRange.begin}, ${timeRange.end})"
+ s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
}.mkString(", ")
println(s"data source timeRanges: ${timeRangesStr}")
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
index 9e79396..4073753 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
@@ -24,7 +24,7 @@ case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializa
def merge(tr: TimeRange): TimeRange = {
TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
}
- def beginTmstOpt: Option[Long] = {
+ def minTmstOpt: Option[Long] = {
try {
if (tmsts.nonEmpty) Some(tmsts.min) else None
} catch {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
index 18a5919..f6a7f85 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -28,7 +28,7 @@ sealed trait DqType {
object DqType {
private val dqTypes: List[DqType] = List(
- AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, UnknownType
+ AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType
)
def apply(ptn: String): DqType = {
dqTypes.filter(tp => ptn match {
@@ -64,6 +64,11 @@ final case object TimelinessType extends DqType {
val desc = "timeliness"
}
+final case object CompletenessType extends DqType {
+ val regex = "^(?i)completeness$".r
+ val desc = "completeness"
+}
+
final case object UnknownType extends DqType {
val regex = "".r
val desc = "unknown"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala
new file mode 100644
index 0000000..ad56e1a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.analyzer
+
+import org.apache.griffin.measure.rule.dsl.expr._
+
+
+case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer {
+
+ val seqAlias = (expr: Expr, v: Seq[String]) => {
+ expr match {
+ case apr: AliasableExpr => v ++ apr.alias
+ case _ => v
+ }
+ }
+ val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b
+
+ private val exprs = expr.exprs
+ private def genAlias(idx: Int): String = s"alias_${idx}"
+ val selectionPairs = exprs.zipWithIndex.map { pair =>
+ val (pr, idx) = pair
+ val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
+ (pr, res.headOption.getOrElse(genAlias(idx)))
+ }
+
+ if (selectionPairs.isEmpty) {
+ throw new Exception(s"completeness analyzer error: empty selection")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index 6790268..ecc5d67 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -259,4 +259,12 @@ case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
def desc: String = exprs.map(_.desc).mkString(", ")
def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
override def map(func: (Expr) => Expr): TimelinessClause = TimelinessClause(exprs.map(func(_)))
+}
+
+case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression {
+ addChildren(exprs)
+
+ def desc: String = exprs.map(_.desc).mkString(", ")
+ def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
+ override def map(func: (Expr) => Expr): CompletenessClause = CompletenessClause(exprs.map(func(_)))
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index d4a037b..b4496e7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -70,6 +70,14 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
case exprs => TimelinessClause(exprs)
}
+ /**
+ * -- completeness clauses --
+ * <completeness-clauses> = <expr> [, <expr>]+
+ */
+ def completenessClause: Parser[CompletenessClause] = rep1sep(expression, Operator.COMMA) ^^ {
+ case exprs => CompletenessClause(exprs)
+ }
+
def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
val rootExpr = dqType match {
case AccuracyType => logicalExpression
@@ -77,6 +85,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
case UniquenessType => uniquenessClause
case DistinctnessType => distinctnessClause
case TimelinessType => timelinessClause
+ case CompletenessType => completenessClause
case _ => expression
}
parseAll(rootExpr, rule)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
index 904b087..ec746d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
@@ -119,7 +119,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
s"""
|SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
- |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
""".stripMargin
}
@@ -128,7 +128,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
|SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
|`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
- |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
|ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}`
""".stripMargin
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
new file mode 100644
index 0000000..1b89587
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.analyzer.CompletenessAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+import scala.util.Try
+
+case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
+ timeInfo: TimeInfo, name: String, expr: Expr,
+ param: Map[String, Any], procType: ProcessType
+ ) extends RulePlanTrans {
+
+ private object CompletenessKeys {
+ val _source = "source"
+ val _total = "total"
+ val _complete = "complete"
+ val _incomplete = "incomplete"
+ }
+ import CompletenessKeys._
+
+ def trans(): Try[RulePlan] = Try {
+ val details = getDetails(param)
+ val completenessClause = expr.asInstanceOf[CompletenessClause]
+ val sourceName = details.getString(_source, dataSourceNames.head)
+
+ val mode = ExportMode.defaultMode(procType)
+
+ val ct = timeInfo.calcTime
+
+ if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+ emptyRulePlan
+ } else {
+ val analyzer = CompletenessAnalyzer(completenessClause, sourceName)
+
+ val selItemsClause = analyzer.selectionPairs.map { pair =>
+ val (expr, alias) = pair
+ s"${expr.desc} AS `${alias}`"
+ }.mkString(", ")
+ val aliases = analyzer.selectionPairs.map(_._2)
+
+ val selClause = procType match {
+ case BatchProcessType => selItemsClause
+ case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}"
+ }
+ val selAliases = procType match {
+ case BatchProcessType => aliases
+ case StreamingProcessType => InternalColumns.tmst +: aliases
+ }
+
+ // 1. source alias
+ val sourceAliasTableName = "__sourceAlias"
+ val sourceAliasSql = {
+ s"SELECT ${selClause} FROM `${sourceName}`"
+ }
+ val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+
+ // 2. incomplete record
+ val incompleteRecordsTableName = "__incompleteRecords"
+ val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
+ val incompleteWhereClause = s"NOT (${completeWhereClause})"
+ val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
+ val incompleteRecordStep = SparkSqlStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
+ val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+ val incompleteRecordExport = genRecordExport(recordParam, incompleteRecordsTableName, incompleteRecordsTableName, ct, mode)
+
+ // 3. incomplete count
+ val incompleteCountTableName = "__incompleteCount"
+ val incompleteColName = details.getStringOrKey(_incomplete)
+ val incompleteCountSql = procType match {
+ case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
+ case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
+ }
+ val incompleteCountStep = SparkSqlStep(incompleteCountTableName, incompleteCountSql, emptyMap)
+
+ // 4. total count
+ val totalCountTableName = "__totalCount"
+ val totalColName = details.getStringOrKey(_total)
+ val totalCountSql = procType match {
+ case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
+ case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${InternalColumns.tmst}`"
+ }
+ val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap)
+
+ // 5. complete metric
+ val completeTableName = name
+ val completeColName = details.getStringOrKey(_complete)
+ val completeMetricSql = procType match {
+ case BatchProcessType => {
+ s"""
+ |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+ |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
+ |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
+ """.stripMargin
+ }
+ case StreamingProcessType => {
+ s"""
+ |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+ |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+ |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
+ |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
+ |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
+ |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${incompleteCountTableName}`.`${InternalColumns.tmst}`
+ """.stripMargin
+ }
+ }
+ val completeStep = SparkSqlStep(completeTableName, completeMetricSql, emptyMap)
+ val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+ val completeExport = genMetricExport(metricParam, completeTableName, completeTableName, ct, mode)
+
+ // complete plan
+ val completeSteps = sourceAliasStep :: incompleteRecordStep :: incompleteCountStep :: totalCountStep :: completeStep :: Nil
+ val completeExports = incompleteRecordExport :: completeExport :: Nil
+ val completePlan = RulePlan(completeSteps, completeExports)
+
+ completePlan
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
index 9289053..ba9565f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
@@ -53,6 +53,7 @@ object RulePlanTrans {
case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, param, procType)
case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges)
case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges)
+ case CompletenessType => CompletenessRulePlanTrans(dsNames, ti, name, expr, param, procType)
case _ => emptyRulePlanTrans
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
index 7e9b8fb..d6dc499 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
@@ -61,10 +61,10 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
// val ct = timeInfo.calcTime
- val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt)
- val beginTmst = beginTmstOpt match {
+ val minTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.minTmstOpt)
+ val minTmst = minTmstOpt match {
case Some(t) => t
- case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
+ case _ => throw new Exception(s"empty min tmst from ${sourceName}")
}
if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
@@ -129,7 +129,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
}
val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
- val metricExports = genMetricExport(metricParam, name, metricTableName, beginTmst, mode) :: Nil
+ val metricExports = genMetricExport(metricParam, name, metricTableName, minTmst, mode) :: Nil
// current timeliness plan
val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
@@ -145,7 +145,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
}
val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
- val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, beginTmst, mode) :: Nil
+ val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, minTmst, mode) :: Nil
RulePlan(recordStep :: Nil, recordExports)
}
case _ => emptyRulePlan
@@ -184,7 +184,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
}
val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
- val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, beginTmst, mode) :: Nil
+ val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, minTmst, mode) :: Nil
RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
}
@@ -208,9 +208,9 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
}
val percentileStep = SparkSqlStep(percentileTableName, percentileSql, emptyMap)
val percentileParam = emptyMap
- val percentielExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, beginTmst, mode) :: Nil
+ val percentileExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, minTmst, mode) :: Nil
- RulePlan(percentileStep :: Nil, percentielExports)
+ RulePlan(percentileStep :: Nil, percentileExports)
} else emptyRulePlan
timePlan.merge(recordPlan).merge(rangePlan).merge(percentilePlan)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/test/resources/_completeness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
new file mode 100644
index 0000000..9c00444
--- /dev/null
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -0,0 +1,36 @@
+{
+ "name": "comp_batch",
+
+ "process.type": "batch",
+
+ "timestamp": 123456,
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ }
+ }
+ ]
+ }
+ ],
+
+ "evaluate.rule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "completeness",
+ "name": "comp",
+ "rule": "email, post_code, first_name",
+ "metric": {
+ "name": "comp"
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
new file mode 100644
index 0000000..df1b889
--- /dev/null
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -0,0 +1,64 @@
+{
+ "name": "comp_streaming",
+
+ "process.type": "streaming",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "kafka",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.147.177.107:9092",
+ "group.id": "source",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
+ },
+ "topics": "test",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${s1}",
+ "rule": "from_json",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${s1}"
+ }
+ ]
+ }
+ ],
+ "cache": {
+ "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+ "info.path": "old",
+ "ready.time.interval": "10s",
+ "ready.time.delay": "0",
+ "time.range": ["0", "0"]
+ }
+ }
+ ],
+
+ "evaluate.rule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "completeness",
+ "name": "comp",
+ "rule": "name, age",
+ "metric": {
+ "name": "comp"
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file