Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/16 08:32:39 UTC

[1/2] incubator-griffin git commit: Add distinctness measurement

Repository: incubator-griffin
Updated Branches:
  refs/heads/master e704da627 -> cbff5b45c


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
new file mode 100644
index 0000000..31fe5ea
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.temp
+
+import scala.math.{min, max}
+
+case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable {
+  def merge(tr: TimeRange): TimeRange = {
+    TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
+  }
+}
+
+object TimeRange {
+  val emptyTimeRange = TimeRange(0, 0, Set[Long]())
+  def apply(range: (Long, Long), tmsts: Set[Long]): TimeRange = TimeRange(range._1, range._2, tmsts)
+  def apply(ts: Long, tmsts: Set[Long]): TimeRange = TimeRange(ts, ts, tmsts)
+  def apply(ts: Long): TimeRange = TimeRange(ts, ts, Set[Long](ts))
+  def apply(tmsts: Set[Long]): TimeRange = {
+    try {
+      TimeRange(tmsts.min, tmsts.max, tmsts)
+    } catch {
+      case _: Throwable => emptyTimeRange
+    }
+  }
+}
\ No newline at end of file

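A quick usage sketch of the TimeRange helper added above, using only the case class and companion shown in this diff: merge widens the window and unions the timestamp sets, and the Set-based apply falls back to emptyTimeRange when the set is empty.

  import org.apache.griffin.measure.process.temp.TimeRange

  val a = TimeRange(1000L, 2000L, Set(1000L, 1500L, 2000L))
  val b = TimeRange(Set(1800L, 2500L))        // begin = 1800, end = 2500
  val merged = a.merge(b)                     // TimeRange(1000, 2500, Set(1000, 1500, 1800, 2000, 2500))
  val none   = TimeRange(Set[Long]())         // min/max throw on an empty set -> emptyTimeRange
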
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
index 5447ccc..97589ad 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
@@ -18,7 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.rule.adaptor
 
-import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.{ExportMode, ProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.utils.ParamUtil._
 
@@ -46,10 +47,12 @@ case class DataFrameOprAdaptor() extends RuleAdaptor {
 
   import RuleParamKeys._
 
-  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan = {
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
+                  procType: ProcessType, dsTimeRanges: Map[String, TimeRange]): RulePlan = {
     val name = getRuleName(param)
     val step = DfOprStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param))
-    RulePlan(step :: Nil, genRuleExports(param, name, name))
+    val mode = ExportMode.defaultMode(procType)
+    RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
new file mode 100644
index 0000000..f592709
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
+object AccuracyKeys {
+  val _source = "source"
+  val _target = "target"
+  val _miss = "miss"
+  val _total = "total"
+  val _matched = "matched"
+  //  val _missRecords = "missRecords"
+}
+
+object ProfilingKeys {
+  val _source = "source"
+}
+
+object UniquenessKeys {
+  val _source = "source"
+  val _target = "target"
+  val _unique = "unique"
+  val _total = "total"
+  val _dup = "dup"
+  val _num = "num"
+
+  val _duplicationArray = "duplication.array"
+}
+
+object DistinctnessKeys {
+  val _source = "source"
+  val _target = "target"
+  val _distinct = "distinct"
+  val _total = "total"
+  val _dup = "dup"
+  val _accu_dup = "accu_dup"
+  val _num = "num"
+
+  val _duplicationArray = "duplication.array"
+  val _withAccumulate = "with.accumulate"
+}
+
+object TimelinessKeys {
+  val _source = "source"
+  val _latency = "latency"
+  val _threshold = "threshold"
+}
+
+object GlobalKeys {
+  val _initRule = "init.rule"
+}
+
+object ProcessDetailsKeys {
+  val _baselineDataSource = "baseline.data.source"
+}
\ No newline at end of file

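These key objects just name the fields looked up in a rule's "details" block (see the test configs added later in this commit). A small illustrative sketch of the lookup pattern; getStringOrKey below is a stand-in for the ParamUtil helper the adaptors use, falling back to the key itself when the detail is absent.

  val details = Map[String, Any]("distinct" -> "distinct", "dup" -> "dup")

  // stand-in for ParamUtil's getStringOrKey: configured value if present, else the key name
  def getStringOrKey(m: Map[String, Any], key: String): String =
    m.get(key).map(_.toString).getOrElse(key)

  val distinctCol = getStringOrKey(details, DistinctnessKeys._distinct)   // "distinct"
  val accuDupCol  = getStringOrKey(details, DistinctnessKeys._accu_dup)   // not configured -> "accu_dup"
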
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 98545d8..ad4a195 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.rule.adaptor
 
 import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
 import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys
-import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
 import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.dsl.analyzer._
@@ -30,39 +30,6 @@ import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.griffin.measure.utils.TimeUtil
 
-object AccuracyKeys {
-  val _source = "source"
-  val _target = "target"
-  val _miss = "miss"
-  val _total = "total"
-  val _matched = "matched"
-//  val _missRecords = "missRecords"
-}
-
-object ProfilingKeys {
-  val _source = "source"
-}
-
-object UniquenessKeys {
-  val _source = "source"
-  val _target = "target"
-  val _unique = "unique"
-  val _total = "total"
-  val _dup = "dup"
-  val _num = "num"
-  val _duplicationArray = "duplication.array"
-}
-
-object TimelinessKeys {
-  val _source = "source"
-  val _latency = "latency"
-  val _threshold = "threshold"
-}
-
-object GlobalKeys {
-  val _initRule = "init.rule"
-}
-
 case class GriffinDslAdaptor(dataSourceNames: Seq[String],
                              functionNames: Seq[String]
                             ) extends RuleAdaptor {
@@ -77,7 +44,8 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
   private val emptyRulePlan = RulePlan(Nil, Nil)
   private val emptyMap = Map[String, Any]()
 
-  override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], processType: ProcessType
+  override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
+                           processType: ProcessType, dsTimeRanges: Map[String, TimeRange]
                           ): RulePlan = {
     val name = getRuleName(param)
     val rule = getRule(param)
@@ -90,6 +58,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, processType)
           case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType)
           case UniquenessType => uniquenessRulePlan(timeInfo, name, expr, param, processType)
+          case DistinctnessType => distinctRulePlan(timeInfo, name, expr, param, processType, dsTimeRanges)
           case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType)
           case _ => emptyRulePlan
         }
@@ -107,22 +76,26 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
 
   // with accuracy opr
   private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                               param: Map[String, Any], processType: ProcessType
+                               param: Map[String, Any], procType: ProcessType
                               ): RulePlan = {
     val details = getDetails(param)
     val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head)
     val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head)
     val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName)
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+      println(s"[${ct}] data source ${sourceName} not exists")
       emptyRulePlan
     } else {
       // 1. miss record
       val missRecordsTableName = "__missRecords"
       val selClause = s"`${sourceName}`.*"
       val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
-        println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+        println(s"[${ct}] data source ${targetName} not exists")
         s"SELECT ${selClause} FROM `${sourceName}`"
       } else {
         val onClause = expr.coalesceDesc
@@ -136,10 +109,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
       }
       val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true)
-      val missRecordsExports = processType match {
+      val missRecordsExports = procType match {
         case BatchProcessType => {
           val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil
+          genRecordExport(recordParam, missRecordsTableName, missRecordsTableName, ct, mode) :: Nil
         }
         case StreamingProcessType => Nil
       }
@@ -147,7 +120,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 2. miss count
       val missCountTableName = "__missCount"
       val missColName = details.getStringOrKey(AccuracyKeys._miss)
-      val missCountSql = processType match {
+      val missCountSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
         case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`"
       }
@@ -156,7 +129,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 3. total count
       val totalCountTableName = "__totalCount"
       val totalColName = details.getStringOrKey(AccuracyKeys._total)
-      val totalCountSql = processType match {
+      val totalCountSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
         case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
       }
@@ -165,7 +138,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 4. accuracy metric
       val accuracyTableName = name
       val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
-      val accuracyMetricSql = processType match {
+      val accuracyMetricSql = procType match {
         case BatchProcessType => {
           s"""
              |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
@@ -186,10 +159,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         }
       }
       val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap)
-      val accuracyExports = processType match {
+      val accuracyExports = procType match {
         case BatchProcessType => {
           val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil
+          genMetricExport(metricParam, accuracyTableName, accuracyTableName, ct, mode) :: Nil
         }
         case StreamingProcessType => Nil
       }
@@ -200,7 +173,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       val accuPlan = RulePlan(accuSteps, accuExports)
 
       // streaming extra accu plan
-      val streamingAccuPlan = processType match {
+      val streamingAccuPlan = procType match {
         case BatchProcessType => emptyRulePlan
         case StreamingProcessType => {
           // 5. accuracy metric merge
@@ -215,7 +188,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           val accuracyMetricStep = DfOprStep(accuracyMetricTableName,
             accuracyMetricRule, accuracyMetricDetails)
           val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName) :: Nil
+          val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName, ct, mode) :: Nil
 
           // 6. collect accuracy records
           val accuracyRecordTableName = "__accuracyRecords"
@@ -230,7 +203,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
             .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
           val accuracyRecordExports = genRecordExport(
-            accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil
+            accuracyRecordParam, missRecordsTableName, accuracyRecordTableName, ct, mode) :: Nil
 
           // gen accu plan
           val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil
@@ -248,7 +221,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
   }
 
   private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                param: Map[String, Any], processType: ProcessType
+                                param: Map[String, Any], procType: ProcessType
                                ): RulePlan = {
     val details = getDetails(param)
     val profilingClause = expr.asInstanceOf[ProfilingClause]
@@ -258,6 +231,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
     }
     val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       emptyRulePlan
     } else {
@@ -270,12 +247,12 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         s"${sel.desc}${alias}"
       }
       val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
-      val selClause = processType match {
+      val selClause = procType match {
         case BatchProcessType => selExprDescs.mkString(", ")
         case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ")
       }
       val groupByClauseOpt = analyzer.groupbyExprOpt
-      val groupbyClause = processType match {
+      val groupbyClause = procType match {
         case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
         case StreamingProcessType => {
           val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
@@ -296,25 +273,29 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       val profilingName = name
       val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
       val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val profilingExports = genMetricExport(metricParam, name, profilingName) :: Nil
+      val profilingExports = genMetricExport(metricParam, name, profilingName, ct, mode) :: Nil
 
       RulePlan(profilingStep :: Nil, profilingExports)
     }
   }
 
   private def uniquenessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                 param: Map[String, Any], processType: ProcessType
+                                 param: Map[String, Any], procType: ProcessType
                                 ): RulePlan = {
     val details = getDetails(param)
     val sourceName = details.getString(UniquenessKeys._source, dataSourceNames.head)
     val targetName = details.getString(UniquenessKeys._target, dataSourceNames.tail.head)
     val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], sourceName, targetName)
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+      println(s"[${ct}] data source ${sourceName} not exists")
       emptyRulePlan
     } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
-      println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+      println(s"[${ct}] data source ${targetName} not exists")
       emptyRulePlan
     } else {
       val selItemsClause = analyzer.selectionPairs.map { pair =>
@@ -323,16 +304,16 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }.mkString(", ")
       val aliases = analyzer.selectionPairs.map(_._2)
 
-      val selClause = processType match {
+      val selClause = procType match {
         case BatchProcessType => selItemsClause
         case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}"
       }
-      val selAliases = processType match {
+      val selAliases = procType match {
         case BatchProcessType => aliases
         case StreamingProcessType => InternalColumns.tmst +: aliases
       }
 
-      // 1. source mapping
+      // 1. source distinct mapping
       val sourceTableName = "__source"
       val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
       val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
@@ -369,7 +350,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 5. total metric
       val totalTableName = "__totalMetric"
       val totalColName = details.getStringOrKey(UniquenessKeys._total)
-      val totalSql = processType match {
+      val totalSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
         case StreamingProcessType => {
           s"""
@@ -380,7 +361,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }
       val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
       val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, ct, mode)
 
       // 6. unique record
       val uniqueRecordTableName = "__uniqueRecord"
@@ -392,7 +373,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 7. unique metric
       val uniqueTableName = "__uniqueMetric"
       val uniqueColName = details.getStringOrKey(UniquenessKeys._unique)
-      val uniqueSql = processType match {
+      val uniqueSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
         case StreamingProcessType => {
           s"""
@@ -403,32 +384,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }
       val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
       val uniqueMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName)
-
-      // 8. count metric
-//      val countMetricTableName = "__countMetric"
-//      val countMetricSql = processType match {
-//        case BatchProcessType => {
-//          s"""
-//             |SELECT `${totalTableName}`.`${totalColName}` AS `${totalColName}`,
-//             |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS `${uniqueColName}`
-//             |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}`
-//          """.stripMargin
-//        }
-//        case StreamingProcessType => {
-//          s"""
-//             |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`,
-//             |`${totalTableName}`.`${totalColName}` AS `${totalColName}`,
-//             |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS `${uniqueColName}`
-//             |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}`
-//             |ON `${totalTableName}`.`${InternalColumns.tmst}` = `${uniqueTableName}`.`${InternalColumns.tmst}`
-//          """.stripMargin
-//        }
-//      }
-//      val countMetricStep = SparkSqlStep(countMetricTableName, countMetricSql, emptyMap)
-//      val countMetricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-//        .addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-//      val countMetricExport = genMetricExport(countMetricParam, "", countMetricTableName)
+      val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName, ct, mode)
 
       val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
         totalStep :: uniqueRecordStep :: uniqueStep :: Nil
@@ -444,16 +400,16 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         }
         val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
         val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName)
+        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName, ct, mode)
 
         // 9. duplicate metric
         val dupMetricTableName = "__dupMetric"
         val numColName = details.getStringOrKey(UniquenessKeys._num)
-        val dupMetricSelClause = processType match {
+        val dupMetricSelClause = procType match {
           case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
           case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
         }
-        val dupMetricGroupbyClause = processType match {
+        val dupMetricGroupbyClause = procType match {
           case BatchProcessType => s"`${dupColName}`"
           case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`"
         }
@@ -465,7 +421,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         }
         val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
         val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName)
+        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, ct, mode)
 
         RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
       } else emptyRulePlan
@@ -474,13 +430,202 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
     }
   }
 
+  private def distinctRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+                               param: Map[String, Any], procType: ProcessType,
+                               dsTimeRanges: Map[String, TimeRange]
+                              ): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(DistinctnessKeys._source, dataSourceNames.head)
+    val targetName = details.getString(DistinctnessKeys._target, dataSourceNames.tail.head)
+    val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], sourceName)
+
+    val mode = SimpleMode
+
+    val ct = timeInfo.calcTime
+
+    val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct))
+    val beginTime = sourceTimeRange.begin
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${ct}] data source ${sourceName} not exists")
+      emptyRulePlan
+    } else {
+      val withOlderTable = {
+        details.getBoolean(DistinctnessKeys._withAccumulate, true) &&
+          TableRegisters.existRunTempTable(timeInfo.key, targetName)
+      }
+
+      val selClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val aliases = analyzer.selectionPairs.map(_._2)
+      val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ")
+
+      // 1. source alias
+      val sourceAliasTableName = "__sourceAlias"
+      val sourceAliasSql = {
+        s"SELECT ${selClause} FROM `${sourceName}`"
+      }
+      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+
+      // 2. total metric
+      val totalTableName = "__totalMetric"
+      val totalColName = details.getStringOrKey(DistinctnessKeys._total)
+      val totalSql = {
+        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
+      }
+      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
+      val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, beginTime, mode)
+
+      // 3. group by self
+      val selfGroupTableName = "__selfGroup"
+      val dupColName = details.getStringOrKey(DistinctnessKeys._dup)
+      val accuDupColName = details.getStringOrKey(DistinctnessKeys._accu_dup)
+      val selfGroupSql = {
+        s"""
+           |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
+           |TRUE AS `${InternalColumns.distinct}`
+           |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause}
+          """.stripMargin
+      }
+      val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+
+      val selfDistRulePlan = RulePlan(
+        sourceAliasStep :: totalStep :: selfGroupStep :: Nil,
+        totalMetricExport :: Nil
+      )
+
+      val (distRulePlan, dupCountTableName) = procType match {
+        case StreamingProcessType if (withOlderTable) => {
+          // 4. older alias
+          val olderAliasTableName = "__older"
+          val olderAliasSql = {
+            s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTime}"
+          }
+          val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap)
+
+          // 5. join with older data
+          val joinedTableName = "__joined"
+          val selfSelClause = (aliases :+ dupColName).map { alias =>
+            s"`${selfGroupTableName}`.`${alias}`"
+          }.mkString(", ")
+          val onClause = aliases.map { alias =>
+            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')"
+          }.mkString(" AND ")
+          val olderIsNull = aliases.map { alias =>
+            s"`${olderAliasTableName}`.`${alias}` IS NULL"
+          }.mkString(" AND ")
+          val joinedSql = {
+            s"""
+               |SELECT ${selfSelClause}, (${olderIsNull}) AS `${InternalColumns.distinct}`
+               |FROM `${olderAliasTableName}` RIGHT JOIN `${selfGroupTableName}`
+               |ON ${onClause}
+            """.stripMargin
+          }
+          val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
+
+          // 6. group by joined data
+          val groupTableName = "__group"
+          val moreDupColName = "_more_dup"
+          val groupSql = {
+            s"""
+               |SELECT ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`,
+               |COUNT(*) AS `${moreDupColName}`
+               |FROM `${joinedTableName}`
+               |GROUP BY ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`
+             """.stripMargin
+          }
+          val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
+
+          // 7. final duplicate count
+          val finalDupCountTableName = "__finalDupCount"
+          val finalDupCountSql = {
+            s"""
+               |SELECT ${aliasesClause}, `${InternalColumns.distinct}`,
+               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
+               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS `${accuDupColName}`
+               |FROM `${groupTableName}`
+             """.stripMargin
+          }
+          val finalDupCountStep = SparkSqlStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+
+          val rulePlan = RulePlan(olderAliasStep :: joinedStep :: groupStep :: finalDupCountStep :: Nil, Nil)
+          (rulePlan, finalDupCountTableName)
+        }
+        case _ => {
+          (emptyRulePlan, selfGroupTableName)
+        }
+      }
+
+      // 8. distinct metric
+      val distTableName = "__distMetric"
+      val distColName = details.getStringOrKey(DistinctnessKeys._distinct)
+      val distSql = {
+        s"""
+           |SELECT COUNT(*) AS `${distColName}`
+           |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}`
+         """.stripMargin
+      }
+      val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
+      val distMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, beginTime, mode)
+
+      val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: Nil)
+
+      val duplicationArrayName = details.getString(DistinctnessKeys._duplicationArray, "")
+      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
+        // 9. duplicate record
+        val dupRecordTableName = "__dupRecords"
+        val dupRecordSelClause = procType match {
+          case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, `${dupColName}`, `${accuDupColName}`"
+          case _ => s"${aliasesClause}, `${dupColName}`"
+        }
+        val dupRecordSql = {
+          s"""
+             |SELECT ${dupRecordSelClause}
+             |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
+           """.stripMargin
+        }
+        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+        val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, beginTime, mode)
+
+        // 10. duplicate metric
+        val dupMetricTableName = "__dupMetric"
+        val numColName = details.getStringOrKey(DistinctnessKeys._num)
+        val dupMetricSql = {
+          s"""
+             |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
+             |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
+         """.stripMargin
+        }
+        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
+        val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, beginTime, mode)
+
+        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
+      } else emptyRulePlan
+
+      selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)
+
+    }
+  }
+
   private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                 param: Map[String, Any], processType: ProcessType
+                                 param: Map[String, Any], procType: ProcessType
                                 ): RulePlan = {
     val details = getDetails(param)
     val timelinessClause = expr.asInstanceOf[TimelinessClause]
     val sourceName = details.getString(TimelinessKeys._source, dataSourceNames.head)
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       emptyRulePlan
     } else {
@@ -521,7 +666,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
 
       // 3. timeliness metric
       val metricTableName = name
-      val metricSql = processType match {
+      val metricSql = procType match {
         case BatchProcessType => {
           s"""
              |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
@@ -543,7 +688,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }
       val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
       val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val metricExports = genMetricExport(metricParam, name, metricTableName) :: Nil
+      val metricExports = genMetricExport(metricParam, name, metricTableName, ct, mode) :: Nil
 
       // current timeliness plan
       val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
@@ -559,7 +704,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           }
           val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
           val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val recordExports = genRecordExport(recordParam, recordTableName, recordTableName) :: Nil
+          val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, ct, mode) :: Nil
           RulePlan(recordStep :: Nil, recordExports)
         }
         case _ => emptyRulePlan

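For orientation, an illustrative trace (not generated output) of the SparkSQL steps distinctRulePlan builds for a batch run of the rule "user_id", using the default detail names from the test config added later in this commit and assuming the single selection aliases to user_id:

  __sourceAlias  : SELECT user_id AS `user_id` FROM `source`
  __totalMetric  : SELECT COUNT(*) AS `total` FROM `__sourceAlias`
  __selfGroup    : SELECT `user_id`, (COUNT(*) - 1) AS `dup`, TRUE AS `__distinct`
                   FROM `__sourceAlias` GROUP BY `user_id`
  __distMetric   : SELECT COUNT(*) AS `distinct` FROM `__selfGroup` WHERE `__distinct`
  (with "duplication.array" set, additionally)
  __dupRecords   : SELECT `user_id`, `dup` FROM `__selfGroup` WHERE `dup` > 0
  __dupMetric    : SELECT `dup`, COUNT(*) AS `num` FROM `__dupRecords` GROUP BY `dup`

In the streaming case with "with.accumulate" enabled, the __older / __joined / __group / __finalDupCount steps above additionally fold in cached target data older than the source's begin time before the distinct metric is counted.
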
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
index bd344b1..fc6a246 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
@@ -27,5 +27,7 @@ object InternalColumns {
   val beginTs = "__begin_ts"
   val endTs = "__end_ts"
 
-  val columns = List[String](tmst, metric, record, empty, beginTs, endTs)
+  val distinct = "__distinct"
+
+  val columns = List[String](tmst, metric, record, empty, beginTs, endTs, distinct)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
index ebc8fdb..25025ac 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
@@ -25,7 +25,8 @@ import org.apache.griffin.measure.cache.tmst.TempName
 import scala.collection.mutable.{Set => MutableSet}
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.{ExportMode, ProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 
@@ -115,30 +116,40 @@ trait RuleAdaptor extends Loggable with Serializable {
     RuleParamKeys.getName(param, RuleStepNameGenerator.genName)
   }
 
-  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
+                  procType: ProcessType, dsTimeRanges: Map[String, TimeRange]): RulePlan
 
-  protected def genRuleExports(param: Map[String, Any], defName: String, stepName: String): Seq[RuleExport] = {
+  protected def genRuleExports(param: Map[String, Any], defName: String,
+                               stepName: String, defTimestamp: Long,
+                               mode: ExportMode
+                              ): Seq[RuleExport] = {
     val metricOpt = RuleParamKeys.getMetricOpt(param)
-    val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName)).toSeq
+    val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName, defTimestamp, mode)).toSeq
     val recordOpt = RuleParamKeys.getRecordOpt(param)
-    val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName)).toSeq
+    val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName, defTimestamp, mode)).toSeq
     metricExportSeq ++ recordExportSeq
   }
-  protected def genMetricExport(param: Map[String, Any], name: String, stepName: String
+  protected def genMetricExport(param: Map[String, Any], name: String, stepName: String,
+                                defTimestamp: Long, mode: ExportMode
                                ): MetricExport = {
     MetricExport(
       ExportParamKeys.getName(param, name),
       stepName,
-      ExportParamKeys.getCollectType(param)
+      ExportParamKeys.getCollectType(param),
+      defTimestamp,
+      mode
     )
   }
-  protected def genRecordExport(param: Map[String, Any], name: String, stepName: String
+  protected def genRecordExport(param: Map[String, Any], name: String, stepName: String,
+                                defTimestamp: Long, mode: ExportMode
                                ): RecordExport = {
     RecordExport(
       ExportParamKeys.getName(param, name),
       stepName,
       ExportParamKeys.getDataSourceCacheOpt(param),
-      ExportParamKeys.getOriginDFOpt(param)
+      ExportParamKeys.getOriginDFOpt(param),
+      defTimestamp,
+      mode
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
index 1e077b1..30a356c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.rule.adaptor
 import org.apache.griffin.measure.cache.tmst.TempName
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.process.ProcessType
-import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.spark.sql.SQLContext
@@ -114,22 +114,24 @@ object RuleAdaptorGroup {
 //  }
 
   // -- gen rule plan --
-  def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, procType: ProcessType
+  def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam,
+                  procType: ProcessType, dsTimeRanges: Map[String, TimeRange]
                  ): RulePlan = {
     val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType
     val defaultDslType = DslType(dslTypeStr)
     val ruleParams = evaluateRuleParam.rules
-    genRulePlan(timeInfo, ruleParams, defaultDslType, procType)
+    genRulePlan(timeInfo, ruleParams, defaultDslType, procType, dsTimeRanges)
   }
 
   def genRulePlan(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]],
-                  defaultDslType: DslType, procType: ProcessType
+                  defaultDslType: DslType, procType: ProcessType,
+                  dsTimeRanges: Map[String, TimeRange]
                  ): RulePlan = {
     val (rulePlan, dsNames) = ruleParams.foldLeft((emptyRulePlan, dataSourceNames)) { (res, param) =>
       val (plan, names) = res
       val dslType = getDslType(param, defaultDslType)
       val curPlan: RulePlan = genRuleAdaptor(dslType, names) match {
-        case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType)
+        case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType, dsTimeRanges)
         case _ => emptyRulePlan
       }
       val globalNames = curPlan.globalRuleSteps.map(_.name)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
index 6b3b7cb..1fce03b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
@@ -19,7 +19,8 @@ under the License.
 package org.apache.griffin.measure.rule.adaptor
 
 import org.apache.griffin.measure.cache.tmst.TempName
-import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.{ExportMode, ProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.rule.dsl.MetricPersistType
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.utils.ParamUtil._
@@ -39,10 +40,12 @@ case class SparkSqlAdaptor() extends RuleAdaptor {
 
   import RuleParamKeys._
 
-  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan = {
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
+                  procType: ProcessType, dsTimeRanges: Map[String, TimeRange]): RulePlan = {
     val name = getRuleName(param)
     val step = SparkSqlStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param))
-    RulePlan(step :: Nil, genRuleExports(param, name, name))
+    val mode = ExportMode.defaultMode(procType)
+    RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
index 11b67f2..18a5919 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -28,7 +28,7 @@ sealed trait DqType {
 
 object DqType {
   private val dqTypes: List[DqType] = List(
-    AccuracyType, ProfilingType, UniquenessType, TimelinessType, UnknownType
+    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, UnknownType
   )
   def apply(ptn: String): DqType = {
     dqTypes.filter(tp => ptn match {
@@ -54,6 +54,11 @@ final case object UniquenessType extends DqType {
   val desc = "uniqueness"
 }
 
+final case object DistinctnessType extends DqType {
+  val regex = "^(?i)distinct$".r
+  val desc = "distinct"
+}
+
 final case object TimelinessType extends DqType {
   val regex = "^(?i)timeliness$".r
   val desc = "timeliness"

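Note that the new measurement is selected with "dq.type": "distinct" (the regex is ^(?i)distinct$, not "distinctness"), and matching is case-insensitive. A quick check against the resolution above:

  DqType("distinct")      // DistinctnessType
  DqType("DISTINCT")      // DistinctnessType, thanks to the (?i) flag
  DqType("distinctness")  // matches no regex, ends up as UnknownType
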
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
new file mode 100644
index 0000000..55e4f39
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.analyzer
+
+import org.apache.griffin.measure.rule.dsl.expr._
+
+
+//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer {
+
+  val seqAlias = (expr: Expr, v: Seq[String]) => {
+    expr match {
+      case apr: AliasableExpr => v ++ apr.alias
+      case _ => v
+    }
+  }
+  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b
+
+  private val exprs = expr.exprs
+  private def genAlias(idx: Int): String = s"alias_${idx}"
+  val selectionPairs = exprs.zipWithIndex.map { pair =>
+    val (pr, idx) = pair
+    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
+    (pr, res.headOption.getOrElse(genAlias(idx)))
+  }
+
+  if (selectionPairs.isEmpty) {
+    throw new Exception(s"distinctness analyzer error: empty selection")
+  }
+
+}

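The selectionPairs logic above keeps the first alias found inside each selection expression and otherwise falls back to alias_<idx>. A simplified, self-contained model of that behaviour (FakeExpr is a stand-in, not a Griffin expression type):

  case class FakeExpr(desc: String, alias: Option[String])

  def selectionPairs(exprs: Seq[FakeExpr]): Seq[(FakeExpr, String)] =
    exprs.zipWithIndex.map { case (e, idx) =>
      (e, e.alias.getOrElse(s"alias_${idx}"))
    }

  selectionPairs(Seq(FakeExpr("user_id", Some("user_id")), FakeExpr("age + 1", None)))
  // -> Seq((user_id, "user_id"), (age + 1, "alias_1"))
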
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index 504e176..340c1e2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -227,6 +227,14 @@ case class UniquenessClause(exprs: Seq[Expr]) extends ClauseExpression {
   override def map(func: (Expr) => Expr): UniquenessClause = UniquenessClause(exprs.map(func(_)))
 }
 
+case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression {
+  addChildren(exprs)
+
+  def desc: String = exprs.map(_.desc).mkString(", ")
+  def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
+  override def map(func: (Expr) => Expr): DistinctnessClause = DistinctnessClause(exprs.map(func(_)))
+}
+
 case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
   addChildren(exprs)
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index 83f3153..b129ead 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -47,6 +47,14 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
   }
 
   /**
+    * -- distinctness clauses --
+    * <distinctness-clauses> = <expr> [, <expr>]+
+    */
+  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(expression, Operator.COMMA) ^^ {
+    case exprs => DistinctnessClause(exprs)
+  }
+
+  /**
     * -- timeliness clauses --
     * <timeliness-clauses> = <expr> [, <expr>]+
     */
@@ -59,6 +67,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
       case AccuracyType => logicalExpression
       case ProfilingType => profilingClause
       case UniquenessType => uniquenessClause
+      case DistinctnessType => distinctnessClause
       case TimelinessType => timelinessClause
       case _ => expression
     }

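A hedged usage sketch of the new clause parser, assuming the standard parseAll entry point from scala-parser-combinators that the other clause parsers rely on; a rule like "user_id, post_code" becomes a DistinctnessClause with two selection expressions:

  val parser = GriffinDslParser("source" :: "target" :: Nil, Nil)
  val result = parser.parseAll(parser.distinctnessClause, "user_id, post_code")
  if (result.successful) {
    val clause: DistinctnessClause = result.get
    println(clause.desc)   // roughly "user_id, post_code"; desc joins the expression descs with ", "
  }
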
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
index 10f1f9b..ac14153 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
@@ -18,11 +18,17 @@ under the License.
 */
 package org.apache.griffin.measure.rule.plan
 
+import org.apache.griffin.measure.process.ExportMode
 import org.apache.griffin.measure.rule.dsl._
 
 case class MetricExport(name: String,
                         stepName: String,
-                        collectType: CollectType
+                        collectType: CollectType,
+                        defTimestamp: Long,
+                        mode: ExportMode
                        ) extends RuleExport {
 
+  def setDefTimestamp(t: Long): RuleExport =
+    MetricExport(name, stepName, collectType, t, mode)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
index a467543..6afc836 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
@@ -18,10 +18,17 @@ under the License.
 */
 package org.apache.griffin.measure.rule.plan
 
+import org.apache.griffin.measure.process.ExportMode
+
 case class RecordExport(name: String,
                         stepName: String,
                         dataSourceCacheOpt: Option[String],
-                        originDFOpt: Option[String]
+                        originDFOpt: Option[String],
+                        defTimestamp: Long,
+                        mode: ExportMode
                        ) extends RuleExport {
 
+  def setDefTimestamp(t: Long): RuleExport =
+    RecordExport(name, stepName, dataSourceCacheOpt, originDFOpt, t, mode)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
index 26a962a..84467c2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
@@ -18,10 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.rule.plan
 
+import org.apache.griffin.measure.process.ExportMode
+
 trait RuleExport extends Serializable {
 
   val name: String    // export name
 
   val stepName: String    // the dependant step name
 
+  val defTimestamp: Long    // the default timestamp if tmst not in value
+
+  val mode: ExportMode   // export mode
+
+  def setDefTimestamp(t: Long): RuleExport
+
 }

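Every export now carries a fallback timestamp (used when a value has no tmst of its own) plus the export mode; the distinctness plan, for instance, stamps its exports with the source's begin time. A small construction sketch using values referenced elsewhere in this commit (EntriesCollectType, SimpleMode):

  val export: RuleExport =
    MetricExport("total", "__totalMetric", EntriesCollectType,
      defTimestamp = 1516089600000L, mode = SimpleMode)

  // re-stamp once the actual source time range is known
  val restamped = export.setDefTimestamp(1516093200000L)
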
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json
new file mode 100644
index 0000000..af0c91e
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -0,0 +1,57 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "user_id",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
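
The config above exercises the new `distinct` dq type against `user_id`. As a rough illustration of what the metric names in `details` stand for (total count, distinct count, duplication groups), here is a standalone Spark SQL sketch of the underlying idea; it uses the Spark 2.x `SparkSession` API and inline sample data, and is not the rule plan that `GriffinDslAdaptor` actually generates in this commit.

```
import org.apache.spark.sql.SparkSession

object DistinctnessSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dist-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // inline sample data standing in for users_info_src.avro
    Seq("u1", "u2", "u2", "u3", "u3", "u3").toDF("user_id").createOrReplaceTempView("source")

    // "total" vs. "distinct" counts on the measured column
    spark.sql(
      "SELECT COUNT(*) AS total, COUNT(DISTINCT user_id) AS distinct_cnt FROM source").show()

    // "duplication.array" style group metric: for each duplicate count, how many values have it
    spark.sql(
      """SELECT dup, COUNT(*) AS num
        |FROM (SELECT user_id, COUNT(*) - 1 AS dup FROM source GROUP BY user_id) t
        |WHERE dup > 0
        |GROUP BY dup""".stripMargin).show()

    spark.stop()
  }
}
```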

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-batch-griffindsl1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
new file mode 100644
index 0000000..f8aa077
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
@@ -0,0 +1,73 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
new file mode 100644
index 0000000..c36e7ba
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
@@ -0,0 +1,85 @@
+{
+  "name": "dist_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"],
+        "read.only": true
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name, age",
+        "details": {
+          "source": "new",
+          "target": "old",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "accu_dup": "accu_dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
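
The "new" source above declares no connectors and a cache flagged `"read.only": true`: it only replays data that another job dumped to the same HDFS path, which is the cache-only data source type mentioned in the commit message. The `DataSourceCache` changes later in this patch gate the write paths on that flag; below is a tiny, simplified sketch of that guard pattern, with class and field names invented purely for illustration.

```
// Illustrative only: a cache whose write path is disabled when configured as read.only.
class DumpCacheSketch(params: Map[String, Any]) {
  val readOnly: Boolean = params.get("read.only") match {
    case Some(b: Boolean) => b
    case Some(s: String)  => s.toBoolean
    case _                => false
  }

  private var partitions: Map[Long, Seq[String]] = Map.empty

  def saveData(rows: Seq[String], ms: Long): Unit = {
    if (!readOnly) {                 // a read-only cache never writes new partitions
      partitions += (ms -> rows)
    }
  }

  def readData(): Seq[String] = partitions.values.flatten.toSeq
}

object DumpCacheSketch {
  def main(args: Array[String]): Unit = {
    val writable = new DumpCacheSketch(Map("read.only" -> false))
    val readOnly = new DumpCacheSketch(Map("read.only" -> true))
    writable.saveData(Seq("""{"name":"kevin","age":24}"""), 123L)
    readOnly.saveData(Seq("ignored"), 456L) // skipped: in griffin, this source only reads the dump
    println(s"${writable.readData().size}, ${readOnly.readData().size}") // 1, 0
  }
}
```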

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_profiling-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json
index cd99eb1..043ba85 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -26,7 +26,7 @@
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
         "name": "prof",
-        "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source",
+        "rule": "count(*) from source",
         "metric": {
           "name": "prof"
         }
@@ -35,7 +35,7 @@
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
         "name": "grp",
-        "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code",
+        "rule": "source.post_code, count(*) from source group by source.post_code",
         "metric": {
           "name": "post_group",
           "collect.type": "array"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/dupdata.avro
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/dupdata.avro b/measure/src/test/resources/dupdata.avro
new file mode 100644
index 0000000..f6bd312
Binary files /dev/null and b/measure/src/test/resources/dupdata.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/empty.avro
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/empty.avro b/measure/src/test/resources/empty.avro
new file mode 100644
index 0000000..1ac3a72
Binary files /dev/null and b/measure/src/test/resources/empty.avro differ


[2/2] incubator-griffin git commit: Add distinctness measurement

Posted by gu...@apache.org.
Add distinctness measurement

add distinct measurement
add data source type which only read data from cache in streaming mode

Author: Lionel Liu <bh...@163.com>

Closes #191 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/cbff5b45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/cbff5b45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/cbff5b45

Branch: refs/heads/master
Commit: cbff5b45c19da1ff4354aba5a1ced35c3a437a9c
Parents: e704da6
Author: Lionel Liu <bh...@163.com>
Authored: Tue Jan 16 16:32:31 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Jan 16 16:32:31 2018 +0800

----------------------------------------------------------------------
 griffin-doc/measure/measure-batch-sample.md     |  64 ++--
 .../measure/measure-configuration-guide.md      |  66 ++--
 .../measure/measure-streaming-sample-old.md     | 204 -----------
 griffin-doc/measure/measure-streaming-sample.md | 256 ++++++++++++++
 .../measure/cache/info/TimeInfoCache.scala      |  24 +-
 .../measure/cache/info/ZKInfoCache.scala        |   8 +-
 .../config/params/user/DataSourceParam.scala    |   1 +
 .../measure/data/connector/DataConnector.scala  |   9 +-
 .../batch/AvroBatchDataConnector.scala          |   6 +-
 .../batch/HiveBatchDataConnector.scala          |   6 +-
 .../batch/TextDirBatchDataConnector.scala       |   6 +-
 .../streaming/StreamingDataConnector.scala      |   3 +-
 .../measure/data/source/DataSource.scala        |  14 +-
 .../measure/data/source/DataSourceCache.scala   | 303 +++++++++--------
 .../measure/data/source/DataSourceFactory.scala |   2 +-
 .../measure/process/BatchDqProcess.scala        |  12 +-
 .../griffin/measure/process/ExportMode.scala    |  34 ++
 .../measure/process/StreamingDqThread.scala     |  93 ++---
 .../measure/process/engine/DqEngine.scala       |   6 +-
 .../measure/process/engine/DqEngines.scala      |  39 ++-
 .../measure/process/engine/SparkDqEngine.scala  | 103 +++---
 .../measure/process/engine/SparkSqlEngine.scala |   3 +
 .../measure/process/temp/TimeRange.scala        |  41 +++
 .../rule/adaptor/DataFrameOprAdaptor.scala      |   9 +-
 .../measure/rule/adaptor/GlobalKeys.scala       |  70 ++++
 .../rule/adaptor/GriffinDslAdaptor.scala        | 335 +++++++++++++------
 .../measure/rule/adaptor/InternalColumns.scala  |   4 +-
 .../measure/rule/adaptor/RuleAdaptor.scala      |  29 +-
 .../measure/rule/adaptor/RuleAdaptorGroup.scala |  12 +-
 .../measure/rule/adaptor/SparkSqlAdaptor.scala  |   9 +-
 .../griffin/measure/rule/dsl/DqType.scala       |   7 +-
 .../dsl/analyzer/DistinctnessAnalyzer.scala     |  47 +++
 .../rule/dsl/expr/ClauseExpression.scala        |   8 +
 .../rule/dsl/parser/GriffinDslParser.scala      |   9 +
 .../measure/rule/plan/MetricExport.scala        |   8 +-
 .../measure/rule/plan/RecordExport.scala        |   9 +-
 .../griffin/measure/rule/plan/RuleExport.scala  |   8 +
 .../_distinctness-batch-griffindsl.json         |  57 ++++
 .../_distinctness-batch-griffindsl1.json        |  73 ++++
 .../_distinctness-streaming-griffindsl.json     |  85 +++++
 .../resources/_profiling-batch-griffindsl.json  |   4 +-
 measure/src/test/resources/dupdata.avro         | Bin 0 -> 304 bytes
 measure/src/test/resources/empty.avro           | Bin 0 -> 215 bytes
 43 files changed, 1396 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-batch-sample.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-batch-sample.md b/griffin-doc/measure/measure-batch-sample.md
index 3783f94..544adc7 100644
--- a/griffin-doc/measure/measure-batch-sample.md
+++ b/griffin-doc/measure/measure-batch-sample.md
@@ -29,50 +29,50 @@ Measures consists of batch measure and streaming measure. This document is for t
 
   "data.sources": [
     {
-      "name": "src",
+      "name": "source",
+      "baseline": true,
       "connectors": [
         {
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "users_info_src.avro"
+            "file.name": "src/test/resources/users_info_src.avro"
           }
         }
       ]
     }, {
-      "name": "tgt",
+      "name": "target",
       "connectors": [
         {
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "users_info_target.avro"
+            "file.name": "src/test/resources/users_info_target.avro"
           }
         }
       ]
     }
   ],
 
-  "evaluateRule": {
+  "evaluate.rule": {
     "rules": [
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "accuracy",
-        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "name": "accu",
+        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
         "details": {
-          "source": "src",
-          "target": "tgt",
-          "miss.records": {
-            "name": "miss.records",
-            "persist.type": "record"
-          },
-          "accuracy": {
-            "name": "accu",
-            "persist.type": "metric"
-          },
+          "source": "source",
+          "target": "target",
           "miss": "miss_count",
           "total": "total_count",
           "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
         }
       }
     ]
@@ -92,7 +92,7 @@ The miss records of source will be persisted as record.
 ## Batch Profiling Sample
 ```
 {
-  "name": "prof_batch_test",
+  "name": "prof_batch",
 
   "process.type": "batch",
 
@@ -101,29 +101,35 @@ The miss records of source will be persisted as record.
       "name": "source",
       "connectors": [
         {
-          "type": "hive",
-          "version": "1.2",
+          "type": "avro",
+          "version": "1.7",
           "config": {
-          	"database": "griffin",
-          	"table.name": "demo_src"
+            "file.name": "src/test/resources/users_info_src.avro"
           }
         }
       ]
     }
   ],
 
-  "evaluateRule": {
+  "evaluate.rule": {
     "rules": [
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "rule": "country, country.count() as cnt group by country order by cnt desc limit 3",
-        "details": {
-          "source": "source",
-          "profiling": {
-            "name": "cntry-group",
-            "persist.type": "metric"
-          }
+        "name": "prof",
+        "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code",
+        "metric": {
+          "name": "post_group",
+          "collect.type": "array"
         }
       }
     ]

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-configuration-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index 0632927..5ac7e5f 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -136,26 +136,25 @@ Above lists environment parameters.
     }
   ],
 
-  "evaluateRule": {
+  "evaluate.rule": {
     "rules": [
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "accuracy",
-        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "name": "accu",
+        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
         "details": {
-          "source": "src",
-          "target": "tgt",
-          "miss.records": {
-            "name": "miss.records",
-            "persist.type": "record"
-          },
-          "accuracy": {
-            "name": "accu",
-            "persist.type": "metric"
-          },
+          "source": "source",
+          "target": "target",
           "miss": "miss_count",
           "total": "total_count",
           "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
         }
       }
     ]
@@ -193,19 +192,34 @@ Above lists DQ job configure parameters.
 
 ### <a name="rule"></a>Rule
 - **dsl.type**: Rule dsl type, "spark-sql", "df-opr" and "griffin-dsl".
-- **name** (step information): Result table name of this rule, optional for "griffin-dsl" type.
-- **persist.type** (step information): Persist type of result table, optional for "griffin-dsl" type. Supporting "metric", "record" and "none" type, "metric" type indicates the result will be persisted as metrics, "record" type indicates the result will be persisted as record only, "none" type indicates the result will not be persisted. Default is "none" type.
-- **update.data.source** (step information): If the result table needs to update the data source, this parameter is the data source name, for streaming accuracy case, optional.
 - **dq.type**: DQ type of this rule, only for "griffin-dsl" type, supporting "accuracy" and "profiling".
+- **name** (step information): Result table name of this rule, optional for "griffin-dsl" type.
+- **rule**: The rule string.
 - **details**: Details of this rule, optional.
-	+ accuracy dq type detail configuration
-		* source: the data source name which as source in accuracy, default is the name of first data source in "data.sources" if not configured.
-		* target: the data source name which as target in accuracy, default is the name of second data source in "data.sources" if not configured.
-		* miss.records: step information of miss records result table step in accuracy.
-		* accuracy: step information of accuracy result table step in accuracy.
-		* miss: alias of miss column in result table.
-		* total: alias of total column in result table.
-		* matched: alias of matched column in result table.
-	+ profiling dq type detail configuration
-		* source: the data source name which as source in profiling, default is the name of first data source in "data.sources" if not configured. If the griffin-dsl rule contains from clause, this parameter is ignored.
-		* profiling: step information of profiling result table step in profiling.
\ No newline at end of file
+  + accuracy dq type detail configuration
+    * source: the data source name used as source in accuracy; defaults to the name of the first data source in "data.sources" if not configured.
+    * target: the data source name used as target in accuracy; defaults to the name of the second data source in "data.sources" if not configured.
+    * miss: the miss count name in metric, optional.
+    * total: the total count name in metric, optional.
+    * matched: the matched count name in metric, optional.
+  + profiling dq type detail configuration
+    * source: the data source name used as source in profiling; defaults to the name of the first data source in "data.sources" if not configured. If the griffin-dsl rule contains a from clause, this parameter is ignored.
+  + uniqueness dq type detail configuration
+    * source: name of data source to measure uniqueness.
+    * target: name of the data source to compare with. It is always the same as source, or a superset of source.
+    * unique: the unique count name in metric, optional.
+    * total: the total count name in metric, optional.
+    * dup: the duplicate count name in metric, optional.
+    * num: the duplicate number name in metric, optional.
+    * duplication.array: optional, if set as a non-empty string, the duplication metric will be computed, and the group metric name is this string.
+  + timeliness dq type detail configuration
+    * source: name of data source to measure timeliness.
+    * latency: the latency column name in metric, optional.
+    * threshold: optional, if set as a time string like "1h", the items with latency of more than 1 hour will be recorded.
+- **metric**: Configuration of metric export.
+  + name: name of metric.
+  + collect.type: optional, collect the metric as the configured type, one of "default", "entries", "array" or "map".
+- **record**: Configuration of record export.
+  + name: name of record.
+  + data.source.cache: optional, if set to a data source name, the cache of this data source will be updated by the records; typically used in the streaming accuracy case.
+  + origin.DF: available only if "data.source.cache" is set; the origin data frame name of the records.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-streaming-sample-old.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-streaming-sample-old.md b/griffin-doc/measure/measure-streaming-sample-old.md
deleted file mode 100644
index 004ed3b..0000000
--- a/griffin-doc/measure/measure-streaming-sample-old.md
+++ /dev/null
@@ -1,204 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-# Measure streaming sample
-Measures consists of batch measure and streaming measure. This document is for the streaming measure sample.
-
-### Data source
-At current, we support kafka as streaming data source.  
-In this sample, we also need a kafka as data source.
-
-### Measure type
-At current, we support accuracy measure in streaming mode.
-
-### Kafka decoder
-In kafka, data always needs encode and decode, we support String type kafka data currently, you can also implement and use your decoder for kafka case.
-
-### Environment
-For current griffin streaming case, we need some necessary environment dependencies, zookeeper and hdfs.  
-We use zookeeper to cache some checkpoint information, it's optional, but we recommend it.  
-We use hdfs to save the temporary data, it's also a recommend selection.
-
-### Streaming accuracy result
-The streaming data will be separated into mini-batches of data, for each mini-batch data, there should be an accuracy result. Therefore, the streaming accuracy result should be a bunch of batch accuracy results with timestamp.  
-Considering the latency of streaming data, which means the source data and the matching target data will not exactly reach exactly at the same time, we have to accept some delay of data in streaming mode, by holding unmatched data in memory or disk, and try to match them later until the data is out-time.
-
-## How to run streaming sample
-### Environment Preparation
-At first, we need some environment preparation.  
-- Zookeeper: Zookeeper 3.4.10
-- Hadoop: Hadoop 2.6
-- Spark: Spark 1.6
-- Kafka: Kafka 0.8
-
-### Data Preparation
-Create two topics in kafka, for source and target data. For example, topic "source" for source data, and topic "target" for target data.  
-Streaming data should also be prepared, the format could be json string, for example:  
-Source data could be:
-```
-{"name": "kevin", "age": 24}
-{"name": "jason", "age": 25}
-{"name": "jhon", "age": 28}
-{"name": "steve", "age": 31}
-```
-Target data could be:
-```
-{"name": "kevin", "age": 24}
-{"name": "jason", "age": 25}
-{"name": "steve", "age": 20}
-```
-You need to input the source data and target data into these two topics, through console producer might be a good choice for experimental purpose.
-
-### Configuration Preparation
-Two configuration files are required.
-Environment configuration file: env.json
-```
-{
-  "spark": {
-    "log.level": "WARN",
-    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
-    "batch.interval": "5s",
-    "process.interval": "30s",
-    "config": {
-      "spark.task.maxFailures": 5,
-      "spark.streaming.kafkaMaxRatePerPartition": 1000,
-      "spark.streaming.concurrentJobs": 4
-    }
-  },
-
-  "persist": [
-    {
-      "type": "log",
-      "config": {
-        "max.log.lines": 100
-      }
-    }, {
-      "type": "hdfs",
-      "config": {
-        "path": "hdfs:///griffin/streaming/persist",
-        "max.persist.lines": 10000,
-        "max.lines.per.file": 10000
-      }
-    }
-  ],
-
-  "info.cache": [
-    {
-      "type": "zk",
-      "config": {
-        "hosts": "<zookeeper host ip>:2181",
-        "namespace": "griffin/infocache",
-        "lock.path": "lock",
-        "mode": "persist",
-        "init.clear": true,
-        "close.clear": false
-      }
-    }
-  ]
-}
-```
-In env.json, "spark" field configures the spark and spark streaming parameters, "persist" field configures the persist ways, we support "log", "hdfs" and "http" ways at current, "info.cache" field configures the information cache parameters, we support zookeeper only at current.  
-
-Process configuration file: config.json
-```
-{
-  "name": "streaming-accu-sample",
-  "type": "accuracy",
-  "process.type": "streaming",
-
-  "source": {
-    "type": "kafka",
-    "version": "0.8",
-    "config": {
-      "kafka.config": {
-        "bootstrap.servers": "<kafka host ip>:9092",
-        "group.id": "group1",
-        "auto.offset.reset": "smallest",
-        "auto.commit.enable": "false"
-      },
-      "topics": "source",
-      "key.type": "java.lang.String",
-      "value.type": "java.lang.String"
-    },
-    "cache": {
-      "type": "text",
-      "config": {
-        "file.path": "hdfs:///griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0"
-      },
-      "time.range": ["-5m", "0"]
-    },
-    "match.once": true
-  },
-
-  "target": {
-    "type": "kafka",
-    "version": "0.8",
-    "config": {
-      "kafka.config": {
-        "bootstrap.servers": "<kafka host ip>:9092",
-        "group.id": "group1",
-        "auto.offset.reset": "smallest",
-        "auto.commit.enable": "false"
-      },
-      "topics": "target",
-      "key.type": "java.lang.String",
-      "value.type": "java.lang.String"
-    },
-    "cache": {
-      "type": "text",
-      "config": {
-        "file.path": "hdfs:///griffin/streaming/dump/target",
-        "info.path": "target",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0"
-      },
-      "time.range": ["-5m", "0"]
-    },
-    "match.once": false
-  },
-
-  "evaluateRule": {
-    "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age"
-  }
-}
-```
-In config.json, "source" and "target" fields configure the data source parameters.  
-The "cache" field in data source configuration represents the temporary data cache way, at current we support "text" and "hive" ways. We recommend "text" way, it only depends on hdfs. "time.range" means that the data older than the lower bound should be considered as out-time, and the out-time data will not be calculated any more.   
-"match.once" represents the data from this data source could be matched only once or more times.  
-"evaluateRule.rule" configures the match rule between each source and target data.
-
-### Run
-Build the measure package.
-```
-mvn clean install
-```
-Get the measure package ```measure-<version>-incubating-SNAPSHOT.jar```, rename it to ```griffin-measure.jar```.  
-Put measure package together with env.json and config.json.
-Run the following command:
-```
-spark-submit --class org.apache.griffin.measure.Application \
---master yarn-client --queue default \
-griffin-measure.jar \
-env.json config.json local,local
-```
-The first two parameters are the paths of env.json and config.json, the third parameter represents the file system type of the two configuration files, "local" or "hdfs" are both supported.  
-
-The spark streaming application will be long-time running, you can get the results of each mini-batch of data, during the run-time, you can also input more data into source and target topics, to check the results of the later mini-batches.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-streaming-sample.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-streaming-sample.md b/griffin-doc/measure/measure-streaming-sample.md
new file mode 100644
index 0000000..5c80576
--- /dev/null
+++ b/griffin-doc/measure/measure-streaming-sample.md
@@ -0,0 +1,256 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Measure Streaming Sample
+Measures consist of batch measures and streaming measures. This document is for the streaming measure samples.
+
+## Streaming Accuracy Sample
+```
+{
+  "name": "accu_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "src_group",
+              "auto.offset.reset": "largest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "tgt_group",
+              "auto.offset.reset": "largest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${t1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${t1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "name": "accu",
+        "rule": "source.name = target.name and source.age = target.age",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords",
+          "data.source.cache": "source"
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configuration file of the streaming accuracy job.  
+
+### Data source
+In this sample, we use kafka topics as source and target.  
+Currently, griffin supports kafka 0.8; support for 1.0 and later versions is under development.  
+In the current griffin implementation, only json strings are supported as kafka data, since they describe their own structure. Other solutions might use a schema proxy for kafka binary data; you can implement such a data source connector if you need it, and we are working on this as well.
+In streaming cases, the data from topics usually needs some pre-processing first, which is configured in `pre.proc`. Just like in `rules`, griffin does not parse the sql content itself, so placeholders are used to mark the temporary tables: `${this}` means the origin data set, and the output table name should also be `${this}`.
+
+For example, you can create two topics in kafka, for source and target data, the format could be json string.
+Source data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "jhon", "age": 28}
+{"name": "steve", "age": 31}
+```
+Target data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "steve", "age": 20}
+```
+You need to input the source data and target data into these two topics; using the console producer might be a good choice for experimental purposes.
+
+### Evaluate rule
+In this accuracy sample, the rule describes the match condition: `source.name = target.name and source.age = target.age`.  
+The accuracy metrics will be persisted as metrics, with the miss column named "miss_count", the total column named "total_count", and the matched column named "matched_count".  
+The missed records of the source will be persisted as records.  
+
+## Streaming Profiling Sample
+```
+{
+  "name": "prof_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "prof",
+        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "select name, count(*) as `cnt` from source group by name",
+        "metric": {
+          "name": "name_group",
+          "collect.type": "array"
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configuration file of the streaming profiling job.  
+
+### Data source
+In this sample, we use kafka topics as source.  
+
+### Evaluate rule
+In this profiling sample, the rules describe the profiling requests: ``select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source`` and ``select name, count(*) as `cnt` from source group by name``.  
+The profiling metrics will be persisted as metrics, with these two results in one json document.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index 85dfe62..aefd390 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -60,8 +60,8 @@ object TimeInfoCache extends Loggable with Serializable {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.map { k =>
-      getLong(result, k)
+    val time = keys.flatMap { k =>
+      getLongOpt(result, k)
     }.min
     val map = Map[String, String]((finalReadyTime -> time.toString))
     InfoCacheInstance.cacheInfo(map)
@@ -71,8 +71,8 @@ object TimeInfoCache extends Loggable with Serializable {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.map { k =>
-      getLong(result, k)
+    val time = keys.flatMap { k =>
+      getLongOpt(result, k)
     }.min
     val map = Map[String, String]((finalLastProcTime -> time.toString))
     InfoCacheInstance.cacheInfo(map)
@@ -82,8 +82,8 @@ object TimeInfoCache extends Loggable with Serializable {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.map { k =>
-      getLong(result, k)
+    val time = keys.flatMap { k =>
+      getLongOpt(result, k)
     }.min
     val map = Map[String, String]((finalCleanTime -> time.toString))
     InfoCacheInstance.cacheInfo(map)
@@ -102,15 +102,15 @@ object TimeInfoCache extends Loggable with Serializable {
     cleanTime
   }
 
-  private def getLong(map: Map[String, String], key: String): Long = {
+  private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
     try {
-      map.get(key) match {
-        case Some(v) => v.toLong
-        case _ => -1
-      }
+      map.get(key).map(_.toLong)
     } catch {
-      case e: Throwable => -1
+      case e: Throwable => None
     }
   }
+  private def getLong(map: Map[String, String], key: String) = {
+    getLongOpt(map, key).getOrElse(-1L)
+  }
 
 }
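
The point of switching from `map` + `getLong` to `flatMap` + `getLongOpt` is that a missing or unparsable entry used to become `-1` and therefore always won the `min`, dragging the computed ready/processed/clean time down to a sentinel value. A small standalone sketch of the difference follows; the sample keys and values are made up for illustration.

```
object MinTimeSketch {
  def getLongOpt(map: Map[String, String], key: String): Option[Long] =
    try { map.get(key).map(_.toLong) } catch { case _: Throwable => None }

  def main(args: Array[String]): Unit = {
    val info = Map("a/ready.time" -> "1516089600000", "b/ready.time" -> "1516093200000")
    val keys = List("a/ready.time", "b/ready.time", "c/ready.time") // "c" has not reported yet

    // old behaviour: a missing key becomes -1 and always wins the min
    val oldMin = keys.map(k => getLongOpt(info, k).getOrElse(-1L)).min
    // new behaviour: missing keys are skipped before taking the min
    val newMin = keys.flatMap(k => getLongOpt(info, k)).min

    println(s"old: $oldMin, new: $newMin") // old: -1, new: 1516089600000
  }
}
```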

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
index 8b62fa4..ee99099 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
@@ -117,7 +117,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
 
   def clearInfo(): Unit = {
 //    delete("/")
-    info("clear info")
+    println("clear info")
   }
 
   def listKeys(p: String): List[String] = {
@@ -138,7 +138,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
       client.getChildren().forPath(path).asScala.toList
     } catch {
       case e: Throwable => {
-        error(s"list ${path} error: ${e.getMessage}")
+        warn(s"list ${path} warn: ${e.getMessage}")
         Nil
       }
     }
@@ -182,7 +182,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
       Some(new String(client.getData().forPath(path), "utf-8"))
     } catch {
       case e: Throwable => {
-        error(s"read ${path} error: ${e.getMessage}")
+        warn(s"read ${path} warn: ${e.getMessage}")
         None
       }
     }
@@ -201,7 +201,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
       client.checkExists().forPath(path) != null
     } catch {
       case e: Throwable => {
-        error(s"check exists ${path} error: ${e.getMessage}")
+        warn(s"check exists ${path} warn: ${e.getMessage}")
         false
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
index 326d3c8..c43ea70 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
@@ -31,4 +31,5 @@ case class DataSourceParam( @JsonProperty("name") name: String,
   def hasName: Boolean = (name != null)
   def isBaseLine: Boolean = if (baseline == null) false else baseline
   def falseBaselineClone: DataSourceParam = DataSourceParam(name, false, connectors, cache)
+  def getConnectors: List[DataConnectorParam] = if (connectors != null) connectors else Nil
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
index 6fafebf..1cf3f32 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
@@ -25,7 +25,7 @@ import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.process.{BatchDqProcess, BatchProcessType}
 import org.apache.griffin.measure.process.engine._
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.adaptor.{InternalColumns, PreProcPhase, RuleAdaptorGroup, RunPhase}
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
@@ -45,7 +45,7 @@ trait DataConnector extends Loggable with Serializable {
 
   def init(): Unit
 
-  def data(ms: Long): (Option[DataFrame], Set[Long])
+  def data(ms: Long): (Option[DataFrame], TimeRange)
 
   val dqEngines: DqEngines
 
@@ -72,11 +72,12 @@ trait DataConnector extends Loggable with Serializable {
         TableRegisters.registerRunTempTable(df, timeInfo.key, thisTable)
 
 //        val dsTmsts = Map[String, Set[Long]]((thisTable -> Set[Long](ms)))
-        val tmsts = Seq[Long](ms)
+//        val tmsts = Seq[Long](ms)
+        val dsTimeRanges = Map[String, TimeRange]((thisTable -> TimeRange(ms)))
 
         // generate rule steps
         val rulePlan = RuleAdaptorGroup.genRulePlan(
-          timeInfo, preProcRules, SparkSqlType, BatchProcessType)
+          timeInfo, preProcRules, SparkSqlType, BatchProcessType, dsTimeRanges)
 
         // run rules
         dqEngines.runRuleSteps(timeInfo, rulePlan.ruleSteps)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
index fb042c2..5a1c22c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
@@ -21,6 +21,7 @@ package org.apache.griffin.measure.data.connector.batch
 import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.result._
 import org.apache.griffin.measure.utils.HdfsUtil
 import org.apache.spark.rdd.RDD
@@ -51,7 +52,7 @@ case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
     HdfsUtil.existPath(concreteFileFullPath)
   }
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val dfOpt = try {
       val df = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
       val dfOpt = Some(df)
@@ -63,7 +64,8 @@ case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
         None
       }
     }
-    (dfOpt, readTmst(ms))
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
   }
 
 //  def available(): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
index 812d724..2c9747e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
@@ -21,6 +21,7 @@ package org.apache.griffin.measure.data.connector.batch
 import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.result._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.hive.HiveContext
@@ -60,7 +61,7 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
 //    if (arr.size > 0) Some(arr) else None
 //  }
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val dfOpt = try {
       val dtSql = dataSql
       info(dtSql)
@@ -74,7 +75,8 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
         None
       }
     }
-    (dfOpt, readTmst(ms))
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
   }
 
 //  def available(): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
index 32be963..fe8d386 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
@@ -20,6 +20,7 @@ package org.apache.griffin.measure.data.connector.batch
 
 import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.utils.HdfsUtil
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.griffin.measure.utils.ParamUtil._
@@ -46,7 +47,7 @@ case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngine
     HdfsUtil.existPath(dirPath)
   }
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val dfOpt = try {
       val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
       // touch done file for read dirs
@@ -68,7 +69,8 @@ case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngine
         None
       }
     }
-    (dfOpt, readTmst(ms))
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
   }
 
   private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
index f8d50be..f65b0d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
@@ -20,6 +20,7 @@ package org.apache.griffin.measure.data.connector.streaming
 
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.data.source.DataSourceCache
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.streaming.dstream.InputDStream
@@ -36,7 +37,7 @@ trait StreamingDataConnector extends DataConnector {
 
   def transform(rdd: RDD[(K, V)]): Option[DataFrame]
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = (None, Set.empty[Long])
+  def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
 
   var dataSourceCacheOpt: Option[DataSourceCache] = None
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
index 1918e28..fc8c646 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
@@ -23,7 +23,7 @@ import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.data.connector.batch._
 import org.apache.griffin.measure.data.connector.streaming._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.plan.TimeInfo
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -49,7 +49,7 @@ case class DataSource(sqlContext: SQLContext,
     dataConnectors.map(_.tmstCache = tmstCache)
   }
 
-  def loadData(timeInfo: TimeInfo): Set[Long] = {
+  def loadData(timeInfo: TimeInfo): TimeRange = {
     val calcTime = timeInfo.calcTime
     println(s"load data [${name}]")
     val (dfOpt, tmsts) = data(calcTime)
@@ -65,11 +65,11 @@ case class DataSource(sqlContext: SQLContext,
     tmsts
   }
 
-  private def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  private def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val batches = batchDataConnectors.flatMap { dc =>
-      val (dfOpt, tmsts) = dc.data(ms)
+      val (dfOpt, timeRange) = dc.data(ms)
       dfOpt match {
-        case Some(df) => Some((dfOpt, tmsts))
+        case Some(df) => Some((dfOpt, timeRange))
         case _ => None
       }
     }
@@ -81,10 +81,10 @@ case class DataSource(sqlContext: SQLContext,
 
     if (pairs.size > 0) {
       pairs.reduce { (a, b) =>
-        (unionDfOpts(a._1, b._1), a._2 ++ b._2)
+        (unionDfOpts(a._1, b._1), a._2.merge(b._2))
       }
     } else {
-      (None, Set.empty[Long])
+      (None, TimeRange.emptyTimeRange)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
index 9272f17..fff186f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
@@ -25,6 +25,7 @@ import org.apache.griffin.measure.cache.tmst.TmstCache
 import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -70,6 +71,14 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     }
   }
 
+//  val _WriteInfoPath = "write.info.path"
+//  val _ReadInfoPath = "read.info.path"
+//  val writeCacheInfoPath = param.getString(_WriteInfoPath, defInfoPath)
+//  val readCacheInfoPath = param.getString(_ReadInfoPath, defInfoPath)
+
+  val _ReadOnly = "read.only"
+  val readOnly = param.getBoolean(_ReadOnly, false)
+
   val rowSepLiteral = "\n"
   val partitionUnits: List[String] = List("hour", "min", "sec")
   val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last)
@@ -82,47 +91,50 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
   }
 
   def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
-    dfOpt match {
-      case Some(df) => {
-        val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-        if (newCacheLocked) {
-          try {
-            val ptns = getPartition(ms)
-            val ptnsPath = genPartitionHdfsPath(ptns)
-            val dirPath = s"${filePath}/${ptnsPath}"
-            val dataFileName = s"${ms}"
-            val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-            // transform data
-            val dataRdd: RDD[String] = df.toJSON
-
-            // save data
-//            val dumped = if (!dataRdd.isEmpty) {
-//              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
-//            } else false
-
-            if (!dataRdd.isEmpty) {
-              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+    if (!readOnly) {
+      dfOpt match {
+        case Some(df) => {
+          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (newCacheLocked) {
+            try {
+              val ptns = getPartition(ms)
+              val ptnsPath = genPartitionHdfsPath(ptns)
+              val dirPath = s"${filePath}/${ptnsPath}"
+              val dataFileName = s"${ms}"
+              val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+              // transform data
+              val dataRdd: RDD[String] = df.toJSON
+
+              // save data
+              //            val dumped = if (!dataRdd.isEmpty) {
+              //              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+              //            } else false
+
+              if (!dataRdd.isEmpty) {
+                HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+              }
+
+            } catch {
+              case e: Throwable => error(s"save data error: ${e.getMessage}")
+            } finally {
+              newCacheLock.unlock()
             }
-
-          } catch {
-            case e: Throwable => error(s"save data error: ${e.getMessage}")
-          } finally {
-            newCacheLock.unlock()
           }
         }
+        case _ => {
+          info(s"no data frame to save")
+        }
       }
-      case _ => {
-        info(s"no data frame to save")
-      }
-    }
 
-    // submit cache time and ready time
-    submitCacheTime(ms)
-    submitReadyTime(ms)
+      // submit cache time and ready time
+      submitCacheTime(ms)
+      submitReadyTime(ms)
+    }
   }
 
-  def readData(): (Option[DataFrame], Set[Long]) = {
+  // return: (data frame option, time range)
+  def readData(): (Option[DataFrame], TimeRange) = {
     val tr = TimeInfoCache.getTimeRange
     val timeRange = (tr._1 + minUnitTime, tr._2)
     submitLastProcTime(timeRange._2)
@@ -137,6 +149,7 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
 
     // list partition paths
     val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
+//    println(partitionPaths)
 
     val dfOpt = if (partitionPaths.isEmpty) {
       None
@@ -154,140 +167,152 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     // from until tmst range
     val (from, until) = (reviseTimeRange._1, reviseTimeRange._2 + 1)
     val tmstSet = rangeTmsts(from, until)
-    (dfOpt, tmstSet)
+
+    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+    (dfOpt, retTimeRange)
   }
 
   def updateData(df: DataFrame, ms: Long): Unit = {
-    val ptns = getPartition(ms)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${filePath}/${ptnsPath}"
-    val dataFileName = s"${ms}"
-    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+    if (!readOnly) {
+      val ptns = getPartition(ms)
+      val ptnsPath = genPartitionHdfsPath(ptns)
+      val dirPath = s"${filePath}/${ptnsPath}"
+      val dataFileName = s"${ms}"
+      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
 
-    try {
-      val records = df.toJSON
-      val arr = records.collect
-      val needSave = !arr.isEmpty
-
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-      println(s"remove file path: ${dirPath}/${dataFileName}")
-
-      // save updated data
-      if (needSave) {
-        HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
-        println(s"update file path: ${dataFilePath}")
-      } else {
-        clearTmst(ms)
-        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      try {
+        val records = df.toJSON
+        val arr = records.collect
+        val needSave = !arr.isEmpty
+
+        // remove out time old data
+        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+        println(s"remove file path: ${dirPath}/${dataFileName}")
+
+        // save updated data
+        if (needSave) {
+          HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
+          println(s"update file path: ${dataFilePath}")
+        } else {
+          clearTmst(ms)
+          println(s"data source [${dsName}] timestamp [${ms}] cleared")
+        }
+      } catch {
+        case e: Throwable => error(s"update data error: ${e.getMessage}")
       }
-    } catch {
-      case e: Throwable => error(s"update data error: ${e.getMessage}")
     }
   }
 
   def updateData(rdd: RDD[String], ms: Long, cnt: Long): Unit = {
-    val ptns = getPartition(ms)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${filePath}/${ptnsPath}"
-    val dataFileName = s"${ms}"
-    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+    if (!readOnly) {
+      val ptns = getPartition(ms)
+      val ptnsPath = genPartitionHdfsPath(ptns)
+      val dirPath = s"${filePath}/${ptnsPath}"
+      val dataFileName = s"${ms}"
+      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
 
-    try {
-//      val needSave = !rdd.isEmpty
-
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-      println(s"remove file path: ${dirPath}/${dataFileName}")
-
-      // save updated data
-      if (cnt > 0) {
-        HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
-        println(s"update file path: ${dataFilePath}")
-      } else {
-        clearTmst(ms)
-        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      try {
+        //      val needSave = !rdd.isEmpty
+
+        // remove out time old data
+        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+        println(s"remove file path: ${dirPath}/${dataFileName}")
+
+        // save updated data
+        if (cnt > 0) {
+          HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
+          println(s"update file path: ${dataFilePath}")
+        } else {
+          clearTmst(ms)
+          println(s"data source [${dsName}] timestamp [${ms}] cleared")
+        }
+      } catch {
+        case e: Throwable => error(s"update data error: ${e.getMessage}")
+      } finally {
+        rdd.unpersist()
       }
-    } catch {
-      case e: Throwable => error(s"update data error: ${e.getMessage}")
-    } finally {
-      rdd.unpersist()
     }
   }
 
   def updateData(arr: Iterable[String], ms: Long): Unit = {
-    val ptns = getPartition(ms)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${filePath}/${ptnsPath}"
-    val dataFileName = s"${ms}"
-    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+    if (!readOnly) {
+      val ptns = getPartition(ms)
+      val ptnsPath = genPartitionHdfsPath(ptns)
+      val dirPath = s"${filePath}/${ptnsPath}"
+      val dataFileName = s"${ms}"
+      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
 
-    try {
-      val needSave = !arr.isEmpty
-
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-      println(s"remove file path: ${dirPath}/${dataFileName}")
-
-      // save updated data
-      if (needSave) {
-        HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
-        println(s"update file path: ${dataFilePath}")
-      } else {
-        clearTmst(ms)
-        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      try {
+        val needSave = !arr.isEmpty
+
+        // remove out time old data
+        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+        println(s"remove file path: ${dirPath}/${dataFileName}")
+
+        // save updated data
+        if (needSave) {
+          HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
+          println(s"update file path: ${dataFilePath}")
+        } else {
+          clearTmst(ms)
+          println(s"data source [${dsName}] timestamp [${ms}] cleared")
+        }
+      } catch {
+        case e: Throwable => error(s"update data error: ${e.getMessage}")
       }
-    } catch {
-      case e: Throwable => error(s"update data error: ${e.getMessage}")
     }
   }
 
   def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
-    val dataMap = dfMap.map { pair =>
-      val (t, recs) = pair
-      val rdd = recs.toJSON
-//      rdd.cache
-      (t, rdd, rdd.count)
-    }
+    if (!readOnly) {
+      val dataMap = dfMap.map { pair =>
+        val (t, recs) = pair
+        val rdd = recs.toJSON
+        //      rdd.cache
+        (t, rdd, rdd.count)
+      }
 
-    dataMap.foreach { pair =>
-      val (t, arr, cnt) = pair
-      updateData(arr, t, cnt)
+      dataMap.foreach { pair =>
+        val (t, arr, cnt) = pair
+        updateData(arr, t, cnt)
+      }
     }
   }
 
   def cleanOldData(): Unit = {
-    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-    if (oldCacheLocked) {
-      try {
-        val cleanTime = readCleanTime()
-        cleanTime match {
-          case Some(ct) => {
-            println(s"data source [${dsName}] old timestamps clear until [${ct}]")
-
-            // clear out date tmsts
-            clearTmstsUntil(ct)
-
-            // drop partitions
-            val bounds = getPartition(ct)
-
-            // list partition paths
-            val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
-
-            // delete out time data path
-            earlierPaths.foreach { path =>
-              println(s"delete hdfs path: ${path}")
-              HdfsUtil.deleteHdfsPath(path)
+    if (!readOnly) {
+      val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+      if (oldCacheLocked) {
+        try {
+          val cleanTime = readCleanTime()
+          cleanTime match {
+            case Some(ct) => {
+              println(s"data source [${dsName}] old timestamps clear until [${ct}]")
+
+              // clear out date tmsts
+              clearTmstsUntil(ct)
+
+              // drop partitions
+              val bounds = getPartition(ct)
+
+              // list partition paths
+              val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
+
+              // delete out time data path
+              earlierPaths.foreach { path =>
+                println(s"delete hdfs path: ${path}")
+                HdfsUtil.deleteHdfsPath(path)
+              }
+            }
+            case _ => {
+              // do nothing
             }
           }
-          case _ => {
-            // do nothing
-          }
+        } catch {
+          case e: Throwable => error(s"clean old data error: ${e.getMessage}")
+        } finally {
+          oldCacheLock.unlock()
         }
-      } catch {
-        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-      } finally {
-        oldCacheLock.unlock()
       }
     }
   }

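Note on the DataSourceCache hunk above: it introduces a "read.only" parameter and wraps every write path (saveData, updateData, updateDataMap, cleanOldData) in an if (!readOnly) guard, so a cache consumed by another job is never mutated. Below is a stripped-down sketch of that guard pattern only; the CacheSketch class and its parameter handling are hypothetical stand-ins, not the patch's code.

object ReadOnlyCacheSketch {
  // hypothetical miniature of DataSourceCache: only the read-only gating is kept
  case class CacheSketch(param: Map[String, Any]) {
    val readOnly: Boolean = param.get("read.only") match {
      case Some(b: Boolean) => b
      case _ => false
    }

    def saveData(records: Seq[String], ms: Long): Unit = {
      if (!readOnly) {
        // in the real code: dump records under the partition path for ms,
        // then submit cache time and ready time
        println(s"write ${records.size} records for timestamp ${ms}")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    CacheSketch(Map("read.only" -> true)).saveData(Seq("r1", "r2"), 1234L) // no-op
    CacheSketch(Map.empty).saveData(Seq("r1", "r2"), 1234L)                // writes
  }
}
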
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
index 47ee368..b83e2fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
@@ -47,7 +47,7 @@ object DataSourceFactory extends Loggable {
                            ): Option[DataSource] = {
     val name = dataSourceParam.name
     val baseline = dataSourceParam.isBaseLine
-    val connectorParams = dataSourceParam.connectors
+    val connectorParams = dataSourceParam.getConnectors
     val cacheParam = dataSourceParam.cache
     val dataConnectors = connectorParams.flatMap { connectorParam =>
       DataConnectorFactory.getDataConnector(sqlContext, ssc, dqEngines, connectorParam) match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 7ed4717..950cd27 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -92,9 +92,9 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     dataSources.foreach(_.init)
 
     // init data sources
-    val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo)
+    val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
 
-    debug(s"data source timestamps: ${dsTmsts}")
+    println(s"data source timeRanges: ${dsTimeRanges}")
 
     // generate rule steps
 //    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(
@@ -103,7 +103,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
 //      CalcTimeInfo(appTime), userParam.evaluateRuleParam, dsTmsts)
 
     val rulePlan = RuleAdaptorGroup.genRulePlan(
-      calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType)
+      calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType, dsTimeRanges)
 
 //    rulePlan.ruleSteps.foreach(println)
 //    println("====")
@@ -116,11 +116,9 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
 
     // persist results
-    dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports,
-      BatchProcessType, persistFactory)
+    dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, persistFactory)
 
-    dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports,
-      BatchProcessType, persistFactory, dataSources)
+    dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, persistFactory, dataSources)
 //    dfs.foreach(_._2.cache())
 //
 //    dqEngines.persistAllRecords(dfs, persistFactory)

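Note on the BatchDqProcess hunk above: the loaded data source timestamps are now a Map[String, TimeRange] (dsTimeRanges) that gets threaded into RuleAdaptorGroup.genRulePlan, while persistAllMetrics / persistAllRecords no longer take a ProcessType. A small sketch of the shape dqEngines.loadData produces under the new signature; the source names and timestamp values here are made up.

import org.apache.griffin.measure.process.temp.TimeRange

object DsTimeRangesSketch {
  def main(args: Array[String]): Unit = {
    // shape of the value returned by dqEngines.loadData(dataSources, calcTimeInfo)
    val dsTimeRanges: Map[String, TimeRange] = Map(
      "source" -> TimeRange(1516000000000L, 1516000060000L, Set(1516000000000L, 1516000060000L)),
      "target" -> TimeRange(1516000000000L, 1516000030000L, Set(1516000000000L))
    )
    println(s"data source timeRanges: ${dsTimeRanges}")
  }
}
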
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala b/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
new file mode 100644
index 0000000..42aa92b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+sealed trait ExportMode {}
+
+object ExportMode {
+  def defaultMode(procType: ProcessType): ExportMode = {
+    procType match {
+      case BatchProcessType => SimpleMode
+      case StreamingProcessType => TimestampMode
+    }
+  }
+}
+
+final case object SimpleMode extends ExportMode {}
+
+final case object TimestampMode extends ExportMode {}
\ No newline at end of file

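Note on the new ExportMode file above: it is a sealed trait with two cases, SimpleMode (results keyed once by the default timestamp, the default for batch) and TimestampMode (results grouped per tmst, the default for streaming), and ExportMode.defaultMode maps a ProcessType to one of them. A short usage sketch; the describe helper is only for illustration.

import org.apache.griffin.measure.process._

object ExportModeSketch {
  def main(args: Array[String]): Unit = {
    val batchMode = ExportMode.defaultMode(BatchProcessType)         // SimpleMode
    val streamingMode = ExportMode.defaultMode(StreamingProcessType) // TimestampMode

    // the engines later branch on the export mode instead of the process type
    def describe(mode: ExportMode): String = mode match {
      case SimpleMode => "export once, keyed by the default timestamp"
      case TimestampMode => "export per tmst group"
    }
    println(describe(batchMode))
    println(describe(streamingMode))
  }
}
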
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index 39444cd..fcf9528 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngines
 import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
-import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.rule.adaptor.{ProcessDetailsKeys, RuleAdaptorGroup, RunPhase}
 import org.apache.griffin.measure.rule.plan._
 import org.apache.spark.sql.SQLContext
 
@@ -58,21 +58,25 @@ case class StreamingDqThread(sqlContext: SQLContext,
         TimeInfoCache.startTimeInfoCache
 
         // init data sources
-        val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo)
+        val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
 
-        println(s"data sources timestamps: ${dsTmsts}")
+        println(s"data source timeRanges: ${dsTimeRanges}")
 
         // generate rule steps
 //        val ruleSteps = RuleAdaptorGroup.genRuleSteps(
 //          CalcTimeInfo(st), evaluateRuleParam, dsTmsts)
         val rulePlan = RuleAdaptorGroup.genRulePlan(
-          calcTimeInfo, evaluateRuleParam, StreamingProcessType)
+          calcTimeInfo, evaluateRuleParam, StreamingProcessType, dsTimeRanges)
+
+        // optimize rule plan
+//        val optRulePlan = optimizeRulePlan(rulePlan, dsTmsts)
+        val optRulePlan = rulePlan
 
 //        ruleSteps.foreach(println)
 
         // run rules
 //        dqEngines.runRuleSteps(ruleSteps)
-        dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
+        dqEngines.runRuleSteps(calcTimeInfo, optRulePlan.ruleSteps)
 
         val ct = new Date().getTime
         val calculationTimeStr = s"calculation using time: ${ct - st} ms"
@@ -81,8 +85,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
 
         // persist results
 //        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
-        dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports,
-          StreamingProcessType, persistFactory)
+        dqEngines.persistAllMetrics(calcTimeInfo, optRulePlan.metricExports, persistFactory)
 //        println(s"--- timeGroups: ${timeGroups}")
 
         val rt = new Date().getTime
@@ -90,8 +93,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
         appPersist.log(rt, persistResultTimeStr)
 
         // persist records
-        dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports,
-          StreamingProcessType, persistFactory, dataSources)
+        dqEngines.persistAllRecords(calcTimeInfo, optRulePlan.recordExports, persistFactory, dataSources)
 
         val et = new Date().getTime
         val persistTimeStr = s"persist records using time: ${et - rt} ms"
@@ -167,54 +169,29 @@ case class StreamingDqThread(sqlContext: SQLContext,
     }
   }
 
-//  // calculate accuracy between source data and target data
-//  private def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-//               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-//               ruleAnalyzer: RuleAnalyzer) = {
-//    // 1. cogroup
-//    val allKvs = sourceData.cogroup(targetData)
-//
-//    // 2. accuracy calculation
-//    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-//
-//    (accuResult, missingRdd, matchedRdd)
-//  }
-//
-//  private def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
-//                      ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
-//    rdd.flatMap { row =>
-//      val (key, (value, info)) = row
-//      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
-//        case Some(t: Long) => Some((t, row))
-//        case _ => None
-//      }
-//      b
-//    }
-//  }
-//
-//  // convert data into a string
-//  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), dataPersist: Iterable[Expr], infoPersist: Iterable[Expr]): String = {
-//    val (key, (data, info)) = rec
-//    val persistData = getPersistMap(data, dataPersist)
-//    val persistInfo = info.mapValues { value =>
-//      value match {
-//        case vd: Map[String, Any] => getPersistMap(vd, infoPersist)
-//        case v => v
-//      }
-//    }.map(identity)
-//    s"${persistData} [${persistInfo}]"
-//  }
-//
-//  // get the expr value map of the persist expressions
-//  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-//    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-//    data.flatMap { pair =>
-//      val (k, v) = pair
-//      persistMap.get(k) match {
-//        case Some(d) => Some((d -> v))
-//        case _ => None
-//      }
-//    }
-//  }
+  private def optimizeRulePlan(rulePlan: RulePlan, dsTmsts: Map[String, Set[Long]]): RulePlan = {
+    val steps = rulePlan.ruleSteps
+    val optExports = rulePlan.ruleExports.flatMap { export =>
+      findRuleStepByName(steps, export.stepName).map { rs =>
+        rs.details.get(ProcessDetailsKeys._baselineDataSource) match {
+          case Some(dsname: String) => {
+            val defTmstOpt = (dsTmsts.get(dsname)).flatMap { set =>
+              try { Some(set.max) } catch { case _: Throwable => None }
+            }
+            defTmstOpt match {
+              case Some(t) => export.setDefTimestamp(t)
+              case _ => export
+            }
+          }
+          case _ => export
+        }
+      }
+    }
+    RulePlan(steps, optExports)
+  }
+
+  private def findRuleStepByName(steps: Seq[RuleStep], name: String): Option[RuleStep] = {
+    steps.filter(_.name == name).headOption
+  }
 
 }

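Note on the StreamingDqThread hunk above: optimizeRulePlan (currently bypassed, since optRulePlan = rulePlan) would reset each export's default timestamp to the latest tmst of its baseline data source. The core of that lookup is taking the max of a possibly empty timestamp set; here is a minimal sketch of just that step, where the latestTmst helper name is hypothetical.

object DefaultTmstSketch {
  // pick the newest timestamp of a data source, if it produced any
  def latestTmst(dsTmsts: Map[String, Set[Long]], dsName: String): Option[Long] =
    dsTmsts.get(dsName).flatMap { set =>
      if (set.nonEmpty) Some(set.max) else None
    }

  def main(args: Array[String]): Unit = {
    val dsTmsts = Map("source" -> Set(1000L, 2000L, 3000L), "target" -> Set.empty[Long])
    println(latestTmst(dsTmsts, "source"))  // Some(3000)
    println(latestTmst(dsTmsts, "target"))  // None
    println(latestTmst(dsTmsts, "missing")) // None
  }
}
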
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index a48c4d1..00c6ef4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -34,16 +34,14 @@ trait DqEngine extends Loggable with Serializable {
 
   protected def collectable(): Boolean = false
 
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
-                    ): Map[Long, Map[String, Any]]
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport): Map[Long, Map[String, Any]]
 
   //  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
   //
   //  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
 
 //  def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame]
-  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
-                    ): Map[Long, DataFrame]
+//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame]
 
 
   def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]]

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 03ee208..2163925 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -24,7 +24,8 @@ import org.apache.griffin.measure.config.params.user.DataSourceParam
 import org.apache.griffin.measure.data.source._
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
+import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
@@ -34,14 +35,14 @@ import org.apache.spark.sql.{DataFrame, Row}
 
 import scala.concurrent._
 import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 import ExecutionContext.Implicits.global
 
 case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 
   val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType)
 
-  def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, Set[Long]] = {
+  def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, TimeRange] = {
     dataSources.map { ds =>
       (ds.name, ds.loadData(timeInfo))
     }.toMap
@@ -53,12 +54,11 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     }
   }
 
-  def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport],
-                        procType: ProcessType, persistFactory: PersistFactory
+  def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport], persistFactory: PersistFactory
                        ): Unit = {
     val allMetrics: Map[Long, Map[String, Any]] = {
-      metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, step) =>
-        val metrics = collectMetrics(timeInfo, step, procType)
+      metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, metricExport) =>
+        val metrics = collectMetrics(timeInfo, metricExport)
         metrics.foldLeft(ret) { (total, pair) =>
           val (k, v) = pair
           total.get(k) match {
@@ -112,7 +112,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     Await.result(pro.future, Duration.Inf)
   }
 
-  def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport], procType: ProcessType,
+  def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport],
                         persistFactory: PersistFactory, dataSources: Seq[DataSource]
                        ): Unit = {
     // method 1: multi thread persist multi data frame
@@ -124,13 +124,13 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     // method 2: multi thread persist multi iterable
     recordExports.foreach { recordExport =>
 //      val records = collectRecords(timeInfo, recordExport, procType)
-      procType match {
-        case BatchProcessType => {
+      recordExport.mode match {
+        case SimpleMode => {
           collectBatchRecords(recordExport).foreach { rdd =>
             persistCollectedBatchRecords(timeInfo, recordExport, rdd, persistFactory)
           }
         }
-        case StreamingProcessType => {
+        case TimestampMode => {
           val (rddOpt, emptySet) = collectStreamingRecords(recordExport)
           persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources)
 //          collectStreamingRecords(recordExport).foreach { rddPair =>
@@ -282,21 +282,20 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 //      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
 //    }.headOption
 //  }
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
                     ): Map[Long, Map[String, Any]] = {
     val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport, procType)
+      if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport)
     }
     ret
   }
 
-  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
-                    ): Map[Long, DataFrame] = {
-    val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport, procType)
-    }
-    ret
-  }
+//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] = {
+//    val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) =>
+//      if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport)
+//    }
+//    ret
+//  }
 
   def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame] = {
 //    engines.flatMap { engine =>

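Note on the DqEngines hunk above: persistAllRecords now dispatches on recordExport.mode rather than on the process type, sending SimpleMode through the batch collect/persist path and TimestampMode through the streaming one. A condensed sketch of that branch; the persistBatch / persistStreaming functions are hypothetical placeholders for the real collect and persist calls.

import org.apache.griffin.measure.process.{ExportMode, SimpleMode, TimestampMode}

object RecordExportDispatchSketch {
  // stand-ins for the real collect/persist calls in DqEngines
  def persistBatch(): Unit = println("collectBatchRecords -> persistCollectedBatchRecords")
  def persistStreaming(): Unit = println("collectStreamingRecords -> persistCollectedStreamingRecords")

  def persistRecords(mode: ExportMode): Unit = mode match {
    case SimpleMode => persistBatch()
    case TimestampMode => persistStreaming()
  }

  def main(args: Array[String]): Unit = {
    persistRecords(SimpleMode)
    persistRecords(TimestampMode)
  }
}
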
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index f1e12d2..3bcecdb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.process.engine
 
 import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
@@ -68,20 +68,20 @@ trait SparkDqEngine extends DqEngine {
     }
   }
 
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
                     ): Map[Long, Map[String, Any]] = {
     if (collectable) {
-      val MetricExport(name, stepName, collectType) = metricExport
+      val MetricExport(name, stepName, collectType, defTmst, mode) = metricExport
       try {
-        val metricMaps = getMetricMaps(stepName)
-        procType match {
-          case BatchProcessType => {
+        val metricMaps: Seq[Map[String, Any]] = getMetricMaps(stepName)
+        mode match {
+          case SimpleMode => {
             val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType)
-            emptyMetricMap + (timeInfo.calcTime -> metrics)
+            emptyMetricMap + (defTmst -> metrics)
           }
-          case StreamingProcessType => {
+          case TimestampMode => {
             val tmstMetrics = metricMaps.map { metric =>
-              val tmst = metric.getLong(InternalColumns.tmst, timeInfo.calcTime)
+              val tmst = metric.getLong(InternalColumns.tmst, defTmst)
               val pureMetric = metric.removeKeys(InternalColumns.columns)
               (tmst, pureMetric)
             }
@@ -103,44 +103,53 @@ trait SparkDqEngine extends DqEngine {
   }
 
 
-  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
-                    ): Map[Long, DataFrame] = {
-    if (collectable) {
-      val RecordExport(_, stepName, _, originDFOpt) = recordExport
-      val stepDf = sqlContext.table(s"`${stepName}`")
-      val recordsDf = originDFOpt match {
-        case Some(originName) => sqlContext.table(s"`${originName}`")
-        case _ => stepDf
-      }
-
-      procType match {
-        case BatchProcessType => {
-          val recordsDf = sqlContext.table(s"`${stepName}`")
-          emptyRecordMap + (timeInfo.calcTime -> recordsDf)
-        }
-        case StreamingProcessType => {
-          originDFOpt match {
-            case Some(originName) => {
-              val recordsDf = sqlContext.table(s"`${originName}`")
-              stepDf.collect.map { row =>
-                val tmst = row.getAs[Long](InternalColumns.tmst)
-                val trdf = recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}")
-                (tmst, trdf)
-              }.toMap
-            }
-            case _ => {
-              val recordsDf = sqlContext.table(s"`${stepName}`")
-              emptyRecordMap + (timeInfo.calcTime -> recordsDf)
-            }
-          }
-        }
-      }
-    } else emptyRecordMap
+  private def getTmst(row: Row, defTmst: Long): Long = {
+    try {
+      row.getAs[Long](InternalColumns.tmst)
+    } catch {
+      case _: Throwable => defTmst
+    }
   }
 
+//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] = {
+//    if (collectable) {
+//      val RecordExport(_, stepName, _, originDFOpt, defTmst, procType) = recordExport
+//      val stepDf = sqlContext.table(s"`${stepName}`")
+//      val recordsDf = originDFOpt match {
+//        case Some(originName) => sqlContext.table(s"`${originName}`")
+//        case _ => stepDf
+//      }
+//
+//      procType match {
+//        case BatchProcessType => {
+//          val recordsDf = sqlContext.table(s"`${stepName}`")
+//          emptyRecordMap + (defTmst -> recordsDf)
+//        }
+//        case StreamingProcessType => {
+//          originDFOpt match {
+//            case Some(originName) => {
+//              val recordsDf = sqlContext.table(s"`${originName}`")
+//              stepDf.map { row =>
+//                val tmst = getTmst(row, defTmst)
+//                val trdf = if (recordsDf.columns.contains(InternalColumns.tmst)) {
+//                  recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}")
+//                } else recordsDf
+//                (tmst, trdf)
+//              }.collect.toMap
+//            }
+//            case _ => {
+//              val recordsDf = stepDf
+//              emptyRecordMap + (defTmst -> recordsDf)
+//            }
+//          }
+//        }
+//      }
+//    } else emptyRecordMap
+//  }
+
   private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = {
     if (collectable) {
-      val RecordExport(_, stepName, _, _) = recordExport
+      val RecordExport(_, stepName, _, _, defTmst, procType) = recordExport
       val stepDf = sqlContext.table(s"`${stepName}`")
       Some(stepDf)
     } else None
@@ -151,14 +160,14 @@ trait SparkDqEngine extends DqEngine {
   }
 
   def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
-    val RecordExport(_, _, _, originDFOpt) = recordExport
+    val RecordExport(_, _, _, originDFOpt, defTmst, procType) = recordExport
     getRecordDataFrame(recordExport) match {
       case Some(stepDf) => {
         originDFOpt match {
           case Some(originName) => {
             val tmsts = (stepDf.collect.flatMap { row =>
               try {
-                val tmst = row.getAs[Long](InternalColumns.tmst)
+                val tmst = getTmst(row, defTmst)
                 val empty = row.getAs[Boolean](InternalColumns.empty)
                 Some((tmst, empty))
               } catch {
@@ -170,7 +179,7 @@ trait SparkDqEngine extends DqEngine {
             if (recordTmsts.size > 0) {
               val recordsDf = sqlContext.table(s"`${originName}`")
               val records = recordsDf.flatMap { row =>
-                val tmst = row.getAs[Long](InternalColumns.tmst)
+                val tmst = getTmst(row, defTmst)
                 if (recordTmsts.contains(tmst)) {
                   try {
                     val map = SparkRowFormatter.formatRow(row)
@@ -186,7 +195,7 @@ trait SparkDqEngine extends DqEngine {
           }
           case _ => {
             val records = stepDf.flatMap { row =>
-              val tmst = row.getAs[Long](InternalColumns.tmst)
+              val tmst = getTmst(row, defTmst)
               try {
                 val map = SparkRowFormatter.formatRow(row)
                 val str = JsonUtil.toJson(map)

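Note on the SparkDqEngine hunk above: collectMetrics now keys results by the export's default timestamp in SimpleMode, and in TimestampMode groups metric maps by their tmst column, falling back to defTmst when that column is absent (the same fallback the new getTmst helper applies to rows). Below is a plain-Scala sketch of the TimestampMode grouping over already-extracted metric maps, with no Spark involved; the object name, tmstKey constant, and sample maps are illustrative only.

object TmstMetricGroupSketch {
  val tmstKey = "__tmst" // stands in for InternalColumns.tmst

  // same fallback idea as getTmst(row, defTmst), applied to a metric map
  def tmstOf(metric: Map[String, Any], defTmst: Long): Long =
    metric.get(tmstKey) match {
      case Some(t: Long) => t
      case _ => defTmst
    }

  def main(args: Array[String]): Unit = {
    val defTmst = 999L
    val metricMaps: Seq[Map[String, Any]] = Seq(
      Map(tmstKey -> 1000L, "total" -> 10, "miss" -> 1),
      Map(tmstKey -> 2000L, "total" -> 12, "miss" -> 0),
      Map("total" -> 7, "miss" -> 2) // no tmst column -> keyed by defTmst
    )
    // group by tmst and drop the internal column, mirroring the TimestampMode branch
    val grouped: Map[Long, Seq[Map[String, Any]]] =
      metricMaps.groupBy(m => tmstOf(m, defTmst)).map { case (t, ms) => (t, ms.map(_ - tmstKey)) }
    grouped.foreach(println)
  }
}
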
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 9de7955..dcb02f6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -47,6 +47,9 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
             }
           } else sqlContext.sql(rule)
 
+//          println(name)
+//          rdf.show(10)
+
           if (rs.isGlobal) {
             if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)
             TableRegisters.registerRunGlobalTable(rdf, name)