You are viewing a plain text version of this content; the canonical version is available at the git-wip-us.apache.org repository URL shown below.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/08 09:44:41 UTC
[4/6] incubator-griffin git commit: Measure module enhancement
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 1e3ecb1..6ba0cf8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -18,339 +18,1198 @@ under the License.
*/
package org.apache.griffin.measure.rule.adaptor
-import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
+import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process._
import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.dsl.analyzer._
import org.apache.griffin.measure.rule.dsl.expr._
import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
-case class GriffinDslAdaptor(dataSourceNames: Seq[String],
- functionNames: Seq[String],
- adaptPhase: AdaptPhase
- ) extends RuleAdaptor {
+object AccuracyKeys {
+ val _source = "source"
+ val _target = "target"
+ val _miss = "miss"
+ val _total = "total"
+ val _matched = "matched"
+// val _missRecords = "missRecords"
+}
- object StepInfo {
- val _Name = "name"
- val _PersistType = "persist.type"
- val _UpdateDataSource = "update.data.source"
- def getNameOpt(param: Map[String, Any]): Option[String] = param.get(_Name).map(_.toString)
- def getPersistType(param: Map[String, Any]): PersistType = PersistType(param.getString(_PersistType, ""))
- def getUpdateDataSourceOpt(param: Map[String, Any]): Option[String] = param.get(_UpdateDataSource).map(_.toString)
- }
- object AccuracyInfo {
- val _Source = "source"
- val _Target = "target"
- val _MissRecords = "miss.records"
- val _Accuracy = "accuracy"
- val _Miss = "miss"
- val _Total = "total"
- val _Matched = "matched"
- }
- object ProfilingInfo {
- val _Source = "source"
- val _Profiling = "profiling"
- }
+object ProfilingKeys {
+ val _source = "source"
+}
- def getNameOpt(param: Map[String, Any], key: String): Option[String] = param.get(key).map(_.toString)
- def resultName(param: Map[String, Any], key: String): String = {
- val nameOpt = param.get(key) match {
- case Some(prm: Map[String, Any]) => StepInfo.getNameOpt(prm)
- case _ => None
- }
- nameOpt.getOrElse(key)
- }
- def resultPersistType(param: Map[String, Any], key: String, defPersistType: PersistType): PersistType = {
- param.get(key) match {
- case Some(prm: Map[String, Any]) => StepInfo.getPersistType(prm)
- case _ => defPersistType
- }
- }
- def resultUpdateDataSourceOpt(param: Map[String, Any], key: String): Option[String] = {
- param.get(key) match {
- case Some(prm: Map[String, Any]) => StepInfo.getUpdateDataSourceOpt(prm)
- case _ => None
- }
- }
+object DuplicateKeys {
+ val _source = "source"
+ val _target = "target"
+ val _dup = "dup"
+ val _num = "num"
+}
+
+object TimelinessKeys {
+ val _source = "source"
+ val _latency = "latency"
+ val _threshold = "threshold"
+}
+
+object GlobalKeys {
+ val _initRule = "init.rule"
+// val _globalMetricKeep = "global.metric.keep"
+}
- val _dqType = "dq.type"
+case class GriffinDslAdaptor(dataSourceNames: Seq[String],
+ functionNames: Seq[String]
+ ) extends RuleAdaptor {
- protected def getDqType(param: Map[String, Any]) = DqType(param.getString(_dqType, ""))
+ import RuleParamKeys._
val filteredFunctionNames = functionNames.filter { fn =>
fn.matches("""^[a-zA-Z_]\w*$""")
}
val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
- def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
- GriffinDslStep(getName(param), getRule(param), getDqType(param), getDetails(param)) :: Nil
- }
+ private val emptyRulePlan = RulePlan(Nil, Nil)
+ private val emptyMap = Map[String, Any]()
- def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+ override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], processType: ProcessType
+ ): RulePlan = {
+ val name = getRuleName(param)
+ val rule = getRule(param)
val dqType = getDqType(param)
- param.get(_name) match {
- case Some(name) => {
+ try {
+ val result = parser.parseRule(rule, dqType)
+ if (result.successful) {
+ val expr = result.get
dqType match {
- case AccuracyType => {
- Seq[String](
- resultName(param, AccuracyInfo._MissRecords),
- resultName(param, AccuracyInfo._Accuracy)
- )
- }
- case ProfilingType => {
- Seq[String](
- resultName(param, ProfilingInfo._Profiling)
- )
- }
- case TimelinessType => {
- Nil
- }
- case _ => Nil
+ case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, processType)
+ case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType)
+ case DuplicateType => duplicateRulePlan(timeInfo, name, expr, param, processType)
+ case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType)
+ case _ => emptyRulePlan
}
+ } else {
+ warn(s"parse rule [ ${rule} ] fails: \n${result}")
+ emptyRulePlan
+ }
+ } catch {
+ case e: Throwable => {
+ error(s"generate rule plan ${name} fails: ${e.getMessage}")
+ emptyRulePlan
}
- case _ => Nil
}
}
- def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
- ruleStep match {
- case rs @ GriffinDslStep(_, rule, dqType, _) => {
- val exprOpt = try {
- val result = parser.parseRule(rule, dqType)
- if (result.successful) Some(result.get)
- else {
- println(result)
- warn(s"adapt concrete rule step warn: parse rule [ ${rule} ] fails")
- None
- }
- } catch {
- case e: Throwable => {
- error(s"adapt concrete rule step error: ${e.getMessage}")
- None
- }
- }
+ // with accuracy opr
+ private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+ param: Map[String, Any], processType: ProcessType
+ ): RulePlan = {
+ val details = getDetails(param)
+ val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+ val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+ val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
- exprOpt match {
- case Some(expr) => {
- try {
- transConcreteRuleSteps(rs, expr)
- } catch {
- case e: Throwable => {
- error(s"trans concrete rule step error: ${e.getMessage}")
- Nil
- }
- }
- }
- case _ => Nil
+ if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+ println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+ emptyRulePlan
+ } else {
+ // 1. miss record
+ val missRecordsTableName = "__missRecords"
+ val selClause = s"`${sourceName}`.*"
+ val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+ println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+ s"SELECT ${selClause} FROM `${sourceName}`"
+ } else {
+ val onClause = expr.coalesceDesc
+ val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+ s"${sel.desc} IS NULL"
+ }.mkString(" AND ")
+ val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+ s"${sel.desc} IS NULL"
+ }.mkString(" AND ")
+ val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+ s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+ }
+ val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+ val missRecordsExports = processType match {
+ case BatchProcessType => {
+ val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+ genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil
}
+ case StreamingProcessType => Nil
}
- case _ => Nil
- }
- }
- private def transConcreteRuleSteps(ruleStep: GriffinDslStep, expr: Expr
- ): Seq[ConcreteRuleStep] = {
- val details = ruleStep.details
- ruleStep.dqType match {
- case AccuracyType => {
- val sourceName = getNameOpt(details, AccuracyInfo._Source) match {
- case Some(name) => name
- case _ => dataSourceNames.head
+ // 2. miss count
+ val missCountTableName = "__missCount"
+ val missColName = details.getStringOrKey(AccuracyKeys._miss)
+ val missCountSql = processType match {
+ case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
+ case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
+ }
+ val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap)
+
+ // 3. total count
+ val totalCountTableName = "__totalCount"
+ val totalColName = details.getStringOrKey(AccuracyKeys._total)
+ val totalCountSql = processType match {
+ case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+ case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
+ }
+ val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap)
+
+ // 4. accuracy metric
+ val accuracyTableName = name
+ val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+ val accuracyMetricSql = processType match {
+ case BatchProcessType => {
+ s"""
+ |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+ |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+ |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+ |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+ """.stripMargin
}
- val targetName = getNameOpt(details, AccuracyInfo._Target) match {
- case Some(name) => name
- case _ => dataSourceNames.tail.head
+ case StreamingProcessType => {
+ s"""
+ |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+ |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+ |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+ |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+ |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+ |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}`
+ """.stripMargin
}
- val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
-
-
- if (!checkDataSourceExists(sourceName)) {
- Nil
- } else {
- // 1. miss record
- val missRecordsSql = if (!checkDataSourceExists(targetName)) {
- val selClause = s"`${sourceName}`.*"
- s"SELECT ${selClause} FROM `${sourceName}`"
- } else {
- val selClause = s"`${sourceName}`.*"
- val onClause = expr.coalesceDesc
- val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
- s"${sel.desc} IS NULL"
- }.mkString(" AND ")
- val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
- s"${sel.desc} IS NULL"
- }.mkString(" AND ")
- val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
- s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
- }
- val missRecordsName = resultName(details, AccuracyInfo._MissRecords)
- val missRecordsStep = SparkSqlStep(
- missRecordsName,
- missRecordsSql,
- Map[String, Any](),
- resultPersistType(details, AccuracyInfo._MissRecords, RecordPersistType),
- resultUpdateDataSourceOpt(details, AccuracyInfo._MissRecords)
- )
+ }
+ val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap)
+ val accuracyExports = processType match {
+ case BatchProcessType => {
+ val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+ genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil
+ }
+ case StreamingProcessType => Nil
+ }
- // 2. miss count
- val missTableName = "_miss_"
- val missColName = getNameOpt(details, AccuracyInfo._Miss).getOrElse(AccuracyInfo._Miss)
- val missSql = {
- s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${GroupByColumn.tmst}`"
- }
- val missStep = SparkSqlStep(
- missTableName,
- missSql,
- Map[String, Any](),
- NonePersistType,
- None
- )
+ // current accu plan
+ val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil
+ val accuExports = missRecordsExports ++ accuracyExports
+ val accuPlan = RulePlan(accuSteps, accuExports)
- // 3. total count
- val totalTableName = "_total_"
- val totalColName = getNameOpt(details, AccuracyInfo._Total).getOrElse(AccuracyInfo._Total)
- val totalSql = {
- s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${GroupByColumn.tmst}`"
- }
- val totalStep = SparkSqlStep(
- totalTableName,
- totalSql,
- Map[String, Any](),
- NonePersistType,
- None
+ // streaming extra accu plan
+ val streamingAccuPlan = processType match {
+ case BatchProcessType => emptyRulePlan
+ case StreamingProcessType => {
+ // 5. accuracy metric merge
+ val accuracyMetricTableName = "__accuracy"
+ val accuracyMetricRule = "accuracy"
+ val accuracyMetricDetails = Map[String, Any](
+ (AccuracyOprKeys._dfName -> accuracyTableName),
+ (AccuracyOprKeys._miss -> missColName),
+ (AccuracyOprKeys._total -> totalColName),
+ (AccuracyOprKeys._matched -> matchedColName)
)
+ val accuracyMetricStep = DfOprStep(accuracyMetricTableName,
+ accuracyMetricRule, accuracyMetricDetails)
+ val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+ val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName) :: Nil
- // 4. accuracy metric
- val matchedColName = getNameOpt(details, AccuracyInfo._Matched).getOrElse(AccuracyInfo._Matched)
- val accuracyMetricSql = {
+ // 6. collect accuracy records
+ val accuracyRecordTableName = "__accuracyRecords"
+ val accuracyRecordSql = {
s"""
- |SELECT `${totalTableName}`.`${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,
- |`${missTableName}`.`${missColName}` AS `${missColName}`,
- |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
- |FROM `${totalTableName}` FULL JOIN `${missTableName}`
- |ON `${totalTableName}`.`${GroupByColumn.tmst}` = `${missTableName}`.`${GroupByColumn.tmst}`
- """.stripMargin
+ |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}`
+ |FROM `${accuracyMetricTableName}` WHERE `${InternalColumns.record}`
+ """.stripMargin
}
- val accuracyMetricName = resultName(details, AccuracyInfo._Accuracy)
- val accuracyMetricStep = SparkSqlStep(
- accuracyMetricName,
- accuracyMetricSql,
- details,
- // resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType)
- NonePersistType,
- None
- )
+ val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap)
+ val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+ val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
+ .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
+ val accuracyRecordExports = genRecordExport(
+ accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil
- // 5. accuracy metric filter
- val accuracyStep = DfOprStep(
- accuracyMetricName,
- "accuracy",
- Map[String, Any](
- ("df.name" -> accuracyMetricName),
- ("miss" -> missColName),
- ("total" -> totalColName),
- ("matched" -> matchedColName)
- ),
- resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType),
- None
- )
+ // gen accu plan
+ val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil
+ val extraExports = accuracyMetricExports ++ accuracyRecordExports
+ val extraPlan = RulePlan(extraSteps, extraExports)
- missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil
+ extraPlan
}
}
- case ProfilingType => {
- val profilingClause = expr.asInstanceOf[ProfilingClause]
- val sourceName = profilingClause.fromClauseOpt match {
- case Some(fc) => fc.dataSource
- case _ => {
- getNameOpt(details, ProfilingInfo._Source) match {
- case Some(name) => name
- case _ => dataSourceNames.head
- }
- }
- }
- val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
-// analyzer.selectionExprs.foreach(println)
+ // return accu plan
+ accuPlan.merge(streamingAccuPlan)
- val selExprDescs = analyzer.selectionExprs.map { sel =>
- val alias = sel match {
- case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
- case _ => ""
- }
- s"${sel.desc}${alias}"
- }
+ }
+ }
+
+// private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+// param: Map[String, Any], processType: ProcessType
+// ): RulePlan = {
+// val details = getDetails(param)
+// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+// if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+// emptyRulePlan
+// } else {
+// // 1. miss record
+// val missRecordsTableName = "__missRecords"
+// val selClause = s"`${sourceName}`.*"
+// val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+// s"SELECT ${selClause} FROM `${sourceName}`"
+// } else {
+// val onClause = expr.coalesceDesc
+// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+// }
+// val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+// val missRecordsExports = processType match {
+// case BatchProcessType => {
+// val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+// genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil
+// }
+// case StreamingProcessType => Nil
+// }
+//
+// // 2. miss count
+// val missCountTableName = "__missCount"
+// val missColName = details.getStringOrKey(AccuracyKeys._miss)
+// val missCountSql = processType match {
+// case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
+// case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
+// }
+// val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap)
+//
+// // 3. total count
+// val totalCountTableName = "__totalCount"
+// val totalColName = details.getStringOrKey(AccuracyKeys._total)
+// val totalCountSql = processType match {
+// case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+// case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
+// }
+// val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap)
+//
+// // 4. accuracy metric
+// val accuracyTableName = name
+// val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+// val accuracyMetricSql = processType match {
+// case BatchProcessType => {
+// s"""
+// |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+// |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+// |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+// """.stripMargin
+// }
+// case StreamingProcessType => {
+// s"""
+// |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+// |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+// |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+// |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+// |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}`
+// """.stripMargin
+// }
+// }
+// val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap, true)
+// val accuracyExports = processType match {
+// case BatchProcessType => {
+// val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+// genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil
+// }
+// case StreamingProcessType => Nil
+// }
+//
+// // current accu plan
+// val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil
+// val accuExports = missRecordsExports ++ accuracyExports
+// val accuPlan = RulePlan(accuSteps, accuExports)
+//
+// // streaming extra accu plan
+// val streamingAccuPlan = processType match {
+// case BatchProcessType => emptyRulePlan
+// case StreamingProcessType => {
+// // 5. global accuracy metric merge
+// val globalAccuracyTableName = "__globalAccuracy"
+// val globalAccuracySql = {
+// s"""
+// |SELECT coalesce(`${globalAccuracyTableName}`.`${InternalColumns.tmst}`, `${accuracyTableName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`,
+// |coalesce(`${accuracyTableName}`.`${missColName}`, `${globalAccuracyTableName}`.`${missColName}`) AS `${missColName}`,
+// |coalesce(`${globalAccuracyTableName}`.`${totalColName}`, `${accuracyTableName}`.`${totalColName}`) AS `${totalColName}`,
+// |((`${accuracyTableName}`.`${missColName}` IS NOT NULL) AND ((`${globalAccuracyTableName}`.`${missColName}` IS NULL) OR (`${accuracyTableName}`.`${missColName}` < `${globalAccuracyTableName}`.`${missColName}`))) AS `${InternalColumns.metric}`
+// |FROM `${globalAccuracyTableName}` FULL JOIN `${accuracyTableName}`
+// |ON `${globalAccuracyTableName}`.`${InternalColumns.tmst}` = `${accuracyTableName}`.`${InternalColumns.tmst}`
+// """.stripMargin
+// }
+// val globalAccuracyInitSql = {
+// s"""
+// |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`,
+// |(true) AS `${InternalColumns.metric}`
+// |FROM `${accuracyTableName}`
+// """.stripMargin
+// }
+// val globalAccuracyDetails = Map[String, Any](GlobalKeys._initRule -> globalAccuracyInitSql)
+// val globalAccuracyStep = SparkSqlStep(globalAccuracyTableName,
+// globalAccuracySql, globalAccuracyDetails, true, true)
+//
+// // 6. collect accuracy metrics
+// val accuracyMetricTableName = name
+// val accuracyMetricSql = {
+// s"""
+// |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`,
+// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+// |FROM `${globalAccuracyTableName}` WHERE `${InternalColumns.metric}`
+// """.stripMargin
+// }
+// val accuracyMetricStep = SparkSqlStep(accuracyMetricTableName, accuracyMetricSql, emptyMap)
+// val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+// val accuracyMetricExports = genMetricExport(metricParam, accuracyMetricTableName, accuracyMetricTableName) :: Nil
+//
+// // 7. collect accuracy records
+// val accuracyRecordTableName = "__accuracyRecords"
+// val accuracyRecordSql = {
+// s"""
+// |SELECT `${InternalColumns.tmst}`
+// |FROM `${accuracyMetricTableName}` WHERE `${matchedColName}` > 0
+// """.stripMargin
+// }
+// val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap)
+// val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+// val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
+// .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
+// val accuracyRecordExports = genRecordExport(
+// accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil
+//
+// // 8. update global accuracy metric
+// val updateGlobalAccuracyTableName = globalAccuracyTableName
+// val globalMetricKeepTime = details.getString(GlobalKeys._globalMetricKeep, "")
+// val updateGlobalAccuracySql = TimeUtil.milliseconds(globalMetricKeepTime) match {
+// case Some(kt) => {
+// s"""
+// |SELECT * FROM `${globalAccuracyTableName}`
+// |WHERE (`${missColName}` > 0) AND (`${InternalColumns.tmst}` > ${timeInfo.calcTime - kt})
+// """.stripMargin
+// }
+// case _ => {
+// s"""
+// |SELECT * FROM `${globalAccuracyTableName}`
+// |WHERE (`${missColName}` > 0)
+// """.stripMargin
+// }
+// }
+// val updateGlobalAccuracyStep = SparkSqlStep(updateGlobalAccuracyTableName,
+// updateGlobalAccuracySql, emptyMap, true, true)
+//
+// // gen accu plan
+// val extraSteps = globalAccuracyStep :: accuracyMetricStep :: accuracyRecordStep :: updateGlobalAccuracyStep :: Nil
+// val extraExports = accuracyMetricExports ++ accuracyRecordExports
+// val extraPlan = RulePlan(extraSteps, extraExports)
+//
+// extraPlan
+// }
+// }
+//
+// // return accu plan
+// accuPlan.merge(streamingAccuPlan)
+//
+// }
+// }
+
+ private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+ param: Map[String, Any], processType: ProcessType
+ ): RulePlan = {
+ val details = getDetails(param)
+ val profilingClause = expr.asInstanceOf[ProfilingClause]
+ val sourceName = profilingClause.fromClauseOpt match {
+ case Some(fc) => fc.dataSource
+ case _ => details.getString(ProfilingKeys._source, dataSourceNames.head)
+ }
+ val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-// val selClause = (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
- val selClause = if (analyzer.containsAllSelectionExpr) {
- selExprDescs.mkString(", ")
- } else {
- (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
+ if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+ emptyRulePlan
+ } else {
+ val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
+ val selExprDescs = analyzer.selectionExprs.map { sel =>
+ val alias = sel match {
+ case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+ case _ => ""
}
+ s"${sel.desc}${alias}"
+ }
+ val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+ val selClause = processType match {
+ case BatchProcessType => selExprDescs.mkString(", ")
+ case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ")
+ }
+ val groupByClauseOpt = analyzer.groupbyExprOpt
+ val groupbyClause = processType match {
+ case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
+ case StreamingProcessType => {
+ val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
+ val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
+ case Some(gbc) => gbc
+ case _ => GroupbyClause(Nil, None)
+ })
+ mergedGroubbyClause.desc
+ }
+ }
+ val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+ val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
- val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-
-// val tailClause = analyzer.tailsExprs.map(_.desc).mkString(" ")
- val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${GroupByColumn.tmst}`") :: Nil, None)
- val mergedGroubbyClause = tmstGroupbyClause.merge(analyzer.groupbyExprOpt match {
- case Some(gbc) => gbc
- case _ => GroupbyClause(Nil, None)
- })
- val groupbyClause = mergedGroubbyClause.desc
- val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
- val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
-
- if (!checkDataSourceExists(sourceName)) {
- Nil
- } else {
- // 1. select statement
- val profilingSql = {
-// s"SELECT `${GroupByColumn.tmst}`, ${selClause} FROM ${sourceName} ${tailClause} GROUP BY `${GroupByColumn.tmst}`"
- s"SELECT ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
- }
- val profilingMetricName = resultName(details, ProfilingInfo._Profiling)
- val profilingStep = SparkSqlStep(
- profilingMetricName,
- profilingSql,
- details,
- resultPersistType(details, ProfilingInfo._Profiling, MetricPersistType),
- None
- )
+ // 1. select statement
+ val profilingSql = {
+ s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+ }
+ val profilingName = name
+ val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
+ val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+ val profilingExports = genMetricExport(metricParam, name, profilingName) :: Nil
- // 2. clear processed data
-// val clearDataSourceStep = DfOprStep(
-// s"${sourceName}_clear",
-// "clear",
-// Map[String, Any](
-// ("df.name" -> sourceName)
-// ),
-// NonePersistType,
-// Some(sourceName)
-// )
-//
-// profilingStep :: clearDataSourceStep :: Nil
+ RulePlan(profilingStep :: Nil, profilingExports)
+ }
+ }
- profilingStep:: Nil
- }
+ private def duplicateRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+ param: Map[String, Any], processType: ProcessType
+ ): RulePlan = {
+ val details = getDetails(param)
+ val sourceName = details.getString(DuplicateKeys._source, dataSourceNames.head)
+ val targetName = details.getString(DuplicateKeys._target, dataSourceNames.tail.head)
+ val analyzer = DuplicateAnalyzer(expr.asInstanceOf[DuplicateClause], sourceName, targetName)
+
+ if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+ println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+ emptyRulePlan
+ } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+ println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+ emptyRulePlan
+ } else {
+ val selItemsClause = analyzer.selectionPairs.map { pair =>
+ val (expr, alias) = pair
+ s"${expr.desc} AS `${alias}`"
+ }.mkString(", ")
+ val aliases = analyzer.selectionPairs.map(_._2)
+
+ val selClause = processType match {
+ case BatchProcessType => selItemsClause
+ case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}"
+ }
+ val selAliases = processType match {
+ case BatchProcessType => aliases
+ case StreamingProcessType => InternalColumns.tmst +: aliases
+ }
+
+ // 1. source mapping
+ val sourceTableName = "__source"
+ val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
+ val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
+
+ // 2. target mapping
+ val targetTableName = "__target"
+ val targetSql = s"SELECT ${selClause} FROM ${targetName}"
+ val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap)
+
+ // 3. joined
+ val joinedTableName = "__joined"
+ val joinedSelClause = selAliases.map { alias =>
+ s"`${sourceTableName}`.`${alias}` AS `${alias}`"
+ }.mkString(", ")
+ val onClause = aliases.map { alias =>
+ s"`${sourceTableName}`.`${alias}` = `${targetTableName}`.`${alias}`"
+ }.mkString(" AND ")
+ val joinedSql = {
+ s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
+ }
+ val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
+ // 4. group
+ val groupTableName = "__group"
+ val groupSelClause = selAliases.map { alias =>
+ s"`${alias}`"
+ }.mkString(", ")
+ val dupColName = details.getStringOrKey(DuplicateKeys._dup)
+ val groupSql = {
+ s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
}
- case TimelinessType => {
- Nil
+ val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
+
+ // 5. duplicate record
+ val dupRecordTableName = "__dupRecords"
+ val dupRecordSql = {
+ s"""
+ |SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0
+ """.stripMargin
+ }
+ val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+ val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+ val dupRecordxports = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName) :: Nil
+
+ // 6. duplicate metric
+ val dupMetricTableName = name
+ val numColName = details.getStringOrKey(DuplicateKeys._num)
+ val dupMetricSelClause = processType match {
+ case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
+ case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
+ }
+ val dupMetricGroupbyClause = processType match {
+ case BatchProcessType => s"`${dupColName}`"
+ case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`"
+ }
+ val dupMetricSql = {
+ s"""
+ |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
+ |GROUP BY ${dupMetricGroupbyClause}
+ """.stripMargin
}
- case _ => Nil
+ val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
+ val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+ .addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+ val dupMetricExports = genMetricExport(metricParam, name, dupMetricTableName) :: Nil
+
+ val dupSteps = sourceStep :: targetStep :: joinedStep :: groupStep :: dupRecordStep :: dupMetricStep :: Nil
+ val dupExports = dupRecordxports ++ dupMetricExports
+
+ RulePlan(dupSteps, dupExports)
}
}
- private def checkDataSourceExists(name: String): Boolean = {
- try {
- RuleAdaptorGroup.dataChecker.existDataSourceName(name)
- } catch {
- case e: Throwable => {
- error(s"check data source exists error: ${e.getMessage}")
- false
+ // Builds the RulePlan for a timeliness DQ measure.
+ // NOTE(review): this is a patch rendering — diff '+' markers preserved; only
+ // comment lines were added.
+ //
+ // Plan shape:
+ //   1. __inTime      : rows whose begin-ts (and end-ts, when provided) are non-null
+ //   2. __lat         : per-row latency = end-ts - begin-ts
+ //   3. <name>        : avg/max/min latency metric (grouped by tmst when streaming)
+ //   4. __lateRecords : optional export of rows whose latency exceeds a threshold
+ private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+ param: Map[String, Any], processType: ProcessType
+ ): RulePlan = {
+ val details = getDetails(param)
+ val timelinessClause = expr.asInstanceOf[TimelinessClause]
+ // source defaults to the first configured data source
+ val sourceName = details.getString(TimelinessKeys._source, dataSourceNames.head)
+
+ // no temp table registered for this run key => nothing to measure
+ if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+ emptyRulePlan
+ } else {
+ val analyzer = TimelinessAnalyzer(timelinessClause, sourceName)
+ val btsSel = analyzer.btsExpr
+ val etsSelOpt = analyzer.etsExprOpt
+
+ // 1. in time
+ // Filter out rows where the timestamp expressions evaluate to NULL, and
+ // project them as the internal begin/end ts columns.
+ val inTimeTableName = "__inTime"
+ val inTimeSql = etsSelOpt match {
+ case Some(etsSel) => {
+ s"""
+ |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`,
+ |(${etsSel}) AS `${InternalColumns.endTs}`
+ |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
+ """.stripMargin
+ }
+ case _ => {
+ s"""
+ |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`
+ |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
+ """.stripMargin
+ }
+ }
+ val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap)
+
+ // 2. latency
+ val latencyTableName = "__lat"
+ val latencyColName = details.getStringOrKey(TimelinessKeys._latency)
+ // end-ts column: the explicit end expression when given, otherwise fall back
+ // to the internal tmst column
+ val etsColName = etsSelOpt match {
+ case Some(_) => InternalColumns.endTs
+ case _ => InternalColumns.tmst
+ }
+ val latencySql = {
+ s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
+ }
+ // 4th arg 'true' presumably marks this step's table for caching/reuse —
+ // TODO(review): confirm against the SparkSqlStep signature
+ val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, true)
+
+ // 3. timeliness metric
+ // Batch: one global avg/max/min row. Streaming: one row per tmst group.
+ val metricTableName = name
+ val metricSql = processType match {
+ case BatchProcessType => {
+ s"""
+ |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
+ |MAX(`${latencyColName}`) AS `max`,
+ |MIN(`${latencyColName}`) AS `min`
+ |FROM `${latencyTableName}`
+ """.stripMargin
+ }
+ case StreamingProcessType => {
+ s"""
+ |SELECT `${InternalColumns.tmst}`,
+ |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
+ |MAX(`${latencyColName}`) AS `max`,
+ |MIN(`${latencyColName}`) AS `min`
+ |FROM `${latencyTableName}`
+ |GROUP BY `${InternalColumns.tmst}`
+ """.stripMargin
+ }
+ }
+ val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
+ val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+ val metricExports = genMetricExport(metricParam, name, metricTableName) :: Nil
+
+ // current timeliness plan
+ val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
+ val timeExports = metricExports
+ val timePlan = RulePlan(timeSteps, timeExports)
+
+ // 4. timeliness record
+ // Only emitted when the configured threshold parses to milliseconds; a
+ // missing/unparseable threshold yields no late-record export (emptyRulePlan).
+ val recordPlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._threshold, "")) match {
+ case Some(tsh) => {
+ val recordTableName = "__lateRecords"
+ val recordSql = {
+ s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
+ }
+ val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
+ val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+ val recordExports = genRecordExport(recordParam, recordTableName, recordTableName) :: Nil
+ RulePlan(recordStep :: Nil, recordExports)
+ }
+ case _ => emptyRulePlan
}
+
+ // return timeliness plan (metric plan merged with optional record plan)
+ timePlan.merge(recordPlan)
+ }
+ }
+ // override def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = {
+// val ruleInfo = RuleInfoGen(param)
+// val dqType = RuleInfoGen.dqType(param)
+// try {
+// val result = parser.parseRule(ruleInfo.rule, dqType)
+// if (result.successful) {
+// val expr = result.get
+// dqType match {
+// case AccuracyType => accuracyRuleInfos(ruleInfo, expr, timeInfo)
+// case ProfilingType => profilingRuleInfos(ruleInfo, expr, timeInfo)
+// case TimelinessType => Nil
+// case _ => Nil
+// }
+// } else {
+// warn(s"parse rule [ ${ruleInfo.rule} ] fails: \n${result}")
+// Nil
+// }
+// } catch {
+// case e: Throwable => {
+// error(s"generate rule info ${ruleInfo} fails: ${e.getMessage}")
+// Nil
+// }
+// }
+// }
+
+ // group by version
+// private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = {
+// val calcTime = timeInfo.calcTime
+// val details = ruleInfo.details
+// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+// if (!TempTables.existTable(timeInfo.key, sourceName)) {
+// Nil
+// } else {
+// // 1. miss record
+// val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) {
+// val selClause = s"`${sourceName}`.*"
+// s"SELECT ${selClause} FROM `${sourceName}`"
+// } else {
+// val selClause = s"`${sourceName}`.*"
+// val onClause = expr.coalesceDesc
+// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+// }
+// val missRecordsName = AccuracyKeys._missRecords
+// // val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo)
+// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
+// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
+// val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType,
+// missRecordsSql, missRecordsParams, true)
+// // val missRecordsStep = SparkSqlStep(
+// // timeInfo,
+// // RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams)
+// // )
+//
+// // 2. miss count
+// val missTableName = "_miss_"
+// // val tmstMissTableName = TempName.tmstName(missTableName, timeInfo)
+// val missColName = details.getStringOrKey(AccuracyKeys._miss)
+// val missSql = {
+// s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${InternalColumns.tmst}`"
+// }
+// val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType,
+// missSql, Map[String, Any](), true)
+// // val missStep = SparkSqlStep(
+// // timeInfo,
+// // RuleInfo(missTableName, None, missSql, Map[String, Any]())
+// // )
+//
+// // 3. total count
+// val totalTableName = "_total_"
+// // val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo)
+// val totalColName = details.getStringOrKey(AccuracyKeys._total)
+// val totalSql = {
+// s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
+// }
+// val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType,
+// totalSql, Map[String, Any](), true)
+// // val totalStep = SparkSqlStep(
+// // timeInfo,
+// // RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
+// // )
+//
+// // 4. accuracy metric
+// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name)
+// // val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo)
+// val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+// val accuracyMetricSql = {
+// s"""
+// |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+// |`${missTableName}`.`${missColName}` AS `${missColName}`,
+// |`${totalTableName}`.`${totalColName}` AS `${totalColName}`,
+// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+// |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+// |ON `${totalTableName}`.`${InternalColumns.tmst}` = `${missTableName}`.`${InternalColumns.tmst}`
+// """.stripMargin
+// }
+// // val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType,
+//// accuracyMetricSql, Map[String, Any](), true)
+// val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType,
+// accuracyMetricSql, Map[String, Any](), true)
+//
+// // 5. accuracy metric merge
+// val globalMetricName = "accu_global"
+// val globalAccuSql = if (TempTables.existGlobalTable(globalMetricName)) {
+// s"""
+// |SELECT coalesce(`${globalMetricName}`.`${InternalColumns.tmst}`, `${accuracyMetricName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`,
+// |coalesce(`${accuracyMetricName}`.`${missColName}`, `${globalMetricName}`.`${missColName}`) AS `${missColName}`,
+// |coalesce(`${globalMetricName}`.`${totalColName}`, `${accuracyMetricName}`.`${totalColName}`) AS `${totalColName}`,
+// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`,
+// |(`${totalColName}` = 0) AS `empty`,
+// |(`${missColName}` = 0) AS `no_miss`,
+// |(`${accuracyMetricName}`.`${missColName}` < `${globalMetricName}`.`${missColName}`) AS `update`
+// |FROM `${globalMetricName}` FULL JOIN `${accuracyMetricName}`
+// |ON `${globalMetricName}`.`${InternalColumns.tmst}` = `${accuracyMetricName}`.`${InternalColumns.tmst}`
+// """.stripMargin
+// } else {
+// s"""
+// |SELECT `${accuracyMetricName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+// |`${accuracyMetricName}`.`${missColName}` AS `${missColName}`,
+// |`${accuracyMetricName}`.`${totalColName}` AS `${totalColName}`,
+// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`,
+// |(`${totalColName}` = 0) AS `empty`,
+// |(`${missColName}` = 0) AS `no_miss`,
+// |true AS `update`
+// |FROM `${accuracyMetricName}`
+// """.stripMargin
+// }
+// val globalAccuParams = Map[String, Any](
+// ("global" -> true)
+// )
+// val mergeRuleInfo = RuleInfo(globalMetricName, None, SparkSqlType,
+// globalAccuSql, globalAccuParams, true)
+//
+// // 6. persist metrics
+// val persistMetricName = "persist"
+// val persistSql = {
+// s"""
+// |SELECT `${InternalColumns.tmst}`, `${missColName}`, `${totalColName}`, `${matchedColName}`
+// |FROM `${globalMetricName}`
+// |WHERE `update`
+// """.stripMargin
+// }
+// val persistParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+// val persistRuleInfo = RuleInfo(persistMetricName, None, SparkSqlType,
+// persistSql, persistParams, true)
+//
+// // 5. accuracy metric filter
+//// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName)
+//// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+//// val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType,
+//// "accuracy", accuracyParams, true)
+//
+//// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
+//// accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil
+// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
+// accuracyMetricRuleInfo :: mergeRuleInfo :: persistRuleInfo :: Nil
+// }
+// }
+
+// private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = {
+// val calcTime = timeInfo.calcTime
+// val details = ruleInfo.details
+// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+// if (!TempTables.existTable(timeInfo.key, sourceName)) {
+// Nil
+// } else {
+// // 1. miss record
+// val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) {
+// val selClause = s"`${sourceName}`.*"
+// s"SELECT ${selClause} FROM `${sourceName}`"
+// } else {
+// val selClause = s"`${sourceName}`.*"
+// val onClause = expr.coalesceDesc
+// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+// }
+// val missRecordsName = AccuracyKeys._missRecords
+//// val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo)
+// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
+// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
+// val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType,
+// missRecordsSql, missRecordsParams, true)
+//// val missRecordsStep = SparkSqlStep(
+//// timeInfo,
+//// RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams)
+//// )
+//
+// // 2. miss count
+// val missTableName = "_miss_"
+// // val tmstMissTableName = TempName.tmstName(missTableName, timeInfo)
+// val missColName = details.getStringOrKey(AccuracyKeys._miss)
+// val missSql = {
+// s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`"
+// }
+// val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType,
+// missSql, Map[String, Any](), false)
+//// val missStep = SparkSqlStep(
+//// timeInfo,
+//// RuleInfo(missTableName, None, missSql, Map[String, Any]())
+//// )
+//
+// // 3. total count
+// val totalTableName = "_total_"
+// // val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo)
+// val totalColName = details.getStringOrKey(AccuracyKeys._total)
+// val totalSql = {
+// s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+// }
+// val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType,
+// totalSql, Map[String, Any](), false)
+//// val totalStep = SparkSqlStep(
+//// timeInfo,
+//// RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
+//// )
+//
+// // 4. accuracy metric
+// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name)
+//// val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo)
+// val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+// val accuracyMetricSql = {
+// s"""
+// |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`,
+// |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
+// |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+// """.stripMargin
+// }
+// // val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType,
+// accuracyMetricSql, Map[String, Any](), false)
+//// val accuracyMetricStep = SparkSqlStep(
+//// timeInfo,
+//// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]())
+//// )
+//
+// // 5. accuracy metric filter
+// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName)
+// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+// val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType,
+// "accuracy", accuracyParams, false)
+//// val accuracyStep = DfOprStep(
+//// timeInfo,
+//// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams)
+//// )
+//
+// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
+// accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil
+// }
+// }
+
+// private def profilingRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = {
+// val details = ruleInfo.details
+// val profilingClause = expr.asInstanceOf[ProfilingClause]
+// val sourceName = profilingClause.fromClauseOpt match {
+// case Some(fc) => fc.dataSource
+// case _ => details.getString(ProfilingKeys._source, dataSourceNames.head)
+// }
+// val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+//
+// if (!TempTables.existTable(timeInfo.key, sourceName)) {
+// Nil
+// } else {
+// val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName)
+//
+// val selExprDescs = tmstAnalyzer.selectionExprs.map { sel =>
+// val alias = sel match {
+// case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+// case _ => ""
+// }
+// s"${sel.desc}${alias}"
+// }
+// val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+// val selClause = selExprDescs.mkString(", ")
+//// val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+// val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt
+// val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("")
+// val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+// val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+//
+// // 1. select statement
+// val profilingSql = {
+// s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+// }
+// // println(profilingSql)
+// val metricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name)
+// // val tmstMetricName = TempName.tmstName(metricName, timeInfo)
+// val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, metricName)
+// val profilingRuleInfo = ruleInfo.setDslType(SparkSqlType)
+// .setRule(profilingSql).setDetails(profilingParams)
+//// val profilingStep = SparkSqlStep(
+//// timeInfo,
+//// ruleInfo.setRule(profilingSql).setDetails(profilingParams)
+//// )
+//
+// // filterStep :: profilingStep :: Nil
+// profilingRuleInfo :: Nil
+// }
+// }
+
+// def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] = {
+// val ruleInfo = RuleInfoGen(param, timeInfo)
+// val dqType = RuleInfoGen.dqType(param)
+// GriffinDslStep(timeInfo, ruleInfo, dqType) :: Nil
+// }
+//
+// def adaptConcreteRuleStep(ruleStep: RuleStep
+// ): Seq[ConcreteRuleStep] = {
+// ruleStep match {
+// case rs @ GriffinDslStep(_, ri, dqType) => {
+// try {
+// val result = parser.parseRule(ri.rule, dqType)
+// if (result.successful) {
+// val expr = result.get
+// transConcreteRuleStep(rs, expr)
+// } else {
+// println(result)
+// warn(s"adapt concrete rule step warn: parse rule [ ${ri.rule} ] fails")
+// Nil
+// }
+// } catch {
+// case e: Throwable => {
+// error(s"adapt concrete rule step error: ${e.getMessage}")
+// Nil
+// }
+// }
+// }
+// case _ => Nil
+// }
+// }
+//
+// private def transConcreteRuleStep(ruleStep: GriffinDslStep, expr: Expr
+// ): Seq[ConcreteRuleStep] = {
+// ruleStep.dqType match {
+// case AccuracyType => transAccuracyRuleStep(ruleStep, expr)
+// case ProfilingType => transProfilingRuleStep(ruleStep, expr)
+// case TimelinessType => Nil
+// case _ => Nil
+// }
+// }
+
+// private def transAccuracyRuleStep(ruleStep: GriffinDslStep, expr: Expr
+// ): Seq[ConcreteRuleStep] = {
+// val timeInfo = ruleStep.timeInfo
+// val ruleInfo = ruleStep.ruleInfo
+// val calcTime = timeInfo.calcTime
+// val tmst = timeInfo.tmst
+//
+// val details = ruleInfo.details
+// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+// if (!TempTables.existTable(key(calcTime), sourceName)) {
+// Nil
+// } else {
+// // 1. miss record
+// val missRecordsSql = if (!TempTables.existTable(key(calcTime), targetName)) {
+// val selClause = s"`${sourceName}`.*"
+// s"SELECT ${selClause} FROM `${sourceName}`"
+// } else {
+// val selClause = s"`${sourceName}`.*"
+// val onClause = expr.coalesceDesc
+// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+// s"${sel.desc} IS NULL"
+// }.mkString(" AND ")
+// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+// }
+// val missRecordsName = AccuracyKeys._missRecords
+// val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo)
+// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
+// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
+// val missRecordsStep = SparkSqlStep(
+// timeInfo,
+// RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams)
+// )
+//
+// // 2. miss count
+// val missTableName = "_miss_"
+//// val tmstMissTableName = TempName.tmstName(missTableName, timeInfo)
+// val missColName = details.getStringOrKey(AccuracyKeys._miss)
+// val missSql = {
+// s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`"
+// }
+// val missStep = SparkSqlStep(
+// timeInfo,
+// RuleInfo(missTableName, None, missSql, Map[String, Any]())
+// )
+//
+// // 3. total count
+// val totalTableName = "_total_"
+//// val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo)
+// val totalColName = details.getStringOrKey(AccuracyKeys._total)
+// val totalSql = {
+// s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+// }
+// val totalStep = SparkSqlStep(
+// timeInfo,
+// RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
+// )
+//
+// // 4. accuracy metric
+// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleStep.name)
+// val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo)
+// val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+// val accuracyMetricSql = {
+// s"""
+// |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`,
+// |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
+// |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+// """.stripMargin
+// }
+//// val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// val accuracyMetricStep = SparkSqlStep(
+// timeInfo,
+// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]())
+// )
+//
+// // 5. accuracy metric filter
+// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName)
+// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+// val accuracyStep = DfOprStep(
+// timeInfo,
+// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams)
+// )
+//
+// missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil
+// }
+// }
+
+// private def transProfilingRuleStep(ruleStep: GriffinDslStep, expr: Expr
+// ): Seq[ConcreteRuleStep] = {
+// val calcTime = ruleStep.timeInfo.calcTime
+// val details = ruleStep.ruleInfo.details
+// val profilingClause = expr.asInstanceOf[ProfilingClause]
+// val sourceName = profilingClause.fromClauseOpt match {
+// case Some(fc) => fc.dataSource
+// case _ => details.getString(ProfilingKeys._source, dataSourceNames.head)
+// }
+// val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+//
+// if (!TempTables.existTable(key(calcTime), sourceName)) {
+// Nil
+// } else {
+// val timeInfo = ruleStep.timeInfo
+// val ruleInfo = ruleStep.ruleInfo
+// val tmst = timeInfo.tmst
+//
+//// val tmstSourceName = TempName.tmstName(sourceName, timeInfo)
+//
+//// val tmstProfilingClause = profilingClause.map(dsHeadReplace(sourceName, tmstSourceName))
+// val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName)
+//
+// val selExprDescs = tmstAnalyzer.selectionExprs.map { sel =>
+// val alias = sel match {
+// case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+// case _ => ""
+// }
+// s"${sel.desc}${alias}"
+// }
+// val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+// val selClause = selExprDescs.mkString(", ")
+//// val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+// val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt
+// val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("")
+// val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+// val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+//
+// // 1. select statement
+// val profilingSql = {
+// s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+// }
+//// println(profilingSql)
+// val metricName = details.getString(RuleDetailKeys._persistName, ruleStep.name)
+//// val tmstMetricName = TempName.tmstName(metricName, timeInfo)
+// val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+// .addIfNotExist(RuleDetailKeys._persistName, metricName)
+// val profilingStep = SparkSqlStep(
+// timeInfo,
+// ruleInfo.setRule(profilingSql).setDetails(profilingParams)
+// )
+//
+//// filterStep :: profilingStep :: Nil
+// profilingStep :: Nil
+// }
+//
+// }
+
+// private def dsHeadReplace(originName: String, replaceName: String): (Expr) => Expr = { expr: Expr =>
+// expr match {
+// case DataSourceHeadExpr(sn) if (sn == originName) => {
+// DataSourceHeadExpr(replaceName)
+// }
+// case FromClause(sn) if (sn == originName) => {
+// FromClause(replaceName)
+// }
+// case _ => expr.map(dsHeadReplace(originName, replaceName))
+// }
+// }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
new file mode 100644
index 0000000..bd344b1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+// Reserved internal column names injected by the measure rule plans; see
+// `columns` for the complete reserved list. (Patch rendering: diff '+' markers
+// preserved; only comment lines were added.)
+object InternalColumns {
+ // timestamp column used to tag and group records in streaming mode
+ val tmst = "__tmst"
+ val metric = "__metric"
+ val record = "__record"
+ val empty = "__empty"
+
+ // begin/end timestamp columns produced by the timeliness rule plan
+ val beginTs = "__begin_ts"
+ val endTs = "__end_ts"
+
+ // all reserved internal column names
+ val columns = List[String](tmst, metric, record, empty, beginTs, endTs)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
index 744f52a..ebc8fdb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
@@ -20,44 +20,159 @@ package org.apache.griffin.measure.rule.adaptor
import java.util.concurrent.atomic.AtomicLong
+import org.apache.griffin.measure.cache.tmst.TempName
import scala.collection.mutable.{Set => MutableSet}
import org.apache.griffin.measure.config.params.user._
import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.rule.step.{ConcreteRuleStep, RuleStep}
-import org.apache.griffin.measure.rule.dsl.{DslType, PersistType}
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
-trait RuleAdaptor extends Loggable with Serializable {
-
- val adaptPhase: AdaptPhase
+//object RuleInfoKeys {
+// val _name = "name"
+// val _rule = "rule"
+// val _details = "details"
+// val _dslType = "dsl.type"
+// val _dqType = "dq.type"
+// val _global = "global"
+//// val _gatherStep = "gather.step"
+//
+// val _metric = "metric"
+// val _record = "record"
+//}
+//import RuleInfoKeys._
+import org.apache.griffin.measure.utils.ParamUtil._
+object RuleParamKeys {
val _name = "name"
val _rule = "rule"
- val _persistType = "persist.type"
- val _updateDataSource = "update.data.source"
+ val _dslType = "dsl.type"
+ val _dqType = "dq.type"
+ val _cache = "cache"
+ val _global = "global"
val _details = "details"
- protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString
- protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString
- protected def getPersistType(param: Map[String, Any]) = PersistType(param.getOrElse(_persistType, "").toString)
- protected def getUpdateDataSource(param: Map[String, Any]) = param.get(_updateDataSource).map(_.toString)
- protected def getDetails(param: Map[String, Any]) = param.get(_details) match {
- case Some(dt: Map[String, Any]) => dt
- case _ => Map[String, Any]()
+ val _metric = "metric"
+ val _record = "record"
+
+ def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
+ def getRule(param: Map[String, Any]): String = param.getString(_rule, "")
+ def getDqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, ""))
+ def getCache(param: Map[String, Any]): Boolean = param.getBoolean(_cache, false)
+ def getGlobal(param: Map[String, Any]): Boolean = param.getBoolean(_global, false)
+ def getDetails(param: Map[String, Any]): Map[String, Any] = param.getParamMap(_details)
+
+ def getMetricOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_metric)
+ def getRecordOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_record)
+}
+
+object ExportParamKeys {
+ val _name = "name"
+ val _collectType = "collect.type"
+ val _dataSourceCache = "data.source.cache"
+ val _originDF = "origin.DF"
+
+ def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
+ def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, ""))
+ def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString)
+ def getOriginDFOpt(param: Map[String, Any]): Option[String] = param.get(_originDF).map(_.toString)
+}
+
+trait RuleAdaptor extends Loggable with Serializable {
+
+// val adaptPhase: AdaptPhase
+
+// protected def genRuleInfo(param: Map[String, Any]): RuleInfo = RuleInfoGen(param)
+
+// protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString
+// protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString
+// protected def getDetails(param: Map[String, Any]) = param.get(_details) match {
+// case Some(dt: Map[String, Any]) => dt
+// case _ => Map[String, Any]()
+// }
+
+
+
+// def getPersistNames(steps: Seq[RuleStep]): Seq[String] = steps.map(_.ruleInfo.persistName)
+//
+// protected def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep]
+// protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep]
+// def genConcreteRuleStep(timeInfo: TimeInfo, param: Map[String, Any]
+// ): Seq[ConcreteRuleStep] = {
+// genRuleStep(timeInfo, param).flatMap { rs =>
+// adaptConcreteRuleStep(rs)
+// }
+// }
+
+
+
+// def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = {
+// RuleInfoGen(param) :: Nil
+// }
+
+ protected def getRuleName(param: Map[String, Any]): String = {
+ RuleParamKeys.getName(param, RuleStepNameGenerator.genName)
}
- def getTempSourceNames(param: Map[String, Any]): Seq[String]
+ def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan
- def genRuleStep(param: Map[String, Any]): Seq[RuleStep]
- def genConcreteRuleStep(param: Map[String, Any]): Seq[ConcreteRuleStep] = {
- genRuleStep(param).flatMap { rs =>
- adaptConcreteRuleStep(rs)
- }
+ protected def genRuleExports(param: Map[String, Any], defName: String, stepName: String): Seq[RuleExport] = {
+ val metricOpt = RuleParamKeys.getMetricOpt(param)
+ val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName)).toSeq
+ val recordOpt = RuleParamKeys.getRecordOpt(param)
+ val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName)).toSeq
+ metricExportSeq ++ recordExportSeq
}
- protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep]
+ protected def genMetricExport(param: Map[String, Any], name: String, stepName: String
+ ): MetricExport = {
+ MetricExport(
+ ExportParamKeys.getName(param, name),
+ stepName,
+ ExportParamKeys.getCollectType(param)
+ )
+ }
+ protected def genRecordExport(param: Map[String, Any], name: String, stepName: String
+ ): RecordExport = {
+ RecordExport(
+ ExportParamKeys.getName(param, name),
+ stepName,
+ ExportParamKeys.getDataSourceCacheOpt(param),
+ ExportParamKeys.getOriginDFOpt(param)
+ )
+ }
+
+
}
+
+
+//object RuleInfoGen {
+// def apply(param: Map[String, Any]): RuleInfo = {
+// val name = param.get(_name) match {
+// case Some(n: String) => n
+// case _ => RuleStepNameGenerator.genName
+// }
+// RuleInfo(
+// name,
+// None,
+// DslType(param.getString(_dslType, "")),
+// param.getString(_rule, ""),
+// param.getParamMap(_details),
+// param.getBoolean(_gatherStep, false)
+// )
+// }
+// def apply(ri: RuleInfo, timeInfo: TimeInfo): RuleInfo = {
+// if (ri.persistType.needPersist) {
+// val tmstName = TempName.tmstName(ri.name, timeInfo)
+// ri.setTmstNameOpt(Some(tmstName))
+// } else ri
+// }
+//
+// def dqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, ""))
+//}
+
object RuleStepNameGenerator {
private val counter: AtomicLong = new AtomicLong(0L)
private val head: String = "rs"