Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/08 09:44:43 UTC

[6/6] incubator-griffin git commit: Measure module enhancement

Measure module enhancement

1. Fully support user-input Spark SQL as rules.
2. Add uniqueness and timeliness measurements.
3. Enhance calculation and persistence performance in streaming mode.
4. Add MongoDB metric persistence.
5. Use the Spark SQL engine as the calculation engine.
6. Refactor the measure module code.
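
A minimal sketch (not code from this commit; names are illustrative) of the idea behind items 1 and 5 above: the data source is registered as a temp table and the user-supplied SQL string is handed straight to the Spark SQL engine, so any valid Spark SQL statement can serve as a rule.

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def runSqlRule(sqlContext: SQLContext, source: DataFrame, userRule: String): DataFrame = {
      // expose the data source under a fixed table name (hypothetical: "source")
      source.registerTempTable("source")
      // e.g. userRule = "SELECT COUNT(*) AS total FROM source"
      sqlContext.sql(userRule)
    }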

Author: Lionel Liu <bh...@163.com>

Closes #184 from bhlx3lyx7/tmst-merge.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/9c586ee6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/9c586ee6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/9c586ee6

Branch: refs/heads/master
Commit: 9c586ee6800fe7d83e50125881a55f7f11c3265b
Parents: a714b60
Author: Lionel Liu <bh...@163.com>
Authored: Mon Jan 8 17:44:24 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Jan 8 17:44:24 2018 +0800

----------------------------------------------------------------------
 .gitignore                                      |    2 +-
 .../docker/svc_msr/docker-compose-streaming.yml |    1 +
 measure/pom.xml                                 |    8 +
 .../apache/griffin/measure/Application.scala    |    9 +-
 .../measure/cache/info/TimeInfoCache.scala      |    2 +-
 .../cache/result/CacheResultProcesser.scala     |    2 +-
 .../griffin/measure/cache/tmst/TempName.scala   |   47 +
 .../griffin/measure/cache/tmst/TmstCache.scala  |   46 +
 .../config/params/user/DataSourceParam.scala    |    5 +-
 .../measure/config/params/user/UserParam.scala  |   35 +-
 .../config/validator/AllParamValidator.scala    |   34 -
 .../config/validator/ParamValidator.scala       |    8 +-
 .../measure/data/connector/DataConnector.scala  |   77 +-
 .../batch/AvroBatchDataConnector.scala          |    5 +-
 .../batch/HiveBatchDataConnector.scala          |    5 +-
 .../batch/TextDirBatchDataConnector.scala       |    5 +-
 .../streaming/StreamingDataConnector.scala      |    2 +-
 .../measure/data/source/DataSource.scala        |   66 +-
 .../measure/data/source/DataSourceCache.scala   |   69 +-
 .../measure/data/source/DataSourceFactory.scala |   41 +-
 .../griffin/measure/persist/HdfsPersist.scala   |  128 +-
 .../griffin/measure/persist/HttpPersist.scala   |    3 +-
 .../griffin/measure/persist/LoggerPersist.scala |   53 +-
 .../griffin/measure/persist/MongoPersist.scala  |  119 ++
 .../measure/persist/MongoThreadPool.scala       |   73 +
 .../griffin/measure/persist/MultiPersists.scala |   19 +-
 .../griffin/measure/persist/Persist.scala       |    3 +-
 .../measure/persist/PersistFactory.scala        |    2 +
 .../measure/persist/PersistThreadPool.scala     |    2 +-
 .../measure/process/BatchDqProcess.scala        |  110 +-
 .../griffin/measure/process/DqProcess.scala     |    2 +-
 .../measure/process/StreamingDqProcess.scala    |   32 +-
 .../measure/process/StreamingDqThread.scala     |  101 +-
 .../measure/process/check/DataChecker.scala     |   29 -
 .../process/engine/DataFrameOprEngine.scala     |  105 +-
 .../measure/process/engine/DqEngine.scala       |   26 +-
 .../measure/process/engine/DqEngines.scala      |  349 ++++-
 .../measure/process/engine/SparkDqEngine.scala  |  350 ++++-
 .../measure/process/engine/SparkSqlEngine.scala |   25 +-
 .../measure/process/temp/DataFrameCaches.scala  |  115 ++
 .../measure/process/temp/TableRegisters.scala   |  153 ++
 .../measure/process/temp/TableRegs.scala        |   81 +
 .../griffin/measure/result/AccuracyResult.scala |    6 +
 .../griffin/measure/result/ProfileResult.scala  |    4 +
 .../apache/griffin/measure/result/Result.scala  |    4 +
 .../rule/adaptor/DataFrameOprAdaptor.scala      |   45 +-
 .../rule/adaptor/GriffinDslAdaptor.scala        | 1415 ++++++++++++++----
 .../measure/rule/adaptor/InternalColumns.scala  |   31 +
 .../measure/rule/adaptor/RuleAdaptor.scala      |  157 +-
 .../measure/rule/adaptor/RuleAdaptorGroup.scala |  259 +++-
 .../measure/rule/adaptor/SparkSqlAdaptor.scala  |   50 +-
 .../griffin/measure/rule/dsl/CollectType.scala  |   57 +
 .../griffin/measure/rule/dsl/DqType.scala       |   11 +-
 .../griffin/measure/rule/dsl/DslType.scala      |    9 +-
 .../griffin/measure/rule/dsl/PersistType.scala  |    2 +
 .../rule/dsl/analyzer/BasicAnalyzer.scala       |    4 +-
 .../rule/dsl/analyzer/DuplicateAnalyzer.scala   |   46 +
 .../rule/dsl/analyzer/ProfilingAnalyzer.scala   |   22 +-
 .../rule/dsl/analyzer/TimelinessAnalyzer.scala  |   65 +
 .../rule/dsl/expr/ClauseExpression.scala        |   66 +-
 .../griffin/measure/rule/dsl/expr/Expr.scala    |    3 +
 .../rule/dsl/expr/ExtraConditionExpr.scala      |   27 +
 .../measure/rule/dsl/expr/FunctionExpr.scala    |   16 +-
 .../measure/rule/dsl/expr/LogicalExpr.scala     |   34 +
 .../measure/rule/dsl/expr/MathExpr.scala        |   14 +
 .../measure/rule/dsl/expr/SelectExpr.scala      |   39 +-
 .../measure/rule/dsl/parser/BasicParser.scala   |   25 +-
 .../rule/dsl/parser/GriffinDslParser.scala      |   18 +
 .../griffin/measure/rule/plan/DfOprStep.scala   |   32 +
 .../measure/rule/plan/MetricExport.scala        |   28 +
 .../measure/rule/plan/RecordExport.scala        |   27 +
 .../griffin/measure/rule/plan/RuleExport.scala  |   27 +
 .../griffin/measure/rule/plan/RulePlan.scala    |   54 +
 .../griffin/measure/rule/plan/RuleStep.scala    |   40 +
 .../measure/rule/plan/SparkSqlStep.scala        |   32 +
 .../griffin/measure/rule/plan/TimeInfo.scala    |   37 +
 .../measure/rule/step/ConcreteRuleStep.scala    |   37 -
 .../griffin/measure/rule/step/DfOprStep.scala   |   29 -
 .../measure/rule/step/GriffinDslStep.scala      |   28 -
 .../griffin/measure/rule/step/RuleStep.scala    |   31 -
 .../measure/rule/step/SparkSqlStep.scala        |   30 -
 .../griffin/measure/rule/udf/GriffinUdfs.scala  |    5 +
 .../measure/utils/HdfsFileDumpUtil.scala        |   40 +-
 .../apache/griffin/measure/utils/HdfsUtil.scala |    6 +-
 .../griffin/measure/utils/ParamUtil.scala       |   36 +
 .../apache/griffin/measure/utils/TimeUtil.scala |    6 +-
 .../resources/_accuracy-batch-griffindsl.json   |   63 +
 .../resources/_accuracy-batch-sparksql.json     |   63 +
 .../_accuracy-streaming-griffindsl.json         |  117 ++
 .../resources/_accuracy-streaming-sparksql.json |  142 ++
 .../resources/_duplicate-batch-griffindsl.json  |   56 +
 .../_duplicate-streaming-griffindsl.json        |  116 ++
 .../_duplicate-streaming-sparksql.json          |  130 ++
 .../resources/_profiling-batch-griffindsl.json  |   46 +
 .../resources/_profiling-batch-sparksql.json    |   44 +
 .../_profiling-streaming-griffindsl.json        |   74 +
 .../_profiling-streaming-sparksql.json          |   80 +
 .../resources/_timeliness-batch-griffindsl.json |   42 +
 .../resources/_timeliness-batch-sparksql.json   |   52 +
 .../_timeliness-streaming-griffindsl.json       |   72 +
 .../_timeliness-streaming-sparksql.json         |   82 +
 .../resources/config-test-accuracy-new.json     |   56 +
 .../resources/config-test-accuracy-new2.json    |   72 +
 .../config-test-accuracy-streaming-new.json     |  117 ++
 .../config-test-accuracy-streaming-new2.json    |  133 ++
 .../test/resources/config-test-accuracy2.json   |   64 +
 .../resources/config-test-profiling-new.json    |   80 +
 .../resources/config-test-profiling-new2.json   |   36 +
 .../config-test-profiling-streaming-new.json    |   85 ++
 .../config-test-profiling-streaming-new2.json   |   72 +
 .../config-test-profiling-streaming.json        |   25 +-
 .../test/resources/config-test-profiling1.json  |   60 +
 .../test/resources/config-test-profiling2.json  |   35 +
 measure/src/test/resources/env-hdfs-test.json   |   45 +
 .../src/test/resources/env-streaming-mongo.json |   54 +
 measure/src/test/resources/env-test.json        |    2 +-
 .../resources/performance-test-accuracy.json    |   56 +
 .../resources/performance-test-profiling.json   |   34 +
 measure/src/test/resources/timeliness_data.avro |  Bin 0 -> 409 bytes
 .../validator/AllParamValidatorTest.scala       |   14 +-
 .../measure/persist/MongoPersistTest.scala      |   47 +
 .../rule/adaptor/GriffinDslAdaptorTest.scala    |  171 ++-
 .../rule/adaptor/RuleAdaptorGroupTest.scala     |   70 +
 .../rule/adaptor/SparkSqlAdaptorTest.scala      |   59 +
 .../rule/dsl/parser/BasicParserTest.scala       |   44 +-
 125 files changed, 6773 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 405d693..58525d9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,4 +37,4 @@ ui/tmp
 derby.log
 metastore_db
 
-measure/src/test/scala/org/apache/griffin/measure/process/*
+measure/src/test/test_scala/*

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
----------------------------------------------------------------------
diff --git a/griffin-doc/docker/svc_msr/docker-compose-streaming.yml b/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
index 9fde137..8c22b64 100644
--- a/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
+++ b/griffin-doc/docker/svc_msr/docker-compose-streaming.yml
@@ -42,6 +42,7 @@ es:
   hostname: es
   ports:
     - 39200:9200
+    - 39300:9300
   container_name: es
 
 zk:

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index 69899dd..a0ff838 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -45,6 +45,7 @@ under the License.
     <avro.version>1.7.7</avro.version>
     <jackson.version>2.8.7</jackson.version>
     <scalaj.version>2.3.0</scalaj.version>
+    <mongo.version>2.1.0</mongo.version>
     <junit.version>4.11</junit.version>
     <scalatest.version>3.0.0</scalatest.version>
     <slf4j.version>1.7.21</slf4j.version>
@@ -105,6 +106,13 @@ under the License.
       <version>${scalaj.version}</version>
     </dependency>
 
+    <!--mongo request-->
+    <dependency>
+      <groupId>org.mongodb.scala</groupId>
+      <artifactId>mongo-scala-driver_2.11</artifactId>
+      <version>${mongo.version}</version>
+    </dependency>
+
     <!--avro-->
     <dependency>
       <groupId>com.databricks</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index edbb552..43781f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -22,7 +22,7 @@ import org.apache.griffin.measure.config.params._
 import org.apache.griffin.measure.config.params.env._
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.config.reader._
-import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.config.validator._
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.PersistThreadPool
 import org.apache.griffin.measure.process._
@@ -68,7 +68,7 @@ object Application extends Loggable {
     val allParam: AllParam = AllParam(envParam, userParam)
 
     // validate param files
-    validateParams(allParam) match {
+    ParamValidator.validate(allParam) match {
       case Failure(ex) => {
         error(ex.getMessage)
         sys.exit(-3)
@@ -171,11 +171,6 @@ object Application extends Loggable {
     paramReader.readConfig[T]
   }
 
-  private def validateParams(allParam: AllParam): Try[Boolean] = {
-    val allParamValidator = AllParamValidator()
-    allParamValidator.validate(allParam)
-  }
-
   private def shutdown(): Unit = {
     PersistThreadPool.shutdown
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index b581a58..85dfe62 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -93,7 +93,7 @@ object TimeInfoCache extends Loggable with Serializable {
     val map = InfoCacheInstance.readInfo(List(finalLastProcTime, finalReadyTime))
     val lastProcTime = getLong(map, finalLastProcTime)
     val curReadyTime = getLong(map, finalReadyTime)
-    (lastProcTime + 1, curReadyTime)
+    (lastProcTime, curReadyTime)
   }
 
   private def readCleanTime(): Long = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
index 9916e92..0511c04 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.{Map => MutableMap}
 
 object CacheResultProcesser extends Loggable {
 
-  val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()
+  private val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()
 
   def genUpdateCacheResult(timeGroup: Long, updateTime: Long, result: Result): Option[CacheResult] = {
     cacheGroup.get(timeGroup) match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala
new file mode 100644
index 0000000..7a570ec
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TempName.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.tmst
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.rule.plan.TimeInfo
+
+object TempName extends Loggable {
+
+  def tmstName(name: String, ms: Long) = {
+    s"${name}_${ms}"
+  }
+
+  //-- temp df name --
+//  private val tmstNameRegex = """^(.*)\((\d*)\)\[(\d*)\]$""".r
+//  private val tmstNameRegex = """^(.*)_(\d*)_(\d*)$""".r
+//  def tmstName(name: String, timeInfo: TimeInfo) = {
+//    val calcTime = timeInfo.calcTime
+//    val tmst = timeInfo.tmst
+//    s"${name}_${calcTime}_${tmst}"
+//  }
+//  def extractTmstName(tmstName: String): (String, Option[Long], Option[Long]) = {
+//    tmstName match {
+//      case tmstNameRegex(name, calcTime, tmst) => {
+//        try { (name, Some(calcTime.toLong), Some(tmst.toLong)) } catch { case e: Throwable => (tmstName, None, None) }
+//      }
+//      case _ => (tmstName, None, None)
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala
new file mode 100644
index 0000000..f031449
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/tmst/TmstCache.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.cache.tmst
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.log.Loggable
+
+import scala.collection.mutable.{SortedSet => MutableSortedSet}
+
+
+case class TmstCache() extends Loggable {
+
+  private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
+
+  //-- insert tmst into tmst group --
+  def insert(tmst: Long) = tmstGroup += tmst
+  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+
+  //-- remove tmst from tmst group --
+  def remove(tmst: Long) = tmstGroup -= tmst
+  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+
+  //-- get subset of tmst group --
+  def range(from: Long, until: Long) = tmstGroup.range(from, until).toSet
+  def until(until: Long) = tmstGroup.until(until).toSet
+  def from(from: Long) = tmstGroup.from(from).toSet
+  def all = tmstGroup.toSet
+
+}
\ No newline at end of file
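
A short usage sketch of the new TmstCache above (assuming the underlying scala.collection.mutable.SortedSet semantics: `range` and `until` treat the upper bound as exclusive):

    import org.apache.griffin.measure.cache.tmst.TmstCache

    val cache = TmstCache()
    cache.insert(Seq(1000L, 2000L, 3000L))
    cache.range(1000L, 2001L)   // Set(1000, 2000) -- upper bound exclusive
    cache.until(2000L)          // Set(1000)
    cache.remove(1000L)
    cache.all                   // Set(2000, 3000)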

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
index b638234..326d3c8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
@@ -24,8 +24,11 @@ import org.apache.griffin.measure.config.params.Param
 
 @JsonInclude(Include.NON_NULL)
 case class DataSourceParam( @JsonProperty("name") name: String,
+                            @JsonProperty("baseline") baseline: Boolean,
                             @JsonProperty("connectors") connectors: List[DataConnectorParam],
                             @JsonProperty("cache") cache: Map[String, Any]
                           ) extends Param {
-
+  def hasName: Boolean = (name != null)
+  def isBaseLine: Boolean = if (baseline == null) false else baseline
+  def falseBaselineClone: DataSourceParam = DataSourceParam(name, false, connectors, cache)
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
index 173f8f4..7d8b0af 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
@@ -26,8 +26,39 @@ import org.apache.griffin.measure.config.params.Param
 case class UserParam( @JsonProperty("name") name: String,
                       @JsonProperty("timestamp") timestamp: Long,
                       @JsonProperty("process.type") procType: String,
-                      @JsonProperty("data.sources") dataSources: List[DataSourceParam],
-                      @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
+                      @JsonProperty("data.sources") dataSourceParams: List[DataSourceParam],
+                      @JsonProperty("evaluate.rule") evaluateRuleParam: EvaluateRuleParam
                     ) extends Param {
 
+  private val validDs = {
+    val (validDsParams, _) = dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) =>
+      val (seq, names) = ret
+      if (dsParam.hasName && !names.contains(dsParam.name)) {
+        (seq :+ dsParam, names + dsParam.name)
+      } else ret
+    }
+    validDsParams
+  }
+  private val baselineDsOpt = {
+    val baselines = validDs.filter(_.isBaseLine)
+    if (baselines.size > 0) baselines.headOption
+    else validDs.headOption
+  }
+
+  val baselineDsName = baselineDsOpt match {
+    case Some(ds) => ds.name
+    case _ => ""
+  }
+  val dataSources = {
+    validDs.map { ds =>
+      if (ds.name != baselineDsName && ds.isBaseLine) {
+        ds.falseBaselineClone
+      } else ds
+    }
+  }
+
+  override def validate(): Boolean = {
+    dataSources.size > 0
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
deleted file mode 100644
index 66e140b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/config/validator/AllParamValidator.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.validator
-
-import org.apache.griffin.measure.config.params.Param
-
-import scala.util.Try
-
-// need to validate params
-case class AllParamValidator() extends ParamValidator {
-
-  def validate[T <: Param](param: Param): Try[Boolean] = {
-    Try {
-      param.validate
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
index 1a3e050..fd486e9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
@@ -19,12 +19,14 @@ under the License.
 package org.apache.griffin.measure.config.validator
 
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.config.params.Param
+import org.apache.griffin.measure.config.params._
 
 import scala.util.Try
 
-trait ParamValidator extends Loggable with Serializable {
+object ParamValidator extends Loggable with Serializable {
 
-  def validate[T <: Param](param: Param): Try[Boolean]
+  def validate[T <: Param](param: Param): Try[Boolean] = Try {
+    param.validate
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
index 534fb1b..6fafebf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
@@ -20,11 +20,15 @@ package org.apache.griffin.measure.data.connector
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.griffin.measure.cache.tmst.TmstCache
 import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.{BatchDqProcess, BatchProcessType}
 import org.apache.griffin.measure.process.engine._
-import org.apache.griffin.measure.rule.adaptor.{PreProcPhase, RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.rule.adaptor.{InternalColumns, PreProcPhase, RuleAdaptorGroup, RunPhase}
 import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.rule.preproc.PreProcRuleGenerator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
@@ -35,9 +39,13 @@ trait DataConnector extends Loggable with Serializable {
 
 //  def available(): Boolean
 
+  var tmstCache: TmstCache = _
+  protected def saveTmst(t: Long) = tmstCache.insert(t)
+  protected def readTmst(t: Long) = tmstCache.range(t, t + 1)
+
   def init(): Unit
 
-  def data(ms: Long): Option[DataFrame]
+  def data(ms: Long): (Option[DataFrame], Set[Long])
 
   val dqEngines: DqEngines
 
@@ -50,39 +58,56 @@ trait DataConnector extends Loggable with Serializable {
   protected def suffix(ms: Long): String = s"${id}_${ms}"
   protected def thisName(ms: Long): String = s"this_${suffix(ms)}"
 
-  final val tmstColName = GroupByColumn.tmst
+  final val tmstColName = InternalColumns.tmst
 
   def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
+    val timeInfo = CalcTimeInfo(ms, id)
     val thisTable = thisName(ms)
-    val preProcRules = PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms))
-    val names = PreProcRuleGenerator.getRuleNames(preProcRules).toSet + thisTable
 
     try {
       dfOpt.flatMap { df =>
-        // in data
-        df.registerTempTable(thisTable)
+        val preProcRules = PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms))
+
+        // init data
+        TableRegisters.registerRunTempTable(df, timeInfo.key, thisTable)
+
+//        val dsTmsts = Map[String, Set[Long]]((thisTable -> Set[Long](ms)))
+        val tmsts = Seq[Long](ms)
 
         // generate rule steps
-        val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(preProcRules, DslType("spark-sql"), PreProcPhase)
+        val rulePlan = RuleAdaptorGroup.genRulePlan(
+          timeInfo, preProcRules, SparkSqlType, BatchProcessType)
 
         // run rules
-        dqEngines.runRuleSteps(ruleSteps)
+        dqEngines.runRuleSteps(timeInfo, rulePlan.ruleSteps)
 
         // out data
-        val outDf = sqlContext.table(thisTable)
-
-        // drop temp table
-        names.foreach { name =>
-          try {
-            sqlContext.dropTempTable(name)
-          } catch {
-            case e: Throwable => warn(s"drop temp table ${name} fails")
-          }
-        }
-
-        // add tmst
+        val outDf = sqlContext.table(s"`${thisTable}`")
+
+//        names.foreach { name =>
+//          try {
+//            TempTables.unregisterTempTable(sqlContext, ms, name)
+//          } catch {
+//            case e: Throwable => warn(s"drop temp table ${name} fails")
+//          }
+//        }
+
+//        val range = if (id == "dc1") (0 until 20).toList else (0 until 1).toList
+//        val withTmstDfs = range.map { i =>
+//          saveTmst(ms + i)
+//          outDf.withColumn(tmstColName, lit(ms + i)).limit(49 - i)
+//        }
+//        Some(withTmstDfs.reduce(_ unionAll _))
+
+        // add tmst column
         val withTmstDf = outDf.withColumn(tmstColName, lit(ms))
 
+        // tmst cache
+        saveTmst(ms)
+
+        // drop temp tables
+        cleanData(timeInfo)
+
         Some(withTmstDf)
       }
     } catch {
@@ -94,6 +119,13 @@ trait DataConnector extends Loggable with Serializable {
 
   }
 
+  private def cleanData(timeInfo: TimeInfo): Unit = {
+    TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key)
+
+    DataFrameCaches.uncacheDataFrames(timeInfo.key)
+    DataFrameCaches.clearTrashDataFrames(timeInfo.key)
+  }
+
 }
 
 object DataConnectorIdGenerator {
@@ -109,6 +141,3 @@ object DataConnectorIdGenerator {
   }
 }
 
-object GroupByColumn {
-  val tmst = "__tmst"
-}
\ No newline at end of file
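
A hypothetical consumer sketch of the changed DataConnector.data contract above (assumes the measure module on the classpath): data(ms) now returns both the pre-processed DataFrame and the set of timestamps recorded in TmstCache, instead of only Option[DataFrame].

    import org.apache.griffin.measure.data.connector.DataConnector

    def loadOnce(connector: DataConnector, calcTime: Long): Set[Long] = {
      val (dfOpt, tmsts) = connector.data(calcTime)
      // the DataFrame, when present, carries the internal tmst column added in preProcess
      dfOpt.foreach(_.show(10))
      tmsts
    }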

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
index ccd6441..fb042c2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
@@ -51,8 +51,8 @@ case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
     HdfsUtil.existPath(concreteFileFullPath)
   }
 
-  def data(ms: Long): Option[DataFrame] = {
-    try {
+  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+    val dfOpt = try {
       val df = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
       val dfOpt = Some(df)
       val preDfOpt = preProcess(dfOpt, ms)
@@ -63,6 +63,7 @@ case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
         None
       }
     }
+    (dfOpt, readTmst(ms))
   }
 
 //  def available(): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
index cf51d6c..812d724 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
@@ -60,8 +60,8 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
 //    if (arr.size > 0) Some(arr) else None
 //  }
 
-  def data(ms: Long): Option[DataFrame] = {
-    try {
+  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+    val dfOpt = try {
       val dtSql = dataSql
       info(dtSql)
       val df = sqlContext.sql(dtSql)
@@ -74,6 +74,7 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
         None
       }
     }
+    (dfOpt, readTmst(ms))
   }
 
 //  def available(): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
index 13ffe89..32be963 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
@@ -46,8 +46,8 @@ case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngine
     HdfsUtil.existPath(dirPath)
   }
 
-  def data(ms: Long): Option[DataFrame] = {
-    try {
+  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+    val dfOpt = try {
       val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
       // touch done file for read dirs
       dataDirs.foreach(dir => touchDone(dir))
@@ -68,6 +68,7 @@ case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngine
         None
       }
     }
+    (dfOpt, readTmst(ms))
   }
 
   private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
index cc21761..f8d50be 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
@@ -36,7 +36,7 @@ trait StreamingDataConnector extends DataConnector {
 
   def transform(rdd: RDD[(K, V)]): Option[DataFrame]
 
-  def data(ms: Long): Option[DataFrame] = None
+  def data(ms: Long): (Option[DataFrame], Set[Long]) = (None, Set.empty[Long])
 
   var dataSourceCacheOpt: Option[DataSourceCache] = None
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
index 0927754..1918e28 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
@@ -18,15 +18,19 @@ under the License.
 */
 package org.apache.griffin.measure.data.source
 
+import org.apache.griffin.measure.cache.tmst._
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.data.connector.batch._
 import org.apache.griffin.measure.data.connector.streaming._
 import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.rule.plan.TimeInfo
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 case class DataSource(sqlContext: SQLContext,
                       name: String,
+                      baseline: Boolean,
                       dataConnectors: Seq[DataConnector],
                       dataSourceCacheOpt: Option[DataSourceCache]
                      ) extends Loggable with Serializable {
@@ -35,44 +39,61 @@ case class DataSource(sqlContext: SQLContext,
   val streamingDataConnectors = DataConnectorFactory.filterStreamingDataConnectors(dataConnectors)
   streamingDataConnectors.foreach(_.dataSourceCacheOpt = dataSourceCacheOpt)
 
+  val tmstCache: TmstCache = TmstCache()
+
   def init(): Unit = {
     dataSourceCacheOpt.foreach(_.init)
     dataConnectors.foreach(_.init)
+
+    dataSourceCacheOpt.map(_.tmstCache = tmstCache)
+    dataConnectors.map(_.tmstCache = tmstCache)
   }
 
-  def loadData(ms: Long): Unit = {
-    data(ms) match {
+  def loadData(timeInfo: TimeInfo): Set[Long] = {
+    val calcTime = timeInfo.calcTime
+    println(s"load data [${name}]")
+    val (dfOpt, tmsts) = data(calcTime)
+    dfOpt match {
       case Some(df) => {
-        df.registerTempTable(name)
+//        DataFrameCaches.cacheDataFrame(timeInfo.key, name, df)
+        TableRegisters.registerRunTempTable(df, timeInfo.key, name)
       }
       case None => {
-//        val df = sqlContext.emptyDataFrame
-//        df.registerTempTable(name)
         warn(s"load data source [${name}] fails")
-//        throw new Exception(s"load data source [${name}] fails")
       }
     }
+    tmsts
   }
 
-  def dropTable(): Unit = {
-    try {
-      sqlContext.dropTempTable(name)
-    } catch {
-      case e: Throwable => warn(s"drop table [${name}] fails")
+  private def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+    val batches = batchDataConnectors.flatMap { dc =>
+      val (dfOpt, tmsts) = dc.data(ms)
+      dfOpt match {
+        case Some(df) => Some((dfOpt, tmsts))
+        case _ => None
+      }
     }
-  }
-
-  private def data(ms: Long): Option[DataFrame] = {
-    val batchDataFrameOpt = batchDataConnectors.flatMap { dc =>
-      dc.data(ms)
-    }.reduceOption((a, b) => unionDataFrames(a, b))
+    val caches = dataSourceCacheOpt match {
+      case Some(dsc) => dsc.readData() :: Nil
+      case _ => Nil
+    }
+    val pairs = batches ++ caches
 
-    val cacheDataFrameOpt = dataSourceCacheOpt.flatMap(_.readData())
+    if (pairs.size > 0) {
+      pairs.reduce { (a, b) =>
+        (unionDfOpts(a._1, b._1), a._2 ++ b._2)
+      }
+    } else {
+      (None, Set.empty[Long])
+    }
+  }
 
-    (batchDataFrameOpt, cacheDataFrameOpt) match {
-      case (Some(bdf), Some(cdf)) => Some(unionDataFrames(bdf, cdf))
-      case (Some(bdf), _) => Some(bdf)
-      case (_, Some(cdf)) => Some(cdf)
+  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+                         ): Option[DataFrame] = {
+    (dfOpt1, dfOpt2) match {
+      case (Some(df1), Some(df2)) => Some(unionDataFrames(df1, df2))
+      case (Some(df1), _) => dfOpt1
+      case (_, Some(df2)) => dfOpt2
       case _ => None
     }
   }
@@ -88,7 +109,6 @@ case class DataSource(sqlContext: SQLContext,
       }
       val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
       df1 unionAll ndf2
-//      df1 unionAll df2
     } catch {
       case e: Throwable => df1
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
index 769550f..9272f17 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
@@ -21,6 +21,7 @@ package org.apache.griffin.measure.data.source
 import java.util.concurrent.TimeUnit
 
 import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.cache.tmst.TmstCache
 import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.log.Loggable
@@ -32,10 +33,16 @@ import scala.util.{Failure, Success}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                           metricName: String, index: Int
+                           dsName: String, index: Int
                           ) extends DataCacheable with Loggable with Serializable {
 
-  val name = ""
+  var tmstCache: TmstCache = _
+  protected def rangeTmsts(from: Long, until: Long) = tmstCache.range(from, until)
+  protected def clearTmst(t: Long) = tmstCache.remove(t)
+  protected def clearTmstsUntil(until: Long) = {
+    val outDateTmsts = tmstCache.until(until)
+    tmstCache.remove(outDateTmsts)
+  }
 
   val _FilePath = "file.path"
   val _InfoPath = "info.path"
@@ -43,7 +50,7 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
   val _ReadyTimeDelay = "ready.time.delay"
   val _TimeRange = "time.range"
 
-  val defFilePath = s"hdfs:///griffin/cache/${metricName}/${index}"
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}/${index}"
   val defInfoPath = s"${index}"
 
   val filePath: String = param.getString(_FilePath, defFilePath)
@@ -65,6 +72,7 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
 
   val rowSepLiteral = "\n"
   val partitionUnits: List[String] = List("hour", "min", "sec")
+  val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last)
 
   val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
   val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
@@ -89,9 +97,13 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
             val dataRdd: RDD[String] = df.toJSON
 
             // save data
-            val dumped = if (!dataRdd.isEmpty) {
+//            val dumped = if (!dataRdd.isEmpty) {
+//              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+//            } else false
+
+            if (!dataRdd.isEmpty) {
               HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
-            } else false
+            }
 
           } catch {
             case e: Throwable => error(s"save data error: ${e.getMessage}")
@@ -110,8 +122,9 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     submitReadyTime(ms)
   }
 
-  def readData(): Option[DataFrame] = {
-    val timeRange = TimeInfoCache.getTimeRange
+  def readData(): (Option[DataFrame], Set[Long]) = {
+    val tr = TimeInfoCache.getTimeRange
+    val timeRange = (tr._1 + minUnitTime, tr._2)
     submitLastProcTime(timeRange._2)
 
     val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
@@ -125,7 +138,7 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     // list partition paths
     val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
 
-    if (partitionPaths.isEmpty) {
+    val dfOpt = if (partitionPaths.isEmpty) {
       None
     } else {
       try {
@@ -137,9 +150,13 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
         }
       }
     }
+
+    // from until tmst range
+    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2 + 1)
+    val tmstSet = rangeTmsts(from, until)
+    (dfOpt, tmstSet)
   }
 
-  // -- deprecated --
   def updateData(df: DataFrame, ms: Long): Unit = {
     val ptns = getPartition(ms)
     val ptnsPath = genPartitionHdfsPath(ptns)
@@ -157,10 +174,13 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
       println(s"remove file path: ${dirPath}/${dataFileName}")
 
       // save updated data
-      val dumped = if (needSave) {
+      if (needSave) {
         HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
         println(s"update file path: ${dataFilePath}")
-      } else false
+      } else {
+        clearTmst(ms)
+        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      }
     } catch {
       case e: Throwable => error(s"update data error: ${e.getMessage}")
     }
@@ -181,10 +201,13 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
       println(s"remove file path: ${dirPath}/${dataFileName}")
 
       // save updated data
-      val dumped = if (cnt > 0) {
+      if (cnt > 0) {
         HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
         println(s"update file path: ${dataFilePath}")
-      } else false
+      } else {
+        clearTmst(ms)
+        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      }
     } catch {
       case e: Throwable => error(s"update data error: ${e.getMessage}")
     } finally {
@@ -192,7 +215,7 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     }
   }
 
-  def updateData(rdd: Iterable[String], ms: Long): Unit = {
+  def updateData(arr: Iterable[String], ms: Long): Unit = {
     val ptns = getPartition(ms)
     val ptnsPath = genPartitionHdfsPath(ptns)
     val dirPath = s"${filePath}/${ptnsPath}"
@@ -200,17 +223,20 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
 
     try {
-      val needSave = !rdd.isEmpty
+      val needSave = !arr.isEmpty
 
       // remove out time old data
       HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
       println(s"remove file path: ${dirPath}/${dataFileName}")
 
       // save updated data
-      val dumped = if (needSave) {
-        HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
+      if (needSave) {
+        HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
         println(s"update file path: ${dataFilePath}")
-      } else false
+      } else {
+        clearTmst(ms)
+        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      }
     } catch {
       case e: Throwable => error(s"update data error: ${e.getMessage}")
     }
@@ -237,6 +263,11 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
         val cleanTime = readCleanTime()
         cleanTime match {
           case Some(ct) => {
+            println(s"data source [${dsName}] old timestamps clear until [${ct}]")
+
+            // clear out date tmsts
+            clearTmstsUntil(ct)
+
             // drop partitions
             val bounds = getPartition(ct)
 
@@ -292,7 +323,7 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
   }
 
 
-  // here the range means [min, max], but the best range should be (min, max]
+  // here the range means [min, max]
   private def listPathsBetweenRanges(paths: List[String],
                                      partitionRanges: List[(Long, Long)]
                                     ): List[String] = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
index 6c1b76e..47ee368 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
@@ -31,24 +31,22 @@ import scala.util.{Success, Try}
 
 object DataSourceFactory extends Loggable {
 
-  val HiveRegex = """^(?i)hive$""".r
-  val TextRegex = """^(?i)text$""".r
-  val AvroRegex = """^(?i)avro$""".r
-
   def genDataSources(sqlContext: SQLContext, ssc: StreamingContext, dqEngines: DqEngines,
-                     dataSourceParams: Seq[DataSourceParam], metricName: String): Seq[DataSource] = {
-    dataSourceParams.zipWithIndex.flatMap { pair =>
+                     dataSourceParams: Seq[DataSourceParam]) = {
+    val filteredDsParams = trimDataSourceParams(dataSourceParams)
+    filteredDsParams.zipWithIndex.flatMap { pair =>
       val (param, index) = pair
-      genDataSource(sqlContext, ssc, dqEngines, param, metricName, index)
+      genDataSource(sqlContext, ssc, dqEngines, param, index)
     }
   }
 
   private def genDataSource(sqlContext: SQLContext, ssc: StreamingContext,
                             dqEngines: DqEngines,
                             dataSourceParam: DataSourceParam,
-                            metricName: String, index: Int
+                            index: Int
                            ): Option[DataSource] = {
     val name = dataSourceParam.name
+    val baseline = dataSourceParam.isBaseLine
     val connectorParams = dataSourceParam.connectors
     val cacheParam = dataSourceParam.cache
     val dataConnectors = connectorParams.flatMap { connectorParam =>
@@ -57,17 +55,17 @@ object DataSourceFactory extends Loggable {
         case _ => None
       }
     }
-    val dataSourceCacheOpt = genDataSourceCache(sqlContext, cacheParam, metricName, index)
+    val dataSourceCacheOpt = genDataSourceCache(sqlContext, cacheParam, name, index)
 
-    Some(DataSource(sqlContext, name, dataConnectors, dataSourceCacheOpt))
+    Some(DataSource(sqlContext, name, baseline, dataConnectors, dataSourceCacheOpt))
   }
 
   private def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                                 metricName: String, index: Int
+                                 name: String, index: Int
                                 ) = {
     if (param != null) {
       try {
-        Some(DataSourceCache(sqlContext, param, metricName, index))
+        Some(DataSourceCache(sqlContext, param, name, index))
       } catch {
         case e: Throwable => {
           error(s"generate data source cache fails")
@@ -77,4 +75,23 @@ object DataSourceFactory extends Loggable {
     } else None
   }
 
+
+  private def trimDataSourceParams(dataSourceParams: Seq[DataSourceParam]): Seq[DataSourceParam] = {
+    val (validDsParams, _) =
+      dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) =>
+        val (seq, names) = ret
+        if (dsParam.hasName && !names.contains(dsParam.name)) {
+          (seq :+ dsParam, names + dsParam.name)
+        } else ret
+      }
+    if (validDsParams.size > 0) {
+      val baselineDsParam = validDsParams.filter(_.isBaseLine).headOption.getOrElse(validDsParams.head)
+      validDsParams.map { dsParam =>
+        if (dsParam.name != baselineDsParam.name && dsParam.isBaseLine) {
+          dsParam.falseBaselineClone
+        } else dsParam
+      }
+    } else validDsParams
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
index 61d0cde..11c44d8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.DataFrame
 
 import scala.util.Try
 import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.TaskContext
 
 // persist result and data to hdfs
 case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
@@ -37,7 +38,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
 
   val path = config.getOrElse(Path, "").toString
   val maxPersistLines = config.getInt(MaxPersistLines, -1)
-  val maxLinesPerFile = config.getLong(MaxLinesPerFile, 10000)
+  val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 1000000)
 
   val separator = "/"
 
@@ -50,6 +51,10 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
 
   val LogFile = filePath("_LOG")
 
+  val _MetricName = "metricName"
+  val _Timestamp = "timestamp"
+  val _Value = "value"
+
   var _init = true
   private def isInit = {
     val i = _init
@@ -183,7 +188,16 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
 //    }
 //  }
 
-  private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
+//  private def persistRecords2Hdfs(hdfsPath: String, rdd: RDD[String]): Unit = {
+//    try {
+////      rdd.saveAsTextFile(hdfsPath)
+//      val recStr = rdd.collect().mkString("\n")
+//      HdfsUtil.writeContent(hdfsPath, recStr)
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
+  private def persistRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
     try {
       val recStr = records.mkString("\n")
       HdfsUtil.writeContent(hdfsPath, recStr)
@@ -201,52 +215,89 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
     }
   }
 
+  private def getHdfsPath(path: String, groupId: Int): String = {
+    HdfsUtil.getHdfsFilePath(path, s"${groupId}")
+//    if (groupId == 0) path else withSuffix(path, s"${groupId}")
+  }
+  private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
+    HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
+//    if (ptnId == 0 && groupId == 0) path else withSuffix(path, s"${ptnId}.${groupId}")
+  }
 
-//  def persistRecords(df: DataFrame, name: String): Unit = {
-//    val records = df.toJSON
-//    val path = filePath(name)
-//    try {
-//      val recordCount = records.count
-//      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
-//      if (count > 0) {
-//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-//        if (groupCount <= 1) {
-//          val recs = records.take(count.toInt)
-//          persistRecords(path, recs)
-//        } else {
-//          val groupedRecords: RDD[(Long, Iterable[String])] =
-//            records.zipWithIndex.flatMap { r =>
-//              val gid = r._2 / maxLinesPerFile
-//              if (gid < groupCount) Some((gid, r._1)) else None
-//            }.groupByKey()
-//          groupedRecords.foreach { group =>
-//            val (gid, recs) = group
-//            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
-//            persistRecords(hdfsPath, recs)
-//          }
-//        }
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
+  private def clearOldRecords(path: String): Unit = {
+    HdfsUtil.deleteHdfsPath(path)
+  }
+
+  def persistRecords(df: DataFrame, name: String): Unit = {
+    val path = filePath(name)
+    clearOldRecords(path)
+    try {
+      val recordCount = df.count
+      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+      val maxCount = count.toInt
+      if (maxCount > 0) {
+        val recDf = df.limit(maxCount)
+        recDf.toJSON.foreachPartition { ptn =>
+          val ptnid = TaskContext.getPartitionId()
+          val groupedRecords = ptn.grouped(maxLinesPerFile).zipWithIndex
+          groupedRecords.foreach { group =>
+            val (recs, gid) = group
+            val hdfsPath = getHdfsPath(path, ptnid, gid)
+            persistRecords2Hdfs(hdfsPath, recs)
+          }
+        }
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  def persistRecords(records: RDD[String], name: String): Unit = {
+    val path = filePath(name)
+    clearOldRecords(path)
+    try {
+      val recordCount = records.count
+      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+      if (count > 0) {
+        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+        if (groupCount <= 1) {
+          val recs = records.take(count.toInt)
+          persistRecords2Hdfs(path, recs)
+        } else {
+          val groupedRecords: RDD[(Long, Iterable[String])] =
+            records.zipWithIndex.flatMap { r =>
+              val gid = r._2 / maxLinesPerFile
+              if (gid < groupCount) Some((gid, r._1)) else None
+            }.groupByKey()
+          groupedRecords.foreach { group =>
+            val (gid, recs) = group
+            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
+            persistRecords2Hdfs(hdfsPath, recs)
+          }
+        }
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
 
   def persistRecords(records: Iterable[String], name: String): Unit = {
     val path = filePath(name)
+    clearOldRecords(path)
     try {
       val recordCount = records.size
       val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
       if (count > 0) {
-        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+        val groupCount = (count - 1) / maxLinesPerFile + 1
         if (groupCount <= 1) {
           val recs = records.take(count.toInt)
-          persistRecords(path, recs)
+          persistRecords2Hdfs(path, recs)
         } else {
-          val groupedRecords = records.grouped(groupCount).zipWithIndex
+          val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex
           groupedRecords.take(groupCount).foreach { group =>
             val (recs, gid) = group
-            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
-            persistRecords(hdfsPath, recs)
+            val hdfsPath = getHdfsPath(path, gid)
+            persistRecords2Hdfs(hdfsPath, recs)
           }
         }
       }
@@ -280,10 +331,11 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
 //  }
 
   def persistMetrics(metrics: Map[String, Any]): Unit = {
+    val head = Map[String, Any]((_MetricName -> metricName), (_Timestamp -> timeStamp))
+    val result = head + (_Value -> metrics)
     try {
-      val json = JsonUtil.toJson(metrics)
-      println(s"hdfs persist metrics: ${json}")
-      persistRecords(MetricsFile, json :: Nil)
+      val json = JsonUtil.toJson(result)
+      persistRecords2Hdfs(MetricsFile, json :: Nil)
     } catch {
       case e: Throwable => error(e.getMessage)
     }

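Note on the new HDFS record layout above: the DataFrame overload writes each Spark partition independently, chunking the JSON records into files of at most maxLinesPerFile lines named "<path>/<partitionId>.<groupId>" via getHdfsPath, while the Iterable overload uses "<path>/<groupId>". A minimal standalone sketch of that grouping, assuming HdfsUtil.getHdfsFilePath simply joins directory and file name with "/"; the values below are illustrative only, and HdfsUtil.writeContent does the real write:

    val maxLinesPerFile = 2                               // illustrative limit
    val records = Seq("rec1", "rec2", "rec3", "rec4", "rec5")
    val path = "hdfs://nn/griffin/persist/_record"        // hypothetical output directory

    // mirrors persistRecords(records: Iterable[String], name): chunk, then name files by group id
    records.grouped(maxLinesPerFile).zipWithIndex.foreach { case (recs, gid) =>
      val hdfsPath = s"${path}/${gid}"                    // stands in for getHdfsPath(path, gid)
      println(s"${hdfsPath} <- ${recs.mkString(", ")}")
    }
    // hdfs://nn/griffin/persist/_record/0 <- rec1, rec2
    // hdfs://nn/griffin/persist/_record/1 <- rec3, rec4
    // hdfs://nn/griffin/persist/_record/2 <- rec5
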
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
index 225ee41..c4abc22 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
@@ -89,7 +89,8 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp:
 
   def log(rt: Long, msg: String): Unit = {}
 
-//  def persistRecords(df: DataFrame, name: String): Unit = {}
+  def persistRecords(df: DataFrame, name: String): Unit = {}
+  def persistRecords(records: RDD[String], name: String): Unit = {}
   def persistRecords(records: Iterable[String], name: String): Unit = {}
 
 //  def persistMetrics(metrics: Seq[String], name: String): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
index 0cd6f6b..d9a601a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
@@ -117,31 +117,48 @@ case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp
     println(s"[${timeStamp}] ${rt}: ${msg}")
   }
 
-//  def persistRecords(df: DataFrame, name: String): Unit = {
-//    val records = df.toJSON
-//    println(s"${name} [${timeStamp}] records: ")
+  def persistRecords(df: DataFrame, name: String): Unit = {
+//    println(s"${metricName} [${timeStamp}] records: ")
 //    try {
-//      val recordCount = records.count.toInt
+//      val recordCount = df.count
 //      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
-//      if (count > 0) {
-//        val recordsArray = records.take(count)
+//      val maxCount = count.toInt
+//      if (maxCount > 0) {
+//        val recDf = df.limit(maxCount)
+//        val recordsArray = recDf.toJSON.collect()
 //        recordsArray.foreach(println)
 //      }
 //    } catch {
 //      case e: Throwable => error(e.getMessage)
 //    }
-//  }
+  }
+
+  def persistRecords(records: RDD[String], name: String): Unit = {
+//    println(s"${metricName} [${timeStamp}] records: ")
+//    try {
+//      val recordCount = records.count
+//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
+//      val maxCount = count.toInt
+//      if (maxCount > 0) {
+//        val recordsArray = records.take(maxCount)
+//        recordsArray.foreach(println)
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+  }
 
   def persistRecords(records: Iterable[String], name: String): Unit = {
-    try {
-      val recordCount = records.size
-      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
-      if (count > 0) {
-        records.foreach(println)
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
+//    println(s"${metricName} [${timeStamp}] records: ")
+//    try {
+//      val recordCount = records.size
+//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
+//      if (count > 0) {
+//        records.foreach(println)
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
   }
 
 //  def persistMetrics(metrics: Seq[String], name: String): Unit = {
@@ -161,10 +178,6 @@ case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp
     println(s"${metricName} [${timeStamp}] metrics: ")
     val json = JsonUtil.toJson(metrics)
     println(json)
-//    metrics.foreach { metric =>
-//      val (key, value) = metric
-//      println(s"${key}: ${value}")
-//    }
   }
 
 

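Record logging is now disabled in LoggerPersist; only persistMetrics prints. For reference, a sketch of the console output it produces, assuming JsonUtil.toJson renders a plain JSON object; the name, timestamp and metric values below are made up:

    val metricName = "accuracy_measure"        // hypothetical measure name
    val timeStamp = 1515404683000L             // hypothetical calculation timestamp
    val metrics = Map("total" -> 100, "miss" -> 4, "matched" -> 96)
    // persistMetrics(metrics) would print:
    //   accuracy_measure [1515404683000] metrics:
    //   {"total":100,"miss":4,"matched":96}
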
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala
new file mode 100644
index 0000000..b5923ce
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala
@@ -0,0 +1,119 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.mongodb.scala._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
+import org.mongodb.scala.result.UpdateResult
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+
+case class MongoPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  MongoConnection.init(config)
+
+  val _MetricName = "metricName"
+  val _Timestamp = "timestamp"
+  val _Value = "value"
+
+  def available(): Boolean = MongoConnection.dataConf.available
+
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  def log(rt: Long, msg: String): Unit = {}
+
+  def persistRecords(df: DataFrame, name: String): Unit = {}
+  def persistRecords(records: RDD[String], name: String): Unit = {}
+  def persistRecords(records: Iterable[String], name: String): Unit = {}
+
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    mongoInsert(metrics)
+  }
+
+  private val filter = Filters.and(
+    Filters.eq(_MetricName, metricName),
+    Filters.eq(_Timestamp, timeStamp)
+  )
+
+  private def mongoInsert(dataMap: Map[String, Any]): Unit = {
+    try {
+      val update = Updates.set(_Value, dataMap)
+      def func(): (Long, Future[UpdateResult]) = {
+        (timeStamp, MongoConnection.getDataCollection.updateOne(
+          filter, update, UpdateOptions().upsert(true)).toFuture)
+      }
+      MongoThreadPool.addTask(func _, 10)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+}
+
+case class MongoConf(url: String, database: String, collection: String) {
+  def available: Boolean = url.nonEmpty && database.nonEmpty && collection.nonEmpty
+}
+
+object MongoConnection {
+
+  val _MongoHead = "mongodb://"
+
+  val Url = "url"
+  val Database = "database"
+  val Collection = "collection"
+
+  private var initialed = false
+
+  var dataConf: MongoConf = _
+  private var dataCollection: MongoCollection[Document] = _
+
+  def getDataCollection = dataCollection
+
+  def init(config: Map[String, Any]): Unit = {
+    if (!initialed) {
+      dataConf = mongoConf(config)
+      dataCollection = mongoCollection(dataConf)
+      initialed = true
+    }
+  }
+
+  private def mongoConf(cfg: Map[String, Any]): MongoConf = {
+    val url = cfg.getString(Url, "").trim
+    val mongoUrl = if (url.startsWith(_MongoHead)) url else {
+      _MongoHead + url
+    }
+    MongoConf(
+      mongoUrl,
+      cfg.getString(Database, ""),
+      cfg.getString(Collection, "")
+    )
+  }
+  private def mongoCollection(mongoConf: MongoConf): MongoCollection[Document] = {
+    val mongoClient: MongoClient = MongoClient(mongoConf.url)
+    val database: MongoDatabase = mongoClient.getDatabase(mongoConf.database)
+    database.getCollection(mongoConf.collection)
+  }
+
+}
\ No newline at end of file

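A usage sketch for the new mongo persist. The config keys match the ones MongoConnection reads ("url", "database", "collection"); the host, database, collection and metric values are placeholders. persistMetrics upserts one document per (metricName, timestamp) pair, storing the metric map under "value", and the write is handed off to MongoThreadPool:

    import org.apache.griffin.measure.persist.MongoPersist

    // hypothetical persist config; "mongodb://" is prepended automatically when missing
    val mongoConfig: Map[String, Any] = Map(
      "url" -> "127.0.0.1:27017",
      "database" -> "griffin",
      "collection" -> "metrics"
    )

    val persist = MongoPersist(mongoConfig, "accuracy_measure", 1515404683000L)
    if (persist.available()) {
      // upserts { metricName: ..., timestamp: ... } with { value: { total: ..., ... } }
      persist.persistMetrics(Map("total" -> 100, "miss" -> 4, "matched" -> 96))
    }
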
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala
new file mode 100644
index 0000000..2f43edb
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala
@@ -0,0 +1,73 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import java.util.Date
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+object MongoThreadPool {
+
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
+  val MAX_RETRY = 100
+
+  def shutdown(): Unit = {
+    pool.shutdown()
+    pool.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  def addTask(func: () => (Long, Future[_]), retry: Int): Unit = {
+    val r = if (retry < 0) MAX_RETRY else retry
+    println(s"add task, current task num: ${pool.getQueue.size}")
+    pool.submit(Task(func, r))
+  }
+
+  case class Task(func: () => (Long, Future[_]), retry: Int) extends Runnable {
+
+    override def run(): Unit = {
+      val st = new Date().getTime
+      val (t, res) = func()
+      res.onComplete {
+        case Success(value) => {
+          val et = new Date().getTime
+          println(s"task ${t} success [ using time ${et - st} ms ]")
+        }
+        case Failure(e) => {
+          val et = new Date().getTime
+          println(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
+          if (retry > 0) {
+            println(s"task ${t} retry [ rest retry count: ${retry - 1} ]")
+            pool.submit(Task(func, retry - 1))
+          } else {
+            println(s"task ${t} retry ends but fails")
+          }
+        }
+      }
+    }
+
+    def fail(): Unit = {
+      println("task fails")
+    }
+  }
+
+}

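A minimal sketch of submitting work to MongoThreadPool. The task function returns the metric timestamp together with a Future, the same shape mongoInsert builds; the Future body here is a stand-in for the real Mongo upsert, and the timestamp is illustrative:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.griffin.measure.persist.MongoThreadPool

    val timeStamp = 1515404683000L             // hypothetical metric timestamp

    // on failure the task is resubmitted until its retry budget is exhausted
    def upsertTask(): (Long, Future[_]) = (timeStamp, Future { /* mongo upsert stand-in */ 0 })

    MongoThreadPool.addTask(upsertTask _, 10)  // same retry count MongoPersist uses
    // MongoThreadPool.shutdown()              // drain pending tasks when the job ends
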
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
index d698bb0..aa97afa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -58,7 +58,24 @@ case class MultiPersists(persists: Iterable[Persist]) extends Persist {
     }
   }
 
-//  def persistRecords(df: DataFrame, name: String): Unit = { persists.foreach(_.persistRecords(df, name)) }
+  def persistRecords(df: DataFrame, name: String): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.persistRecords(df, name)
+      } catch {
+        case e: Throwable => error(s"persist df error: ${e.getMessage}")
+      }
+    }
+  }
+  def persistRecords(records: RDD[String], name: String): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.persistRecords(records, name)
+      } catch {
+        case e: Throwable => error(s"persist records error: ${e.getMessage}")
+      }
+    }
+  }
   def persistRecords(records: Iterable[String], name: String): Unit = {
     persists.foreach { persist =>
       try {

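A short caller-side sketch of the fan-out behaviour added here: every configured sink receives the same records, and a sink that throws is only logged, so one failing backend (say an unreachable Mongo endpoint) does not block the HDFS or log output. The record name "missRecords" is just an example:

    import org.apache.griffin.measure.persist.{MultiPersists, PersistFactory}

    def persistToAllSinks(persistFactory: PersistFactory, timeStamp: Long,
                          records: Iterable[String]): Unit = {
      // getPersists already wraps every configured persist in a MultiPersists
      val sinks: MultiPersists = persistFactory.getPersists(timeStamp)
      sinks.persistRecords(records, "missRecords")
    }
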
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
index 2884fa6..361fad7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
@@ -43,7 +43,8 @@ trait Persist extends Loggable with Serializable {
 //  def records(recs: RDD[String], tp: String): Unit
 //  def records(recs: Iterable[String], tp: String): Unit
 
-//  def persistRecords(df: DataFrame, name: String): Unit
+  def persistRecords(df: DataFrame, name: String): Unit
+  def persistRecords(records: RDD[String], name: String): Unit
   def persistRecords(records: Iterable[String], name: String): Unit
 //  def persistMetrics(metrics: Seq[String], name: String): Unit
   def persistMetrics(metrics: Map[String, Any]): Unit

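Persist now exposes all three record overloads as abstract members, so a caller can hand over whichever representation it already has. A sketch of the three call shapes, assuming the Spark 1.x API used throughout this module, where DataFrame.toJSON returns an RDD[String]:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame
    import org.apache.griffin.measure.persist.Persist

    def persistInAnyForm(persist: Persist, df: DataFrame, name: String): Unit = {
      persist.persistRecords(df, name)                      // whole DataFrame, e.g. a rule result
      val jsonRdd: RDD[String] = df.toJSON                  // JSON records, still distributed
      persist.persistRecords(jsonRdd, name)
      persist.persistRecords(jsonRdd.collect().toSeq, name) // small, already-collected records
    }
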
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
index 3a74343..b2e34a9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
@@ -29,6 +29,7 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str
   val HTTP_REGEX = """^(?i)http$""".r
 //  val OLDHTTP_REGEX = """^(?i)oldhttp$""".r
   val LOG_REGEX = """^(?i)log$""".r
+  val MONGO_REGEX = """^(?i)mongo$""".r
 
   def getPersists(timeStamp: Long): MultiPersists = {
     MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
@@ -42,6 +43,7 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str
       case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
 //      case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, timeStamp))
       case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
+      case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp))
       case _ => throw new Exception("not supported persist type")
     }
     persistTry match {

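The new MONGO_REGEX is case-insensitive and anchored, so any casing of the exact word "mongo" in a persist's type selects MongoPersist, while e.g. "mongodb" does not. A quick standalone check of that matching:

    val MONGO_REGEX = """^(?i)mongo$""".r

    def isMongo(persistType: String): Boolean = persistType match {
      case MONGO_REGEX() => true
      case _ => false
    }

    Seq("mongo", "Mongo", "MONGO", "mongodb").map(isMongo)
    // => List(true, true, true, false)
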
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
index 7993aab..0a647b4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
 
 object PersistThreadPool {
 
-  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(10).asInstanceOf[ThreadPoolExecutor]
+  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
   val MAX_RETRY = 100
 
   def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index dc8b79a..7ed4717 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -20,19 +20,24 @@ package org.apache.griffin.measure.process
 
 import java.util.Date
 
+import org.apache.griffin.measure.cache.info.TimeInfoCache
+import org.apache.griffin.measure.cache.result.CacheResultProcesser
 import org.apache.griffin.measure.config.params._
 import org.apache.griffin.measure.config.params.env._
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.data.source.DataSourceFactory
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.{DqEngineFactory, SparkSqlEngine}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.rule.udf.GriffinUdfs
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkConf, SparkContext}
 
+import scala.concurrent.Await
 import scala.util.Try
 
 case class BatchDqProcess(allParam: AllParam) extends DqProcess {
@@ -40,8 +45,10 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
   val envParam: EnvParam = allParam.envParam
   val userParam: UserParam = allParam.userParam
 
-  val metricName = userParam.name
   val sparkParam = envParam.sparkParam
+  val metricName = userParam.name
+  val dataSourceNames = userParam.dataSources.map(_.name)
+  val baselineDsName = userParam.baselineDsName
 
   var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
@@ -59,17 +66,19 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     GriffinUdfs.register(sqlContext)
 
     // init adaptors
-    val dataSourceNames = userParam.dataSources.map(_.name)
-    RuleAdaptorGroup.init(sqlContext, dataSourceNames)
+    RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
   }
 
   def run: Try[_] = Try {
     // start time
-    val startTime = getStartTime
+    val startTime = new Date().getTime
+
+    val appTime = getAppTime
+    val calcTimeInfo = CalcTimeInfo(appTime)
 
     // get persists to persist measure result
     val persistFactory = PersistFactory(envParam.persistParams, metricName)
-    val persist: Persist = persistFactory.getPersists(startTime)
+    val persist: Persist = persistFactory.getPersists(appTime)
 
     // persist start id
     val applicationId = sparkContext.applicationId
@@ -79,28 +88,44 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     val dqEngines = DqEngineFactory.genDqEngines(sqlContext)
 
     // generate data sources
-    val dataSources = DataSourceFactory.genDataSources(sqlContext, null, dqEngines, userParam.dataSources, metricName)
+    val dataSources = DataSourceFactory.genDataSources(sqlContext, null, dqEngines, userParam.dataSources)
     dataSources.foreach(_.init)
 
     // init data sources
-    dqEngines.loadData(dataSources, startTime)
+    val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo)
+
+    debug(s"data source timestamps: ${dsTmsts}")
 
     // generate rule steps
-    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(userParam.evaluateRuleParam, RunPhase)
+//    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(
+//      TimeInfo(appTime, appTime), userParam.evaluateRuleParam, dsTmsts, BatchProcessType, RunPhase)
+//    val ruleSteps = RuleAdaptorGroup.genRuleSteps(
+//      CalcTimeInfo(appTime), userParam.evaluateRuleParam, dsTmsts)
+
+    val rulePlan = RuleAdaptorGroup.genRulePlan(
+      calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType)
+
+//    rulePlan.ruleSteps.foreach(println)
+//    println("====")
+//    rulePlan.metricExports.foreach(println)
+//    println("====")
+//    rulePlan.recordExports.foreach(println)
+//    println("====")
 
     // run rules
-    dqEngines.runRuleSteps(ruleSteps)
+    dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
 
     // persist results
-    val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
-
-    val rdds = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups)
-    rdds.foreach(_._2.cache())
+    dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports,
+      BatchProcessType, persistFactory)
 
-    dqEngines.persistAllRecords(rdds, persistFactory)
-//    dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups)
+    dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports,
+      BatchProcessType, persistFactory, dataSources)
+//    dfs.foreach(_._2.cache())
+//
+//    dqEngines.persistAllRecords(dfs, persistFactory)
 
-    rdds.foreach(_._2.unpersist())
+//    dfs.foreach(_._2.unpersist())
 
     // end time
     val endTime = new Date().getTime
@@ -108,10 +133,63 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
 
     // finish
     persist.finish()
+
+//    sqlContext.tables().show(50)
+//    println(sqlContext.tableNames().size)
+
+//    sqlContext.tables().show(50)
+
+    // clean data
+    cleanData(calcTimeInfo)
+
+//    sqlContext.tables().show(50)
+//    println(sqlContext.tableNames().size)
+
+    // clear temp table
+//    ruleSteps.foreach { rs =>
+//      println(rs)
+//      //      sqlContext.dropTempTable(rs.ruleInfo.name)
+//      rs.ruleInfo.tmstNameOpt match {
+//        case Some(n) => sqlContext.dropTempTable(s"`${n}`")
+//        case _ => {}
+//      }
+//    }
+//
+//    // -- test --
+//    sqlContext.tables().show(50)
+  }
+
+  private def cleanData(timeInfo: TimeInfo): Unit = {
+    TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key)
+    TableRegisters.unregisterCompileTempTables(timeInfo.key)
+
+    DataFrameCaches.uncacheDataFrames(timeInfo.key)
+    DataFrameCaches.clearTrashDataFrames(timeInfo.key)
+    DataFrameCaches.clearGlobalTrashDataFrames()
   }
 
   def end: Try[_] = Try {
+    TableRegisters.unregisterRunGlobalTables(sqlContext)
+    TableRegisters.unregisterCompileGlobalTables
+
+    DataFrameCaches.uncacheGlobalDataFrames()
+    DataFrameCaches.clearGlobalTrashDataFrames()
+
     sparkContext.stop
   }
 
+//  private def cleanData(t: Long): Unit = {
+//    try {
+////      dataSources.foreach(_.cleanOldData)
+////      dataSources.foreach(_.dropTable(t))
+//
+////      val cleanTime = TimeInfoCache.getCleanTime
+////      CacheResultProcesser.refresh(cleanTime)
+//
+//      sqlContext.dropTempTable()
+//    } catch {
+//      case e: Throwable => error(s"clean data error: ${e.getMessage}")
+//    }
+//  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
index 7ff29d6..ac8f3d6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
@@ -37,7 +37,7 @@ trait DqProcess extends Loggable with Serializable {
 
   def retriable: Boolean
 
-  protected def getStartTime: Long = {
+  protected def getAppTime: Long = {
     if (userParam.timestamp != null && userParam.timestamp > 0) { userParam.timestamp }
     else { System.currentTimeMillis }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
index 3fe8b3f..1cc2ab7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -27,7 +27,9 @@ import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.data.source.DataSourceFactory
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngineFactory
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
+import org.apache.griffin.measure.rule.plan.TimeInfo
 import org.apache.griffin.measure.rule.udf.GriffinUdfs
 import org.apache.griffin.measure.utils.TimeUtil
 import org.apache.spark.sql.SQLContext
@@ -42,8 +44,10 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
   val envParam: EnvParam = allParam.envParam
   val userParam: UserParam = allParam.userParam
 
-  val metricName = userParam.name
   val sparkParam = envParam.sparkParam
+  val metricName = userParam.name
+  val dataSourceNames = userParam.dataSources.map(_.name)
+  val baselineDsName = userParam.baselineDsName
 
   var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
@@ -66,7 +70,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
 
     // init adaptors
     val dataSourceNames = userParam.dataSources.map(_.name)
-    RuleAdaptorGroup.init(sqlContext, dataSourceNames)
+    RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
   }
 
   def run: Try[_] = Try {
@@ -82,11 +86,11 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     })
 
     // start time
-    val startTime = getStartTime
+    val appTime = getAppTime
 
     // get persists to persist measure result
     val persistFactory = PersistFactory(envParam.persistParams, metricName)
-    val persist: Persist = persistFactory.getPersists(startTime)
+    val persist: Persist = persistFactory.getPersists(appTime)
 
     // persist start id
     val applicationId = sparkContext.applicationId
@@ -96,17 +100,19 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     val dqEngines = DqEngineFactory.genDqEngines(sqlContext)
 
     // generate data sources
-    val dataSources = DataSourceFactory.genDataSources(sqlContext, ssc, dqEngines, userParam.dataSources, metricName)
+    val dataSources = DataSourceFactory.genDataSources(sqlContext, ssc, dqEngines, userParam.dataSources)
     dataSources.foreach(_.init)
 
     // process thread
-    val dqThread = StreamingDqThread(dqEngines, dataSources, userParam.evaluateRuleParam, persistFactory, persist)
+    val dqThread = StreamingDqThread(sqlContext, dqEngines, dataSources,
+      userParam.evaluateRuleParam, persistFactory, persist)
 
     // init data sources
-//    dqEngines.loadData(dataSources)
-//
-//    // generate rule steps
-//    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(userParam.evaluateRuleParam)
+//    val dsTmsts = dqEngines.loadData(dataSources, appTime)
+
+    // generate rule steps
+//    val ruleSteps = RuleAdaptorGroup.genRuleSteps(
+//      TimeInfo(appTime, appTime), userParam.evaluateRuleParam, dsTmsts)
 //
 //    // run rules
 //    dqEngines.runRuleSteps(ruleSteps)
@@ -136,6 +142,12 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
   }
 
   def end: Try[_] = Try {
+    TableRegisters.unregisterCompileGlobalTables()
+    TableRegisters.unregisterRunGlobalTables(sqlContext)
+
+    DataFrameCaches.uncacheGlobalDataFrames()
+    DataFrameCaches.clearGlobalTrashDataFrames()
+
     sparkContext.stop
 
     InfoCacheInstance.close