Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/08 09:44:41 UTC

[4/6] incubator-griffin git commit: Measure module enhancement

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 1e3ecb1..6ba0cf8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -18,339 +18,1198 @@ under the License.
 */
 package org.apache.griffin.measure.rule.adaptor
 
-import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
+import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.dsl.analyzer._
 import org.apache.griffin.measure.rule.dsl.expr._
 import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
 
-case class GriffinDslAdaptor(dataSourceNames: Seq[String],
-                             functionNames: Seq[String],
-                             adaptPhase: AdaptPhase
-                            ) extends RuleAdaptor {
+object AccuracyKeys {
+  val _source = "source"
+  val _target = "target"
+  val _miss = "miss"
+  val _total = "total"
+  val _matched = "matched"
+//  val _missRecords = "missRecords"
+}
 
-  object StepInfo {
-    val _Name = "name"
-    val _PersistType = "persist.type"
-    val _UpdateDataSource = "update.data.source"
-    def getNameOpt(param: Map[String, Any]): Option[String] = param.get(_Name).map(_.toString)
-    def getPersistType(param: Map[String, Any]): PersistType = PersistType(param.getString(_PersistType, ""))
-    def getUpdateDataSourceOpt(param: Map[String, Any]): Option[String] = param.get(_UpdateDataSource).map(_.toString)
-  }
-  object AccuracyInfo {
-    val _Source = "source"
-    val _Target = "target"
-    val _MissRecords = "miss.records"
-    val _Accuracy = "accuracy"
-    val _Miss = "miss"
-    val _Total = "total"
-    val _Matched = "matched"
-  }
-  object ProfilingInfo {
-    val _Source = "source"
-    val _Profiling = "profiling"
-  }
+object ProfilingKeys {
+  val _source = "source"
+}
 
-  def getNameOpt(param: Map[String, Any], key: String): Option[String] = param.get(key).map(_.toString)
-  def resultName(param: Map[String, Any], key: String): String = {
-    val nameOpt = param.get(key) match {
-      case Some(prm: Map[String, Any]) => StepInfo.getNameOpt(prm)
-      case _ => None
-    }
-    nameOpt.getOrElse(key)
-  }
-  def resultPersistType(param: Map[String, Any], key: String, defPersistType: PersistType): PersistType = {
-    param.get(key) match {
-      case Some(prm: Map[String, Any]) => StepInfo.getPersistType(prm)
-      case _ => defPersistType
-    }
-  }
-  def resultUpdateDataSourceOpt(param: Map[String, Any], key: String): Option[String] = {
-    param.get(key) match {
-      case Some(prm: Map[String, Any]) => StepInfo.getUpdateDataSourceOpt(prm)
-      case _ => None
-    }
-  }
+object DuplicateKeys {
+  val _source = "source"
+  val _target = "target"
+  val _dup = "dup"
+  val _num = "num"
+}
+
+object TimelinessKeys {
+  val _source = "source"
+  val _latency = "latency"
+  val _threshold = "threshold"
+}
+
+object GlobalKeys {
+  val _initRule = "init.rule"
+//  val _globalMetricKeep = "global.metric.keep"
+}
 
-  val _dqType = "dq.type"
+case class GriffinDslAdaptor(dataSourceNames: Seq[String],
+                             functionNames: Seq[String]
+                            ) extends RuleAdaptor {
 
-  protected def getDqType(param: Map[String, Any]) = DqType(param.getString(_dqType, ""))
+  import RuleParamKeys._
 
   val filteredFunctionNames = functionNames.filter { fn =>
     fn.matches("""^[a-zA-Z_]\w*$""")
   }
   val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
 
-  def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
-    GriffinDslStep(getName(param), getRule(param), getDqType(param), getDetails(param)) :: Nil
-  }
+  private val emptyRulePlan = RulePlan(Nil, Nil)
+  private val emptyMap = Map[String, Any]()
 
-  def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+  override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], processType: ProcessType
+                          ): RulePlan = {
+    val name = getRuleName(param)
+    val rule = getRule(param)
     val dqType = getDqType(param)
-    param.get(_name) match {
-      case Some(name) => {
+    try {
+      val result = parser.parseRule(rule, dqType)
+      if (result.successful) {
+        val expr = result.get
         dqType match {
-          case AccuracyType => {
-            Seq[String](
-              resultName(param, AccuracyInfo._MissRecords),
-              resultName(param, AccuracyInfo._Accuracy)
-            )
-          }
-          case ProfilingType => {
-            Seq[String](
-              resultName(param, ProfilingInfo._Profiling)
-            )
-          }
-          case TimelinessType => {
-            Nil
-          }
-          case _ => Nil
+          case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, processType)
+          case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType)
+          case DuplicateType => duplicateRulePlan(timeInfo, name, expr, param, processType)
+          case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType)
+          case _ => emptyRulePlan
         }
+      } else {
+        warn(s"parse rule [ ${rule} ] fails: \n${result}")
+        emptyRulePlan
+      }
+    } catch {
+      case e: Throwable => {
+        error(s"generate rule plan ${name} fails: ${e.getMessage}")
+        emptyRulePlan
       }
-      case _ => Nil
     }
   }
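
A minimal sketch of how the new genRulePlan entry point might be driven (an
assumption for illustration, not part of this commit: the "dq.type" and "rule"
key names follow the constants visible in the removed code above, and TimeInfo
construction is elided since its shape is not shown in this diff):

    // hypothetical usage sketch; key names are assumptions
    val adaptor = GriffinDslAdaptor(Seq("source", "target"), Seq("count"))
    val param = Map[String, Any](
      "name" -> "accu",                 // rule name, read by getRuleName
      "dq.type" -> "accuracy",          // dispatches to accuracyRulePlan
      "rule" -> "source.id = target.id" // griffin-dsl rule text
    )
    def plan(ti: TimeInfo): RulePlan =
      adaptor.genRulePlan(ti, param, BatchProcessType)

On a parse failure or exception the adaptor now degrades to emptyRulePlan
instead of an empty step list.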
 
-  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
-    ruleStep match {
-      case rs @ GriffinDslStep(_, rule, dqType, _) => {
-        val exprOpt = try {
-          val result = parser.parseRule(rule, dqType)
-          if (result.successful) Some(result.get)
-          else {
-            println(result)
-            warn(s"adapt concrete rule step warn: parse rule [ ${rule} ] fails")
-            None
-          }
-        } catch {
-          case e: Throwable => {
-            error(s"adapt concrete rule step error: ${e.getMessage}")
-            None
-          }
-        }
+  // accuracy rule plan (streaming mode also uses the "accuracy" DataFrame operation to merge metrics)
+  private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+                               param: Map[String, Any], processType: ProcessType
+                              ): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+    val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
 
-        exprOpt match {
-          case Some(expr) => {
-            try {
-              transConcreteRuleSteps(rs, expr)
-            } catch {
-              case e: Throwable => {
-                error(s"trans concrete rule step error: ${e.getMessage}")
-                Nil
-              }
-            }
-          }
-          case _ => Nil
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+      emptyRulePlan
+    } else {
+      // 1. miss record
+      val missRecordsTableName = "__missRecords"
+      val selClause = s"`${sourceName}`.*"
+      val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+        println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+        s"SELECT ${selClause} FROM `${sourceName}`"
+      } else {
+        val onClause = expr.coalesceDesc
+        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+          s"${sel.desc} IS NULL"
+        }.mkString(" AND ")
+        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+          s"${sel.desc} IS NULL"
+        }.mkString(" AND ")
+        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+      }
+      val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+      val missRecordsExports = processType match {
+        case BatchProcessType => {
+          val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil
         }
+        case StreamingProcessType => Nil
       }
-      case _ => Nil
-    }
-  }
 
-  private def transConcreteRuleSteps(ruleStep: GriffinDslStep, expr: Expr
-                                    ): Seq[ConcreteRuleStep] = {
-    val details = ruleStep.details
-    ruleStep.dqType match {
-      case AccuracyType => {
-        val sourceName = getNameOpt(details, AccuracyInfo._Source) match {
-          case Some(name) => name
-          case _ => dataSourceNames.head
+      // 2. miss count
+      val missCountTableName = "__missCount"
+      val missColName = details.getStringOrKey(AccuracyKeys._miss)
+      val missCountSql = processType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
+        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
+      }
+      val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap)
+
+      // 3. total count
+      val totalCountTableName = "__totalCount"
+      val totalColName = details.getStringOrKey(AccuracyKeys._total)
+      val totalCountSql = processType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
+      }
+      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap)
+
+      // 4. accuracy metric
+      val accuracyTableName = name
+      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+      val accuracyMetricSql = processType match {
+        case BatchProcessType => {
+          s"""
+             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+         """.stripMargin
         }
-        val targetName = getNameOpt(details, AccuracyInfo._Target) match {
-          case Some(name) => name
-          case _ => dataSourceNames.tail.head
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}`
+         """.stripMargin
         }
-        val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
-
-
-        if (!checkDataSourceExists(sourceName)) {
-          Nil
-        } else {
-          // 1. miss record
-          val missRecordsSql = if (!checkDataSourceExists(targetName)) {
-            val selClause = s"`${sourceName}`.*"
-            s"SELECT ${selClause} FROM `${sourceName}`"
-          } else {
-            val selClause = s"`${sourceName}`.*"
-            val onClause = expr.coalesceDesc
-            val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-              s"${sel.desc} IS NULL"
-            }.mkString(" AND ")
-            val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-              s"${sel.desc} IS NULL"
-            }.mkString(" AND ")
-            val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-            s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
-          }
-          val missRecordsName = resultName(details, AccuracyInfo._MissRecords)
-          val missRecordsStep = SparkSqlStep(
-            missRecordsName,
-            missRecordsSql,
-            Map[String, Any](),
-            resultPersistType(details, AccuracyInfo._MissRecords, RecordPersistType),
-            resultUpdateDataSourceOpt(details, AccuracyInfo._MissRecords)
-          )
+      }
+      val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap)
+      val accuracyExports = processType match {
+        case BatchProcessType => {
+          val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+          genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil
+        }
+        case StreamingProcessType => Nil
+      }
 
-          // 2. miss count
-          val missTableName = "_miss_"
-          val missColName = getNameOpt(details, AccuracyInfo._Miss).getOrElse(AccuracyInfo._Miss)
-          val missSql = {
-            s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${GroupByColumn.tmst}`"
-          }
-          val missStep = SparkSqlStep(
-            missTableName,
-            missSql,
-            Map[String, Any](),
-            NonePersistType,
-            None
-          )
+      // current accu plan
+      val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil
+      val accuExports = missRecordsExports ++ accuracyExports
+      val accuPlan = RulePlan(accuSteps, accuExports)
 
-          // 3. total count
-          val totalTableName = "_total_"
-          val totalColName = getNameOpt(details, AccuracyInfo._Total).getOrElse(AccuracyInfo._Total)
-          val totalSql = {
-            s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${GroupByColumn.tmst}`"
-          }
-          val totalStep = SparkSqlStep(
-            totalTableName,
-            totalSql,
-            Map[String, Any](),
-            NonePersistType,
-            None
+      // streaming extra accu plan
+      val streamingAccuPlan = processType match {
+        case BatchProcessType => emptyRulePlan
+        case StreamingProcessType => {
+          // 5. accuracy metric merge
+          val accuracyMetricTableName = "__accuracy"
+          val accuracyMetricRule = "accuracy"
+          val accuracyMetricDetails = Map[String, Any](
+            (AccuracyOprKeys._dfName -> accuracyTableName),
+            (AccuracyOprKeys._miss -> missColName),
+            (AccuracyOprKeys._total -> totalColName),
+            (AccuracyOprKeys._matched -> matchedColName)
           )
+          val accuracyMetricStep = DfOprStep(accuracyMetricTableName,
+            accuracyMetricRule, accuracyMetricDetails)
+          val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+          val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName) :: Nil
 
-          // 4. accuracy metric
-          val matchedColName = getNameOpt(details, AccuracyInfo._Matched).getOrElse(AccuracyInfo._Matched)
-          val accuracyMetricSql = {
+          // 6. collect accuracy records
+          val accuracyRecordTableName = "__accuracyRecords"
+          val accuracyRecordSql = {
             s"""
-               |SELECT `${totalTableName}`.`${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,
-               |`${missTableName}`.`${missColName}` AS `${missColName}`,
-               |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
-               |FROM `${totalTableName}` FULL JOIN `${missTableName}`
-               |ON `${totalTableName}`.`${GroupByColumn.tmst}` = `${missTableName}`.`${GroupByColumn.tmst}`
-            """.stripMargin
+               |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}`
+               |FROM `${accuracyMetricTableName}` WHERE `${InternalColumns.record}`
+             """.stripMargin
           }
-          val accuracyMetricName = resultName(details, AccuracyInfo._Accuracy)
-          val accuracyMetricStep = SparkSqlStep(
-            accuracyMetricName,
-            accuracyMetricSql,
-            details,
-            //          resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType)
-            NonePersistType,
-            None
-          )
+          val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap)
+          val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
+            .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
+          val accuracyRecordExports = genRecordExport(
+            accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil
 
-          // 5. accuracy metric filter
-          val accuracyStep = DfOprStep(
-            accuracyMetricName,
-            "accuracy",
-            Map[String, Any](
-              ("df.name" -> accuracyMetricName),
-              ("miss" -> missColName),
-              ("total" -> totalColName),
-              ("matched" -> matchedColName)
-            ),
-            resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType),
-            None
-          )
+          // gen accu plan
+          val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil
+          val extraExports = accuracyMetricExports ++ accuracyRecordExports
+          val extraPlan = RulePlan(extraSteps, extraExports)
 
-          missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil
+          extraPlan
         }
       }
-      case ProfilingType => {
-        val profilingClause = expr.asInstanceOf[ProfilingClause]
-        val sourceName = profilingClause.fromClauseOpt match {
-          case Some(fc) => fc.dataSource
-          case _ => {
-            getNameOpt(details, ProfilingInfo._Source) match {
-              case Some(name) => name
-              case _ => dataSourceNames.head
-            }
-          }
-        }
-        val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
 
-//        analyzer.selectionExprs.foreach(println)
+      // return accu plan
+      accuPlan.merge(streamingAccuPlan)
 
-        val selExprDescs = analyzer.selectionExprs.map { sel =>
-          val alias = sel match {
-            case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
-            case _ => ""
-          }
-          s"${sel.desc}${alias}"
-        }
+    }
+  }
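
To make these steps concrete: for a rule such as "source.id = target.id" in
batch mode, the __missRecords SQL generated above would read roughly as
follows (the exact ON clause comes from expr.coalesceDesc, whose rendering is
not shown in this diff):

    SELECT `source`.* FROM `source` LEFT JOIN `target`
    ON `source`.`id` = `target`.`id`
    WHERE (NOT (`source`.`id` IS NULL)) AND (`target`.`id` IS NULL)

__missCount and __totalCount then count that table and the source, and the
final step joins the two counts to derive the matched column.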
+
+//  private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+//                               param: Map[String, Any], processType: ProcessType
+//                              ): RulePlan = {
+//    val details = getDetails(param)
+//    val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+//    val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+//    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+//      emptyRulePlan
+//    } else {
+//      // 1. miss record
+//      val missRecordsTableName = "__missRecords"
+//      val selClause = s"`${sourceName}`.*"
+//      val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+//        s"SELECT ${selClause} FROM `${sourceName}`"
+//      } else {
+//        val onClause = expr.coalesceDesc
+//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+//      }
+//      val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+//      val missRecordsExports = processType match {
+//        case BatchProcessType => {
+//          val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+//          genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil
+//        }
+//        case StreamingProcessType => Nil
+//      }
+//
+//      // 2. miss count
+//      val missCountTableName = "__missCount"
+//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
+//      val missCountSql = processType match {
+//        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
+//        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
+//      }
+//      val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap)
+//
+//      // 3. total count
+//      val totalCountTableName = "__totalCount"
+//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
+//      val totalCountSql = processType match {
+//        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+//        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
+//      }
+//      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap)
+//
+//      // 4. accuracy metric
+//      val accuracyTableName = name
+//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+//      val accuracyMetricSql = processType match {
+//        case BatchProcessType => {
+//          s"""
+//             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+//             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+//             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+//             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+//         """.stripMargin
+//        }
+//        case StreamingProcessType => {
+//          s"""
+//             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+//             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+//             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
+//             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+//             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+//             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}`
+//         """.stripMargin
+//        }
+//      }
+//      val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap, true)
+//      val accuracyExports = processType match {
+//        case BatchProcessType => {
+//          val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+//          genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil
+//        }
+//        case StreamingProcessType => Nil
+//      }
+//
+//      // current accu plan
+//      val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil
+//      val accuExports = missRecordsExports ++ accuracyExports
+//      val accuPlan = RulePlan(accuSteps, accuExports)
+//
+//      // streaming extra accu plan
+//      val streamingAccuPlan = processType match {
+//        case BatchProcessType => emptyRulePlan
+//        case StreamingProcessType => {
+//          // 5. global accuracy metric merge
+//          val globalAccuracyTableName = "__globalAccuracy"
+//          val globalAccuracySql = {
+//            s"""
+//               |SELECT coalesce(`${globalAccuracyTableName}`.`${InternalColumns.tmst}`, `${accuracyTableName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`,
+//               |coalesce(`${accuracyTableName}`.`${missColName}`, `${globalAccuracyTableName}`.`${missColName}`) AS `${missColName}`,
+//               |coalesce(`${globalAccuracyTableName}`.`${totalColName}`, `${accuracyTableName}`.`${totalColName}`) AS `${totalColName}`,
+//               |((`${accuracyTableName}`.`${missColName}` IS NOT NULL) AND ((`${globalAccuracyTableName}`.`${missColName}` IS NULL) OR (`${accuracyTableName}`.`${missColName}` < `${globalAccuracyTableName}`.`${missColName}`))) AS `${InternalColumns.metric}`
+//               |FROM `${globalAccuracyTableName}` FULL JOIN `${accuracyTableName}`
+//               |ON `${globalAccuracyTableName}`.`${InternalColumns.tmst}` = `${accuracyTableName}`.`${InternalColumns.tmst}`
+//            """.stripMargin
+//          }
+//          val globalAccuracyInitSql = {
+//            s"""
+//               |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`,
+//               |(true) AS `${InternalColumns.metric}`
+//               |FROM `${accuracyTableName}`
+//             """.stripMargin
+//          }
+//          val globalAccuracyDetails = Map[String, Any](GlobalKeys._initRule -> globalAccuracyInitSql)
+//          val globalAccuracyStep = SparkSqlStep(globalAccuracyTableName,
+//            globalAccuracySql, globalAccuracyDetails, true, true)
+//
+//          // 6. collect accuracy metrics
+//          val accuracyMetricTableName = name
+//          val accuracyMetricSql = {
+//            s"""
+//               |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`,
+//               |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+//               |FROM `${globalAccuracyTableName}` WHERE `${InternalColumns.metric}`
+//             """.stripMargin
+//          }
+//          val accuracyMetricStep = SparkSqlStep(accuracyMetricTableName, accuracyMetricSql, emptyMap)
+//          val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+//          val accuracyMetricExports = genMetricExport(metricParam, accuracyMetricTableName, accuracyMetricTableName) :: Nil
+//
+//          // 7. collect accuracy records
+//          val accuracyRecordTableName = "__accuracyRecords"
+//          val accuracyRecordSql = {
+//            s"""
+//               |SELECT `${InternalColumns.tmst}`
+//               |FROM `${accuracyMetricTableName}` WHERE `${matchedColName}` > 0
+//             """.stripMargin
+//          }
+//          val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap)
+//          val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+//          val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
+//            .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
+//          val accuracyRecordExports = genRecordExport(
+//            accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil
+//
+//          // 8. update global accuracy metric
+//          val updateGlobalAccuracyTableName = globalAccuracyTableName
+//          val globalMetricKeepTime = details.getString(GlobalKeys._globalMetricKeep, "")
+//          val updateGlobalAccuracySql = TimeUtil.milliseconds(globalMetricKeepTime) match {
+//            case Some(kt) => {
+//              s"""
+//                 |SELECT * FROM `${globalAccuracyTableName}`
+//                 |WHERE (`${missColName}` > 0) AND (`${InternalColumns.tmst}` > ${timeInfo.calcTime - kt})
+//               """.stripMargin
+//            }
+//            case _ => {
+//              s"""
+//                 |SELECT * FROM `${globalAccuracyTableName}`
+//                 |WHERE (`${missColName}` > 0)
+//               """.stripMargin
+//            }
+//          }
+//          val updateGlobalAccuracyStep = SparkSqlStep(updateGlobalAccuracyTableName,
+//            updateGlobalAccuracySql, emptyMap, true, true)
+//
+//          // gen accu plan
+//          val extraSteps = globalAccuracyStep :: accuracyMetricStep :: accuracyRecordStep :: updateGlobalAccuracyStep :: Nil
+//          val extraExports = accuracyMetricExports ++ accuracyRecordExports
+//          val extraPlan = RulePlan(extraSteps, extraExports)
+//
+//          extraPlan
+//        }
+//      }
+//
+//      // return accu plan
+//      accuPlan.merge(streamingAccuPlan)
+//
+//    }
+//  }
+
+  private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+                                param: Map[String, Any], processType: ProcessType
+                               ): RulePlan = {
+    val details = getDetails(param)
+    val profilingClause = expr.asInstanceOf[ProfilingClause]
+    val sourceName = profilingClause.fromClauseOpt match {
+      case Some(fc) => fc.dataSource
+      case _ => details.getString(ProfilingKeys._source, dataSourceNames.head)
+    }
+    val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
 
-//        val selClause = (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
-        val selClause = if (analyzer.containsAllSelectionExpr) {
-          selExprDescs.mkString(", ")
-        } else {
-          (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      emptyRulePlan
+    } else {
+      val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
+      val selExprDescs = analyzer.selectionExprs.map { sel =>
+        val alias = sel match {
+          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+          case _ => ""
         }
+        s"${sel.desc}${alias}"
+      }
+      val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+      val selClause = processType match {
+        case BatchProcessType => selExprDescs.mkString(", ")
+        case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ")
+      }
+      val groupByClauseOpt = analyzer.groupbyExprOpt
+      val groupbyClause = processType match {
+        case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
+        case StreamingProcessType => {
+          val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
+          val mergedGroupbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
+            case Some(gbc) => gbc
+            case _ => GroupbyClause(Nil, None)
+          })
+          mergedGroupbyClause.desc
+        }
+      }
+      val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+      val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
 
-        val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-
-//        val tailClause = analyzer.tailsExprs.map(_.desc).mkString(" ")
-        val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${GroupByColumn.tmst}`") :: Nil, None)
-        val mergedGroubbyClause = tmstGroupbyClause.merge(analyzer.groupbyExprOpt match {
-          case Some(gbc) => gbc
-          case _ => GroupbyClause(Nil, None)
-        })
-        val groupbyClause = mergedGroubbyClause.desc
-        val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
-        val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
-
-        if (!checkDataSourceExists(sourceName)) {
-          Nil
-        } else {
-          // 1. select statement
-          val profilingSql = {
-//            s"SELECT `${GroupByColumn.tmst}`, ${selClause} FROM ${sourceName} ${tailClause} GROUP BY `${GroupByColumn.tmst}`"
-            s"SELECT ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
-          }
-          val profilingMetricName = resultName(details, ProfilingInfo._Profiling)
-          val profilingStep = SparkSqlStep(
-            profilingMetricName,
-            profilingSql,
-            details,
-            resultPersistType(details, ProfilingInfo._Profiling, MetricPersistType),
-            None
-          )
+      // 1. select statement
+      val profilingSql = {
+        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+      }
+      val profilingName = name
+      val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+      val profilingExports = genMetricExport(metricParam, name, profilingName) :: Nil
 
-          // 2. clear processed data
-//          val clearDataSourceStep = DfOprStep(
-//            s"${sourceName}_clear",
-//            "clear",
-//            Map[String, Any](
-//              ("df.name" -> sourceName)
-//            ),
-//            NonePersistType,
-//            Some(sourceName)
-//          )
-//
-//          profilingStep :: clearDataSourceStep :: Nil
+      RulePlan(profilingStep :: Nil, profilingExports)
+    }
+  }
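
As an illustration (the rule text and its desc rendering are assumptions; the
analyzer defines the actual formatting): a batch profiling rule selecting
"age, count(*) group by age" over the default source would reduce to roughly

    SELECT `age`, count(*) FROM `source` GROUP BY `age`

while in streaming mode the internal tmst column is prepended to both the
select list and the merged group-by clause, as the two match cases above show.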
 
-          profilingStep:: Nil
-        }
+  private def duplicateRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+                                param: Map[String, Any], processType: ProcessType
+                               ): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(DuplicateKeys._source, dataSourceNames.head)
+    val targetName = details.getString(DuplicateKeys._target, dataSourceNames.tail.head)
+    val analyzer = DuplicateAnalyzer(expr.asInstanceOf[DuplicateClause], sourceName, targetName)
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+      emptyRulePlan
+    } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+      println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+      emptyRulePlan
+    } else {
+      val selItemsClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val aliases = analyzer.selectionPairs.map(_._2)
+
+      val selClause = processType match {
+        case BatchProcessType => selItemsClause
+        case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}"
+      }
+      val selAliases = processType match {
+        case BatchProcessType => aliases
+        case StreamingProcessType => InternalColumns.tmst +: aliases
+      }
+
+      // 1. source mapping
+      val sourceTableName = "__source"
+      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
+      val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
+
+      // 2. target mapping
+      val targetTableName = "__target"
+      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
+      val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap)
+
+      // 3. joined
+      val joinedTableName = "__joined"
+      val joinedSelClause = selAliases.map { alias =>
+        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
+      }.mkString(", ")
+      val onClause = aliases.map { alias =>
+        s"`${sourceTableName}`.`${alias}` = `${targetTableName}`.`${alias}`"
+      }.mkString(" AND ")
+      val joinedSql = {
+        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
+      }
+      val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
 
+      // 4. group
+      val groupTableName = "__group"
+      val groupSelClause = selAliases.map { alias =>
+        s"`${alias}`"
+      }.mkString(", ")
+      val dupColName = details.getStringOrKey(DuplicateKeys._dup)
+      val groupSql = {
+        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
       }
-      case TimelinessType => {
-        Nil
+      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
+
+      // 5. duplicate record
+      val dupRecordTableName = "__dupRecords"
+      val dupRecordSql = {
+        s"""
+           |SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0
+         """.stripMargin
+      }
+      val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+      val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+      val dupRecordExports = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName) :: Nil
+
+      // 6. duplicate metric
+      val dupMetricTableName = name
+      val numColName = details.getStringOrKey(DuplicateKeys._num)
+      val dupMetricSelClause = processType match {
+        case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
+        case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
+      }
+      val dupMetricGroupbyClause = processType match {
+        case BatchProcessType => s"`${dupColName}`"
+        case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`"
+      }
+      val dupMetricSql = {
+        s"""
+           |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
+           |GROUP BY ${dupMetricGroupbyClause}
+         """.stripMargin
       }
-      case _ => Nil
+      val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+        .addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+      val dupMetricExports = genMetricExport(metricParam, name, dupMetricTableName) :: Nil
+
+      val dupSteps = sourceStep :: targetStep :: joinedStep :: groupStep :: dupRecordStep :: dupMetricStep :: Nil
+      val dupExports = dupRecordExports ++ dupMetricExports
+
+      RulePlan(dupSteps, dupExports)
     }
   }
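
The duplicate plan is thus a pipeline of temp tables: __source (DISTINCT
projection of the source), __target, __joined (target RIGHT JOIN source on
all selected aliases), __group (per-key counts), __dupRecords, and the metric
table. For a single selection alias "id" with the default "dup" column key,
the grouping step instantiates to

    SELECT `id`, (COUNT(*) - 1) AS `dup` FROM `__joined` GROUP BY `id`

so `dup` counts occurrences beyond the first, and the metric step then counts
how many keys share each duplication level.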
 
-  private def checkDataSourceExists(name: String): Boolean = {
-    try {
-      RuleAdaptorGroup.dataChecker.existDataSourceName(name)
-    } catch {
-      case e: Throwable => {
-        error(s"check data source exists error: ${e.getMessage}")
-        false
+  private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+                                 param: Map[String, Any], processType: ProcessType
+                                ): RulePlan = {
+    val details = getDetails(param)
+    val timelinessClause = expr.asInstanceOf[TimelinessClause]
+    val sourceName = details.getString(TimelinessKeys._source, dataSourceNames.head)
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      emptyRulePlan
+    } else {
+      val analyzer = TimelinessAnalyzer(timelinessClause, sourceName)
+      val btsSel = analyzer.btsExpr
+      val etsSelOpt = analyzer.etsExprOpt
+
+      // 1. in time
+      val inTimeTableName = "__inTime"
+      val inTimeSql = etsSelOpt match {
+        case Some(etsSel) => {
+          s"""
+             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`,
+             |(${etsSel}) AS `${InternalColumns.endTs}`
+             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
+           """.stripMargin
+        }
+        case _ => {
+          s"""
+             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`
+             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
+           """.stripMargin
+        }
+      }
+      val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap)
+
+      // 2. latency
+      val latencyTableName = "__lat"
+      val latencyColName = details.getStringOrKey(TimelinessKeys._latency)
+      val etsColName = etsSelOpt match {
+        case Some(_) => InternalColumns.endTs
+        case _ => InternalColumns.tmst
+      }
+      val latencySql = {
+        s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
+      }
+      val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, true)
+
+      // 3. timeliness metric
+      val metricTableName = name
+      val metricSql = processType match {
+        case BatchProcessType => {
+          s"""
+             |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
+             |MAX(`${latencyColName}`) AS `max`,
+             |MIN(`${latencyColName}`) AS `min`
+             |FROM `${latencyTableName}`
+           """.stripMargin
+        }
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`,
+             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
+             |MAX(`${latencyColName}`) AS `max`,
+             |MIN(`${latencyColName}`) AS `min`
+             |FROM `${latencyTableName}`
+             |GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
+      }
+      val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+      val metricExports = genMetricExport(metricParam, name, metricTableName) :: Nil
+
+      // current timeliness plan
+      val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
+      val timeExports = metricExports
+      val timePlan = RulePlan(timeSteps, timeExports)
+
+      // 4. timeliness record
+      val recordPlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._threshold, "")) match {
+        case Some(tsh) => {
+          val recordTableName = "__lateRecords"
+          val recordSql = {
+            s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
+          }
+          val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
+          val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          val recordExports = genRecordExport(recordParam, recordTableName, recordTableName) :: Nil
+          RulePlan(recordStep :: Nil, recordExports)
+        }
+        case _ => emptyRulePlan
       }
+
+      // return timeliness plan
+      timePlan.merge(recordPlan)
     }
   }
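
In batch mode, with the default "latency" column key, the metric step above
instantiates to

    SELECT CAST(AVG(`latency`) AS BIGINT) AS `avg`,
    MAX(`latency`) AS `max`,
    MIN(`latency`) AS `min`
    FROM `__lat`

and when a "threshold" detail such as "1h" is configured, TimeUtil.milliseconds
converts it to milliseconds and records whose latency exceeds it are exported
from __lateRecords.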
 
+//  override def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = {
+//    val ruleInfo = RuleInfoGen(param)
+//    val dqType = RuleInfoGen.dqType(param)
+//    try {
+//      val result = parser.parseRule(ruleInfo.rule, dqType)
+//      if (result.successful) {
+//        val expr = result.get
+//        dqType match {
+//          case AccuracyType => accuracyRuleInfos(ruleInfo, expr, timeInfo)
+//          case ProfilingType => profilingRuleInfos(ruleInfo, expr, timeInfo)
+//          case TimelinessType => Nil
+//          case _ => Nil
+//        }
+//      } else {
+//        warn(s"parse rule [ ${ruleInfo.rule} ] fails: \n${result}")
+//        Nil
+//      }
+//    } catch {
+//      case e: Throwable => {
+//        error(s"generate rule info ${ruleInfo} fails: ${e.getMessage}")
+//        Nil
+//      }
+//    }
+//  }
+
+  // group by version
+//  private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = {
+//    val calcTime = timeInfo.calcTime
+//    val details = ruleInfo.details
+//    val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+//    val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+//    if (!TempTables.existTable(timeInfo.key, sourceName)) {
+//      Nil
+//    } else {
+//      // 1. miss record
+//      val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) {
+//        val selClause = s"`${sourceName}`.*"
+//        s"SELECT ${selClause} FROM `${sourceName}`"
+//      } else {
+//        val selClause = s"`${sourceName}`.*"
+//        val onClause = expr.coalesceDesc
+//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+//      }
+//      val missRecordsName = AccuracyKeys._missRecords
+//      //      val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo)
+//      val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
+//        .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
+//      val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType,
+//        missRecordsSql, missRecordsParams, true)
+//      //      val missRecordsStep = SparkSqlStep(
+//      //        timeInfo,
+//      //        RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams)
+//      //      )
+//
+//      // 2. miss count
+//      val missTableName = "_miss_"
+//      //      val tmstMissTableName = TempName.tmstName(missTableName, timeInfo)
+//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
+//      val missSql = {
+//        s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${InternalColumns.tmst}`"
+//      }
+//      val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType,
+//        missSql, Map[String, Any](), true)
+//      //      val missStep = SparkSqlStep(
+//      //        timeInfo,
+//      //        RuleInfo(missTableName, None, missSql, Map[String, Any]())
+//      //      )
+//
+//      // 3. total count
+//      val totalTableName = "_total_"
+//      //      val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo)
+//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
+//      val totalSql = {
+//        s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
+//      }
+//      val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType,
+//        totalSql, Map[String, Any](), true)
+//      //      val totalStep = SparkSqlStep(
+//      //        timeInfo,
+//      //        RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
+//      //      )
+//
+//      // 4. accuracy metric
+//      val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name)
+//      //      val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo)
+//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+//      val accuracyMetricSql = {
+//        s"""
+//           |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+//           |`${missTableName}`.`${missColName}` AS `${missColName}`,
+//           |`${totalTableName}`.`${totalColName}` AS `${totalColName}`,
+//           |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+//           |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+//           |ON `${totalTableName}`.`${InternalColumns.tmst}` = `${missTableName}`.`${InternalColumns.tmst}`
+//         """.stripMargin
+//      }
+//      //      val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+////      val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType,
+////        accuracyMetricSql, Map[String, Any](), true)
+//      val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+//      val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType,
+//        accuracyMetricSql, Map[String, Any](), true)
+//
+//      // 5. accuracy metric merge
+//      val globalMetricName = "accu_global"
+//      val globalAccuSql = if (TempTables.existGlobalTable(globalMetricName)) {
+//        s"""
+//           |SELECT coalesce(`${globalMetricName}`.`${InternalColumns.tmst}`, `${accuracyMetricName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`,
+//           |coalesce(`${accuracyMetricName}`.`${missColName}`, `${globalMetricName}`.`${missColName}`) AS `${missColName}`,
+//           |coalesce(`${globalMetricName}`.`${totalColName}`, `${accuracyMetricName}`.`${totalColName}`) AS `${totalColName}`,
+//           |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`,
+//           |(`${totalColName}` = 0) AS `empty`,
+//           |(`${missColName}` = 0) AS `no_miss`,
+//           |(`${accuracyMetricName}`.`${missColName}` < `${globalMetricName}`.`${missColName}`) AS `update`
+//           |FROM `${globalMetricName}` FULL JOIN `${accuracyMetricName}`
+//           |ON `${globalMetricName}`.`${InternalColumns.tmst}` = `${accuracyMetricName}`.`${InternalColumns.tmst}`
+//         """.stripMargin
+//      } else {
+//        s"""
+//           |SELECT `${accuracyMetricName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
+//           |`${accuracyMetricName}`.`${missColName}` AS `${missColName}`,
+//           |`${accuracyMetricName}`.`${totalColName}` AS `${totalColName}`,
+//           |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`,
+//           |(`${totalColName}` = 0) AS `empty`,
+//           |(`${missColName}` = 0) AS `no_miss`,
+//           |true AS `update`
+//           |FROM `${accuracyMetricName}`
+//         """.stripMargin
+//      }
+//      val globalAccuParams = Map[String, Any](
+//        ("global" -> true)
+//      )
+//      val mergeRuleInfo = RuleInfo(globalMetricName, None, SparkSqlType,
+//        globalAccuSql, globalAccuParams, true)
+//
+//      // 6. persist metrics
+//      val persistMetricName = "persist"
+//      val persistSql = {
+//        s"""
+//           |SELECT `${InternalColumns.tmst}`, `${missColName}`, `${totalColName}`, `${matchedColName}`
+//           |FROM `${globalMetricName}`
+//           |WHERE `update`
+//         """.stripMargin
+//      }
+//      val persistParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+//      val persistRuleInfo = RuleInfo(persistMetricName, None, SparkSqlType,
+//        persistSql, persistParams, true)
+//
+//      // 5. accuracy metric filter
+////      val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName)
+////        .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+////        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+////      val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType,
+////        "accuracy", accuracyParams, true)
+//
+////      missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
+////        accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil
+//      missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
+//        accuracyMetricRuleInfo :: mergeRuleInfo :: persistRuleInfo :: Nil
+//    }
+//  }
+
+//  private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = {
+//    val calcTime = timeInfo.calcTime
+//    val details = ruleInfo.details
+//    val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+//    val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+//    if (!TempTables.existTable(timeInfo.key, sourceName)) {
+//      Nil
+//    } else {
+//      // 1. miss record
+//      val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) {
+//        val selClause = s"`${sourceName}`.*"
+//        s"SELECT ${selClause} FROM `${sourceName}`"
+//      } else {
+//        val selClause = s"`${sourceName}`.*"
+//        val onClause = expr.coalesceDesc
+//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+//      }
+//      val missRecordsName = AccuracyKeys._missRecords
+////      val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo)
+//      val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
+//        .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
+//      val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType,
+//        missRecordsSql, missRecordsParams, true)
+////      val missRecordsStep = SparkSqlStep(
+////        timeInfo,
+////        RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams)
+////      )
+//
+//      // 2. miss count
+//      val missTableName = "_miss_"
+//      //      val tmstMissTableName = TempName.tmstName(missTableName, timeInfo)
+//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
+//      val missSql = {
+//        s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`"
+//      }
+//      val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType,
+//        missSql, Map[String, Any](), false)
+////      val missStep = SparkSqlStep(
+////        timeInfo,
+////        RuleInfo(missTableName, None, missSql, Map[String, Any]())
+////      )
+//
+//      // 3. total count
+//      val totalTableName = "_total_"
+//      //      val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo)
+//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
+//      val totalSql = {
+//        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+//      }
+//      val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType,
+//        totalSql, Map[String, Any](), false)
+////      val totalStep = SparkSqlStep(
+////        timeInfo,
+////        RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
+////      )
+//
+//      // 4. accuracy metric
+//      val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name)
+////      val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo)
+//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+//      val accuracyMetricSql = {
+//        s"""
+//           |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`,
+//           |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
+//           |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+//         """.stripMargin
+//      }
+//      //      val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//      val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType,
+//        accuracyMetricSql, Map[String, Any](), false)
+////      val accuracyMetricStep = SparkSqlStep(
+////        timeInfo,
+////        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]())
+////      )
+//
+//      // 5. accuracy metric filter
+//      val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName)
+//        .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+//      val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType,
+//        "accuracy", accuracyParams, false)
+////      val accuracyStep = DfOprStep(
+////        timeInfo,
+////        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams)
+////      )
+//
+//      missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
+//        accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil
+//    }
+//  }
+
+//  private def profilingRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = {
+//    val details = ruleInfo.details
+//    val profilingClause = expr.asInstanceOf[ProfilingClause]
+//    val sourceName = profilingClause.fromClauseOpt match {
+//      case Some(fc) => fc.dataSource
+//      case _ => details.getString(ProfilingKeys._source, dataSourceNames.head)
+//    }
+//    val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+//
+//    if (!TempTables.existTable(timeInfo.key, sourceName)) {
+//      Nil
+//    } else {
+//      val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName)
+//
+//      val selExprDescs = tmstAnalyzer.selectionExprs.map { sel =>
+//        val alias = sel match {
+//          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+//          case _ => ""
+//        }
+//        s"${sel.desc}${alias}"
+//      }
+//      val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+//      val selClause = selExprDescs.mkString(", ")
+////      val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+//      val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt
+//      val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("")
+//      val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+//      val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+//
+//      // 1. select statement
+//      val profilingSql = {
+//        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+//      }
+//      //      println(profilingSql)
+//      val metricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name)
+//      //      val tmstMetricName = TempName.tmstName(metricName, timeInfo)
+//      val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, metricName)
+//      val profilingRuleInfo = ruleInfo.setDslType(SparkSqlType)
+//        .setRule(profilingSql).setDetails(profilingParams)
+////      val profilingStep = SparkSqlStep(
+////        timeInfo,
+////        ruleInfo.setRule(profilingSql).setDetails(profilingParams)
+////      )
+//
+//      //      filterStep :: profilingStep :: Nil
+//      profilingRuleInfo :: Nil
+//    }
+//  }
+
+//  def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] = {
+//    val ruleInfo = RuleInfoGen(param, timeInfo)
+//    val dqType = RuleInfoGen.dqType(param)
+//    GriffinDslStep(timeInfo, ruleInfo, dqType) :: Nil
+//  }
+//
+//  def adaptConcreteRuleStep(ruleStep: RuleStep
+//                           ): Seq[ConcreteRuleStep] = {
+//    ruleStep match {
+//      case rs @ GriffinDslStep(_, ri, dqType) => {
+//        try {
+//          val result = parser.parseRule(ri.rule, dqType)
+//          if (result.successful) {
+//            val expr = result.get
+//            transConcreteRuleStep(rs, expr)
+//          } else {
+//            println(result)
+//            warn(s"adapt concrete rule step warn: parse rule [ ${ri.rule} ] fails")
+//            Nil
+//          }
+//        } catch {
+//          case e: Throwable => {
+//            error(s"adapt concrete rule step error: ${e.getMessage}")
+//            Nil
+//          }
+//        }
+//      }
+//      case _ => Nil
+//    }
+//  }
+//
+//  private def transConcreteRuleStep(ruleStep: GriffinDslStep, expr: Expr
+//                                   ): Seq[ConcreteRuleStep] = {
+//    ruleStep.dqType match {
+//      case AccuracyType => transAccuracyRuleStep(ruleStep, expr)
+//      case ProfilingType => transProfilingRuleStep(ruleStep, expr)
+//      case TimelinessType => Nil
+//      case _ => Nil
+//    }
+//  }
+
+//  private def transAccuracyRuleStep(ruleStep: GriffinDslStep, expr: Expr
+//                                   ): Seq[ConcreteRuleStep] = {
+//    val timeInfo = ruleStep.timeInfo
+//    val ruleInfo = ruleStep.ruleInfo
+//    val calcTime = timeInfo.calcTime
+//    val tmst = timeInfo.tmst
+//
+//    val details = ruleInfo.details
+//    val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
+//    val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
+//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
+//
+//    if (!TempTables.existTable(key(calcTime), sourceName)) {
+//      Nil
+//    } else {
+//      // 1. miss record
+//      val missRecordsSql = if (!TempTables.existTable(key(calcTime), targetName)) {
+//        val selClause = s"`${sourceName}`.*"
+//        s"SELECT ${selClause} FROM `${sourceName}`"
+//      } else {
+//        val selClause = s"`${sourceName}`.*"
+//        val onClause = expr.coalesceDesc
+//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+//          s"${sel.desc} IS NULL"
+//        }.mkString(" AND ")
+//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+//      }
+//      val missRecordsName = AccuracyKeys._missRecords
+//      val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo)
+//      val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
+//        .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
+//      val missRecordsStep = SparkSqlStep(
+//        timeInfo,
+//        RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams)
+//      )
+//
+//      // 2. miss count
+//      val missTableName = "_miss_"
+////      val tmstMissTableName = TempName.tmstName(missTableName, timeInfo)
+//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
+//      val missSql = {
+//        s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`"
+//      }
+//      val missStep = SparkSqlStep(
+//        timeInfo,
+//        RuleInfo(missTableName, None, missSql, Map[String, Any]())
+//      )
+//
+//      // 3. total count
+//      val totalTableName = "_total_"
+////      val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo)
+//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
+//      val totalSql = {
+//        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+//      }
+//      val totalStep = SparkSqlStep(
+//        timeInfo,
+//        RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
+//      )
+//
+//      // 4. accuracy metric
+//      val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleStep.name)
+//      val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo)
+//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
+//      val accuracyMetricSql = {
+//        s"""
+//           |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`,
+//           |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
+//           |FROM `${totalTableName}` FULL JOIN `${missTableName}`
+//         """.stripMargin
+//      }
+////      val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//      val accuracyMetricStep = SparkSqlStep(
+//        timeInfo,
+//        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]())
+//      )
+//
+//      // 5. accuracy metric filter
+//      val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName)
+//        .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
+//      val accuracyStep = DfOprStep(
+//        timeInfo,
+//        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams)
+//      )
+//
+//      missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil
+//    }
+//  }
+
+//  private def transProfilingRuleStep(ruleStep: GriffinDslStep, expr: Expr
+//                                    ): Seq[ConcreteRuleStep] = {
+//    val calcTime = ruleStep.timeInfo.calcTime
+//    val details = ruleStep.ruleInfo.details
+//    val profilingClause = expr.asInstanceOf[ProfilingClause]
+//    val sourceName = profilingClause.fromClauseOpt match {
+//      case Some(fc) => fc.dataSource
+//      case _ => details.getString(ProfilingKeys._source, dataSourceNames.head)
+//    }
+//    val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+//
+//    if (!TempTables.existTable(key(calcTime), sourceName)) {
+//      Nil
+//    } else {
+//      val timeInfo = ruleStep.timeInfo
+//      val ruleInfo = ruleStep.ruleInfo
+//      val tmst = timeInfo.tmst
+//
+////      val tmstSourceName = TempName.tmstName(sourceName, timeInfo)
+//
+////      val tmstProfilingClause = profilingClause.map(dsHeadReplace(sourceName, tmstSourceName))
+//      val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName)
+//
+//      val selExprDescs = tmstAnalyzer.selectionExprs.map { sel =>
+//        val alias = sel match {
+//          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+//          case _ => ""
+//        }
+//        s"${sel.desc}${alias}"
+//      }
+//      val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+//      val selClause = selExprDescs.mkString(", ")
+////      val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+//      val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt
+//      val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("")
+//      val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+//      val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+//
+//      // 1. select statement
+//      val profilingSql = {
+//        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+//      }
+////      println(profilingSql)
+//      val metricName = details.getString(RuleDetailKeys._persistName, ruleStep.name)
+////      val tmstMetricName = TempName.tmstName(metricName, timeInfo)
+//      val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
+//        .addIfNotExist(RuleDetailKeys._persistName, metricName)
+//      val profilingStep = SparkSqlStep(
+//        timeInfo,
+//        ruleInfo.setRule(profilingSql).setDetails(profilingParams)
+//      )
+//
+////      filterStep :: profilingStep :: Nil
+//      profilingStep :: Nil
+//    }
+//
+//  }
+
+//  private def dsHeadReplace(originName: String, replaceName: String): (Expr) => Expr = { expr: Expr =>
+//    expr match {
+//      case DataSourceHeadExpr(sn) if (sn == originName) => {
+//        DataSourceHeadExpr(replaceName)
+//      }
+//      case FromClause(sn) if (sn == originName) => {
+//        FromClause(replaceName)
+//      }
+//      case _ => expr.map(dsHeadReplace(originName, replaceName))
+//    }
+//  }
+
 }

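For reference, the retired accuracy translation kept above in comments assembled four SQL steps per run: miss records, miss count, total count, and a metric join, before handing the joined row to the "accuracy" DataFrame operation. Below is a minimal standalone sketch of that SQL sequence; the object and helper names are hypothetical, and the table/column names (missRecords, _miss_, _total_, miss, total) simply mirror the defaults in the commented-out code, so treat this as an illustration rather than the module's API.

object AccuracySqlSketch {
  // Step 1: source records that fail to match target (LEFT JOIN + null test).
  def missRecordsSql(source: String, target: String,
                     onClause: String, targetIsNull: String): String =
    s"SELECT `${source}`.* FROM `${source}` LEFT JOIN `${target}` " +
      s"ON ${onClause} WHERE ${targetIsNull}"

  // Step 2: count of missed records.
  def missCountSql(missRecords: String, missCol: String): String =
    s"SELECT COUNT(*) AS `${missCol}` FROM `${missRecords}`"

  // Step 3: total count of source records.
  def totalCountSql(source: String, totalCol: String): String =
    s"SELECT COUNT(*) AS `${totalCol}` FROM `${source}`"

  // Step 4: join the two counts into a single metric row.
  def metricSql(missTable: String, missCol: String,
                totalTable: String, totalCol: String): String =
    s"SELECT `${missTable}`.`${missCol}` AS `${missCol}`, " +
      s"`${totalTable}`.`${totalCol}` AS `${totalCol}` " +
      s"FROM `${totalTable}` FULL JOIN `${missTable}`"

  def main(args: Array[String]): Unit = {
    println(missRecordsSql("src", "tgt",
      "coalesce(`src`.`id`, '') = coalesce(`tgt`.`id`, '')",
      "`tgt`.`id` IS NULL"))
    println(missCountSql("missRecords", "miss"))
    println(totalCountSql("src", "total"))
    println(metricSql("_miss_", "miss", "_total_", "total"))
  }
}

In the commented-out version the matched count is not computed in SQL at all; step 5 passes the joined miss/total row to the "accuracy" DfOpr rule, which derives it downstream.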
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
new file mode 100644
index 0000000..bd344b1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+object InternalColumns {
+  val tmst = "__tmst"
+  val metric = "__metric"
+  val record = "__record"
+  val empty = "__empty"
+
+  val beginTs = "__begin_ts"
+  val endTs = "__end_ts"
+
+  val columns = List[String](tmst, metric, record, empty, beginTs, endTs)
+}
\ No newline at end of file

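InternalColumns (added above) reserves the double-underscore column names the measure pipeline injects into intermediate tables. A small hypothetical usage sketch, not part of this commit, showing how a caller might strip those reserved columns before exposing a result schema:

object InternalColumnsUsage {
  // Mirrors InternalColumns.columns from the file above; this object and
  // its helper are hypothetical, for illustration only.
  val reserved: List[String] =
    List("__tmst", "__metric", "__record", "__empty", "__begin_ts", "__end_ts")

  // Drop the pipeline-internal columns, keeping only user-facing ones.
  def userColumns(allColumns: Seq[String]): Seq[String] =
    allColumns.filterNot(reserved.contains)

  def main(args: Array[String]): Unit = {
    println(userColumns(Seq("id", "__tmst", "name", "__record")))
    // prints: List(id, name)
  }
}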
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
index 744f52a..ebc8fdb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
@@ -20,44 +20,159 @@ package org.apache.griffin.measure.rule.adaptor
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.griffin.measure.cache.tmst.TempName
 
 import scala.collection.mutable.{Set => MutableSet}
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.rule.step.{ConcreteRuleStep, RuleStep}
-import org.apache.griffin.measure.rule.dsl.{DslType, PersistType}
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 
-trait RuleAdaptor extends Loggable with Serializable {
-
-  val adaptPhase: AdaptPhase
+//object RuleInfoKeys {
+//  val _name = "name"
+//  val _rule = "rule"
+//  val _details = "details"
+//  val _dslType = "dsl.type"
+//  val _dqType = "dq.type"
+//  val _global = "global"
+////  val _gatherStep = "gather.step"
+//
+//  val _metric = "metric"
+//  val _record = "record"
+//}
+//import RuleInfoKeys._
+import org.apache.griffin.measure.utils.ParamUtil._
 
+object RuleParamKeys {
   val _name = "name"
   val _rule = "rule"
-  val _persistType = "persist.type"
-  val _updateDataSource = "update.data.source"
+  val _dslType = "dsl.type"
+  val _dqType = "dq.type"
+  val _cache = "cache"
+  val _global = "global"
   val _details = "details"
 
-  protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString
-  protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString
-  protected def getPersistType(param: Map[String, Any]) = PersistType(param.getOrElse(_persistType, "").toString)
-  protected def getUpdateDataSource(param: Map[String, Any]) = param.get(_updateDataSource).map(_.toString)
-  protected def getDetails(param: Map[String, Any]) = param.get(_details) match {
-    case Some(dt: Map[String, Any]) => dt
-    case _ => Map[String, Any]()
+  val _metric = "metric"
+  val _record = "record"
+
+  def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
+  def getRule(param: Map[String, Any]): String = param.getString(_rule, "")
+  def getDqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, ""))
+  def getCache(param: Map[String, Any]): Boolean = param.getBoolean(_cache, false)
+  def getGlobal(param: Map[String, Any]): Boolean = param.getBoolean(_global, false)
+  def getDetails(param: Map[String, Any]): Map[String, Any] = param.getParamMap(_details)
+
+  def getMetricOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_metric)
+  def getRecordOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_record)
+}
+
+object ExportParamKeys {
+  val _name = "name"
+  val _collectType = "collect.type"
+  val _dataSourceCache = "data.source.cache"
+  val _originDF = "origin.DF"
+
+  def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
+  def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, ""))
+  def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString)
+  def getOriginDFOpt(param: Map[String, Any]): Option[String] = param.get(_originDF).map(_.toString)
+}
+
+trait RuleAdaptor extends Loggable with Serializable {
+
+//  val adaptPhase: AdaptPhase
+
+//  protected def genRuleInfo(param: Map[String, Any]): RuleInfo = RuleInfoGen(param)
+
+//  protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString
+//  protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString
+//  protected def getDetails(param: Map[String, Any]) = param.get(_details) match {
+//    case Some(dt: Map[String, Any]) => dt
+//    case _ => Map[String, Any]()
+//  }
+
+
+//  def getPersistNames(steps: Seq[RuleStep]): Seq[String] = steps.map(_.ruleInfo.persistName)
+//
+//  protected def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep]
+//  protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep]
+//  def genConcreteRuleStep(timeInfo: TimeInfo, param: Map[String, Any]
+//                         ): Seq[ConcreteRuleStep] = {
+//    genRuleStep(timeInfo, param).flatMap { rs =>
+//      adaptConcreteRuleStep(rs)
+//    }
+//  }
+
+
+//  def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = {
+//    RuleInfoGen(param) :: Nil
+//  }
+
+  protected def getRuleName(param: Map[String, Any]): String = {
+    RuleParamKeys.getName(param, RuleStepNameGenerator.genName)
   }
 
-  def getTempSourceNames(param: Map[String, Any]): Seq[String]
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan
 
-  def genRuleStep(param: Map[String, Any]): Seq[RuleStep]
-  def genConcreteRuleStep(param: Map[String, Any]): Seq[ConcreteRuleStep] = {
-    genRuleStep(param).flatMap { rs =>
-      adaptConcreteRuleStep(rs)
-    }
+  protected def genRuleExports(param: Map[String, Any], defName: String, stepName: String): Seq[RuleExport] = {
+    val metricOpt = RuleParamKeys.getMetricOpt(param)
+    val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName)).toSeq
+    val recordOpt = RuleParamKeys.getRecordOpt(param)
+    val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName)).toSeq
+    metricExportSeq ++ recordExportSeq
   }
-  protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep]
+  protected def genMetricExport(param: Map[String, Any], name: String, stepName: String
+                               ): MetricExport = {
+    MetricExport(
+      ExportParamKeys.getName(param, name),
+      stepName,
+      ExportParamKeys.getCollectType(param)
+    )
+  }
+  protected def genRecordExport(param: Map[String, Any], name: String, stepName: String
+                               ): RecordExport = {
+    RecordExport(
+      ExportParamKeys.getName(param, name),
+      stepName,
+      ExportParamKeys.getDataSourceCacheOpt(param),
+      ExportParamKeys.getOriginDFOpt(param)
+    )
+  }
+
 }
 
+
+
+//object RuleInfoGen {
+//  def apply(param: Map[String, Any]): RuleInfo = {
+//    val name = param.get(_name) match {
+//      case Some(n: String) => n
+//      case _ => RuleStepNameGenerator.genName
+//    }
+//    RuleInfo(
+//      name,
+//      None,
+//      DslType(param.getString(_dslType, "")),
+//      param.getString(_rule, ""),
+//      param.getParamMap(_details),
+//      param.getBoolean(_gatherStep, false)
+//    )
+//  }
+//  def apply(ri: RuleInfo, timeInfo: TimeInfo): RuleInfo = {
+//    if (ri.persistType.needPersist) {
+//      val tmstName = TempName.tmstName(ri.name, timeInfo)
+//      ri.setTmstNameOpt(Some(tmstName))
+//    } else ri
+//  }
+//
+//  def dqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, ""))
+//}
+
 object RuleStepNameGenerator {
   private val counter: AtomicLong = new AtomicLong(0L)
   private val head: String = "rs"