Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/16 08:32:40 UTC

[2/2] incubator-griffin git commit: Add distinctness measurement

Add distinctness measurement

add distinctness measurement
add a data source type which only reads data from cache in streaming mode

Author: Lionel Liu <bh...@163.com>

Closes #191 from bhlx3lyx7/tmst.
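
The read-from-cache-only data source works through a new "read.only" flag in the data source cache parameters (see the DataSourceCache.scala changes below, where it defaults to false). Below is a minimal sketch of how such a data source might be configured in a streaming job, reusing the cache block from the streaming sample in this commit; placing "read.only" inside the "cache" block and leaving "connectors" empty are assumptions, not taken from the added test resources.
```
{
  "name": "cached_source",
  "connectors": [],
  "cache": {
    "file.path": "hdfs://localhost/griffin/streaming/dump/source",
    "info.path": "source",
    "ready.time.interval": "10s",
    "ready.time.delay": "0",
    "time.range": ["-2m", "0"],
    "read.only": true
  }
}
```
The authoritative behavior is the readOnly guard added in DataSourceCache.scala further down: when the flag is set, saveData, updateData, updateDataMap and cleanOldData become no-ops, so the cache is only read.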


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/cbff5b45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/cbff5b45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/cbff5b45

Branch: refs/heads/master
Commit: cbff5b45c19da1ff4354aba5a1ced35c3a437a9c
Parents: e704da6
Author: Lionel Liu <bh...@163.com>
Authored: Tue Jan 16 16:32:31 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Jan 16 16:32:31 2018 +0800

----------------------------------------------------------------------
 griffin-doc/measure/measure-batch-sample.md     |  64 ++--
 .../measure/measure-configuration-guide.md      |  66 ++--
 .../measure/measure-streaming-sample-old.md     | 204 -----------
 griffin-doc/measure/measure-streaming-sample.md | 256 ++++++++++++++
 .../measure/cache/info/TimeInfoCache.scala      |  24 +-
 .../measure/cache/info/ZKInfoCache.scala        |   8 +-
 .../config/params/user/DataSourceParam.scala    |   1 +
 .../measure/data/connector/DataConnector.scala  |   9 +-
 .../batch/AvroBatchDataConnector.scala          |   6 +-
 .../batch/HiveBatchDataConnector.scala          |   6 +-
 .../batch/TextDirBatchDataConnector.scala       |   6 +-
 .../streaming/StreamingDataConnector.scala      |   3 +-
 .../measure/data/source/DataSource.scala        |  14 +-
 .../measure/data/source/DataSourceCache.scala   | 303 +++++++++--------
 .../measure/data/source/DataSourceFactory.scala |   2 +-
 .../measure/process/BatchDqProcess.scala        |  12 +-
 .../griffin/measure/process/ExportMode.scala    |  34 ++
 .../measure/process/StreamingDqThread.scala     |  93 ++---
 .../measure/process/engine/DqEngine.scala       |   6 +-
 .../measure/process/engine/DqEngines.scala      |  39 ++-
 .../measure/process/engine/SparkDqEngine.scala  | 103 +++---
 .../measure/process/engine/SparkSqlEngine.scala |   3 +
 .../measure/process/temp/TimeRange.scala        |  41 +++
 .../rule/adaptor/DataFrameOprAdaptor.scala      |   9 +-
 .../measure/rule/adaptor/GlobalKeys.scala       |  70 ++++
 .../rule/adaptor/GriffinDslAdaptor.scala        | 335 +++++++++++++------
 .../measure/rule/adaptor/InternalColumns.scala  |   4 +-
 .../measure/rule/adaptor/RuleAdaptor.scala      |  29 +-
 .../measure/rule/adaptor/RuleAdaptorGroup.scala |  12 +-
 .../measure/rule/adaptor/SparkSqlAdaptor.scala  |   9 +-
 .../griffin/measure/rule/dsl/DqType.scala       |   7 +-
 .../dsl/analyzer/DistinctnessAnalyzer.scala     |  47 +++
 .../rule/dsl/expr/ClauseExpression.scala        |   8 +
 .../rule/dsl/parser/GriffinDslParser.scala      |   9 +
 .../measure/rule/plan/MetricExport.scala        |   8 +-
 .../measure/rule/plan/RecordExport.scala        |   9 +-
 .../griffin/measure/rule/plan/RuleExport.scala  |   8 +
 .../_distinctness-batch-griffindsl.json         |  57 ++++
 .../_distinctness-batch-griffindsl1.json        |  73 ++++
 .../_distinctness-streaming-griffindsl.json     |  85 +++++
 .../resources/_profiling-batch-griffindsl.json  |   4 +-
 measure/src/test/resources/dupdata.avro         | Bin 0 -> 304 bytes
 measure/src/test/resources/empty.avro           | Bin 0 -> 215 bytes
 43 files changed, 1396 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-batch-sample.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-batch-sample.md b/griffin-doc/measure/measure-batch-sample.md
index 3783f94..544adc7 100644
--- a/griffin-doc/measure/measure-batch-sample.md
+++ b/griffin-doc/measure/measure-batch-sample.md
@@ -29,50 +29,50 @@ Measures consists of batch measure and streaming measure. This document is for t
 
   "data.sources": [
     {
-      "name": "src",
+      "name": "source",
+      "baseline": true,
       "connectors": [
         {
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "users_info_src.avro"
+            "file.name": "src/test/resources/users_info_src.avro"
           }
         }
       ]
     }, {
-      "name": "tgt",
+      "name": "target",
       "connectors": [
         {
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "users_info_target.avro"
+            "file.name": "src/test/resources/users_info_target.avro"
           }
         }
       ]
     }
   ],
 
-  "evaluateRule": {
+  "evaluate.rule": {
     "rules": [
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "accuracy",
-        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "name": "accu",
+        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
         "details": {
-          "source": "src",
-          "target": "tgt",
-          "miss.records": {
-            "name": "miss.records",
-            "persist.type": "record"
-          },
-          "accuracy": {
-            "name": "accu",
-            "persist.type": "metric"
-          },
+          "source": "source",
+          "target": "target",
           "miss": "miss_count",
           "total": "total_count",
           "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
         }
       }
     ]
@@ -92,7 +92,7 @@ The miss records of source will be persisted as record.
 ## Batch Profiling Sample
 ```
 {
-  "name": "prof_batch_test",
+  "name": "prof_batch",
 
   "process.type": "batch",
 
@@ -101,29 +101,35 @@ The miss records of source will be persisted as record.
       "name": "source",
       "connectors": [
         {
-          "type": "hive",
-          "version": "1.2",
+          "type": "avro",
+          "version": "1.7",
           "config": {
-          	"database": "griffin",
-          	"table.name": "demo_src"
+            "file.name": "src/test/resources/users_info_src.avro"
           }
         }
       ]
     }
   ],
 
-  "evaluateRule": {
+  "evaluate.rule": {
     "rules": [
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "rule": "country, country.count() as cnt group by country order by cnt desc limit 3",
-        "details": {
-          "source": "source",
-          "profiling": {
-            "name": "cntry-group",
-            "persist.type": "metric"
-          }
+        "name": "prof",
+        "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code",
+        "metric": {
+          "name": "post_group",
+          "collect.type": "array"
         }
       }
     ]

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-configuration-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index 0632927..5ac7e5f 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -136,26 +136,25 @@ Above lists environment parameters.
     }
   ],
 
-  "evaluateRule": {
+  "evaluate.rule": {
     "rules": [
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "accuracy",
-        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "name": "accu",
+        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
         "details": {
-          "source": "src",
-          "target": "tgt",
-          "miss.records": {
-            "name": "miss.records",
-            "persist.type": "record"
-          },
-          "accuracy": {
-            "name": "accu",
-            "persist.type": "metric"
-          },
+          "source": "source",
+          "target": "target",
           "miss": "miss_count",
           "total": "total_count",
           "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
         }
       }
     ]
@@ -193,19 +192,34 @@ Above lists DQ job configure parameters.
 
 ### <a name="rule"></a>Rule
 - **dsl.type**: Rule dsl type, "spark-sql", "df-opr" and "griffin-dsl".
-- **name** (step information): Result table name of this rule, optional for "griffin-dsl" type.
-- **persist.type** (step information): Persist type of result table, optional for "griffin-dsl" type. Supporting "metric", "record" and "none" type, "metric" type indicates the result will be persisted as metrics, "record" type indicates the result will be persisted as record only, "none" type indicates the result will not be persisted. Default is "none" type.
-- **update.data.source** (step information): If the result table needs to update the data source, this parameter is the data source name, for streaming accuracy case, optional.
 - **dq.type**: DQ type of this rule, only for "griffin-dsl" type, supporting "accuracy" and "profiling".
+- **name** (step information): Result table name of this rule, optional for "griffin-dsl" type.
+- **rule**: The rule string.
 - **details**: Details of this rule, optional.
-	+ accuracy dq type detail configuration
-		* source: the data source name which as source in accuracy, default is the name of first data source in "data.sources" if not configured.
-		* target: the data source name which as target in accuracy, default is the name of second data source in "data.sources" if not configured.
-		* miss.records: step information of miss records result table step in accuracy.
-		* accuracy: step information of accuracy result table step in accuracy.
-		* miss: alias of miss column in result table.
-		* total: alias of total column in result table.
-		* matched: alias of matched column in result table.
-	+ profiling dq type detail configuration
-		* source: the data source name which as source in profiling, default is the name of first data source in "data.sources" if not configured. If the griffin-dsl rule contains from clause, this parameter is ignored.
-		* profiling: step information of profiling result table step in profiling.
\ No newline at end of file
+  + accuracy dq type detail configuration
+    * source: the name of the data source used as the source in accuracy; defaults to the name of the first data source in "data.sources" if not configured.
+    * target: the name of the data source used as the target in accuracy; defaults to the name of the second data source in "data.sources" if not configured.
+    * miss: the miss count name in metric, optional.
+    * total: the total count name in metric, optional.
+    * matched: the matched count name in metric, optional.
+  + profiling dq type detail configuration
+    * source: the name of the data source used as the source in profiling; defaults to the name of the first data source in "data.sources" if not configured. If the griffin-dsl rule contains a from clause, this parameter is ignored.
+  + uniqueness dq type detail configuration
+    * source: name of data source to measure uniqueness.
+    * target: name of the data source to compare with. It is always the same as the source, or a data source containing more than the source.
+    * unique: the unique count name in metric, optional.
+    * total: the total count name in metric, optional.
+    * dup: the duplicate count name in metric, optional.
+    * num: the duplicate number name in metric, optional.
+    * duplication.array: optional, if set as a non-empty string, the duplication metric will be computed, and the group metric name is this string.
+  + timeliness dq type detail configuration
+    * source: name of data source to measure timeliness.
+    * latency: the latency column name in metric, optional.
+    * threshold: optional, if set as a time string like "1h", the items with latency of more than 1 hour will be recorded.
+- **metric**: Configuration of metric export.
+  + name: name of metric.
+  + collect.type: optional, collect the metric as the configured type, one of "default", "entries", "array" and "map".
+- **record**: Configuration of record export.
+  + name: name of record.
+  + data.source.cache: optional, if set to a data source name, the cache of that data source will be updated by the records; always used in the streaming accuracy case.
+  + origin.DF: available only if "data.source.cache" is set, the origin data frame name of the records.
\ No newline at end of file
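
To see the new `metric` and `record` export blocks in context, the fragment below mirrors the streaming accuracy rule from the streaming sample added in this commit; the "collect.type" line is included only to show the option and is an assumption for this particular rule.
```
{
  "dsl.type": "griffin-dsl",
  "dq.type": "accuracy",
  "name": "accu",
  "rule": "source.name = target.name and source.age = target.age",
  "metric": {
    "name": "accu",
    "collect.type": "default"
  },
  "record": {
    "name": "missRecords",
    "data.source.cache": "source"
  }
}
```
Here the metric is exported under the name "accu", and the miss records are written back to the cache of the "source" data source, which is the streaming accuracy case described above.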

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-streaming-sample-old.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-streaming-sample-old.md b/griffin-doc/measure/measure-streaming-sample-old.md
deleted file mode 100644
index 004ed3b..0000000
--- a/griffin-doc/measure/measure-streaming-sample-old.md
+++ /dev/null
@@ -1,204 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-# Measure streaming sample
-Measures consists of batch measure and streaming measure. This document is for the streaming measure sample.
-
-### Data source
-At current, we support kafka as streaming data source.  
-In this sample, we also need a kafka as data source.
-
-### Measure type
-At current, we support accuracy measure in streaming mode.
-
-### Kafka decoder
-In kafka, data always needs encode and decode, we support String type kafka data currently, you can also implement and use your decoder for kafka case.
-
-### Environment
-For current griffin streaming case, we need some necessary environment dependencies, zookeeper and hdfs.  
-We use zookeeper to cache some checkpoint information, it's optional, but we recommend it.  
-We use hdfs to save the temporary data, it's also a recommend selection.
-
-### Streaming accuracy result
-The streaming data will be separated into mini-batches of data, for each mini-batch data, there should be an accuracy result. Therefore, the streaming accuracy result should be a bunch of batch accuracy results with timestamp.  
-Considering the latency of streaming data, which means the source data and the matching target data will not exactly reach exactly at the same time, we have to accept some delay of data in streaming mode, by holding unmatched data in memory or disk, and try to match them later until the data is out-time.
-
-## How to run streaming sample
-### Environment Preparation
-At first, we need some environment preparation.  
-- Zookeeper: Zookeeper 3.4.10
-- Hadoop: Hadoop 2.6
-- Spark: Spark 1.6
-- Kafka: Kafka 0.8
-
-### Data Preparation
-Create two topics in kafka, for source and target data. For example, topic "source" for source data, and topic "target" for target data.  
-Streaming data should also be prepared, the format could be json string, for example:  
-Source data could be:
-```
-{"name": "kevin", "age": 24}
-{"name": "jason", "age": 25}
-{"name": "jhon", "age": 28}
-{"name": "steve", "age": 31}
-```
-Target data could be:
-```
-{"name": "kevin", "age": 24}
-{"name": "jason", "age": 25}
-{"name": "steve", "age": 20}
-```
-You need to input the source data and target data into these two topics, through console producer might be a good choice for experimental purpose.
-
-### Configuration Preparation
-Two configuration files are required.
-Environment configuration file: env.json
-```
-{
-  "spark": {
-    "log.level": "WARN",
-    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
-    "batch.interval": "5s",
-    "process.interval": "30s",
-    "config": {
-      "spark.task.maxFailures": 5,
-      "spark.streaming.kafkaMaxRatePerPartition": 1000,
-      "spark.streaming.concurrentJobs": 4
-    }
-  },
-
-  "persist": [
-    {
-      "type": "log",
-      "config": {
-        "max.log.lines": 100
-      }
-    }, {
-      "type": "hdfs",
-      "config": {
-        "path": "hdfs:///griffin/streaming/persist",
-        "max.persist.lines": 10000,
-        "max.lines.per.file": 10000
-      }
-    }
-  ],
-
-  "info.cache": [
-    {
-      "type": "zk",
-      "config": {
-        "hosts": "<zookeeper host ip>:2181",
-        "namespace": "griffin/infocache",
-        "lock.path": "lock",
-        "mode": "persist",
-        "init.clear": true,
-        "close.clear": false
-      }
-    }
-  ]
-}
-```
-In env.json, "spark" field configures the spark and spark streaming parameters, "persist" field configures the persist ways, we support "log", "hdfs" and "http" ways at current, "info.cache" field configures the information cache parameters, we support zookeeper only at current.  
-
-Process configuration file: config.json
-```
-{
-  "name": "streaming-accu-sample",
-  "type": "accuracy",
-  "process.type": "streaming",
-
-  "source": {
-    "type": "kafka",
-    "version": "0.8",
-    "config": {
-      "kafka.config": {
-        "bootstrap.servers": "<kafka host ip>:9092",
-        "group.id": "group1",
-        "auto.offset.reset": "smallest",
-        "auto.commit.enable": "false"
-      },
-      "topics": "source",
-      "key.type": "java.lang.String",
-      "value.type": "java.lang.String"
-    },
-    "cache": {
-      "type": "text",
-      "config": {
-        "file.path": "hdfs:///griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0"
-      },
-      "time.range": ["-5m", "0"]
-    },
-    "match.once": true
-  },
-
-  "target": {
-    "type": "kafka",
-    "version": "0.8",
-    "config": {
-      "kafka.config": {
-        "bootstrap.servers": "<kafka host ip>:9092",
-        "group.id": "group1",
-        "auto.offset.reset": "smallest",
-        "auto.commit.enable": "false"
-      },
-      "topics": "target",
-      "key.type": "java.lang.String",
-      "value.type": "java.lang.String"
-    },
-    "cache": {
-      "type": "text",
-      "config": {
-        "file.path": "hdfs:///griffin/streaming/dump/target",
-        "info.path": "target",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0"
-      },
-      "time.range": ["-5m", "0"]
-    },
-    "match.once": false
-  },
-
-  "evaluateRule": {
-    "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age"
-  }
-}
-```
-In config.json, "source" and "target" fields configure the data source parameters.  
-The "cache" field in data source configuration represents the temporary data cache way, at current we support "text" and "hive" ways. We recommend "text" way, it only depends on hdfs. "time.range" means that the data older than the lower bound should be considered as out-time, and the out-time data will not be calculated any more.   
-"match.once" represents the data from this data source could be matched only once or more times.  
-"evaluateRule.rule" configures the match rule between each source and target data.
-
-### Run
-Build the measure package.
-```
-mvn clean install
-```
-Get the measure package ```measure-<version>-incubating-SNAPSHOT.jar```, rename it to ```griffin-measure.jar```.  
-Put measure package together with env.json and config.json.
-Run the following command:
-```
-spark-submit --class org.apache.griffin.measure.Application \
---master yarn-client --queue default \
-griffin-measure.jar \
-env.json config.json local,local
-```
-The first two parameters are the paths of env.json and config.json, the third parameter represents the file system type of the two configuration files, "local" or "hdfs" are both supported.  
-
-The spark streaming application will be long-time running, you can get the results of each mini-batch of data, during the run-time, you can also input more data into source and target topics, to check the results of the later mini-batches.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/griffin-doc/measure/measure-streaming-sample.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-streaming-sample.md b/griffin-doc/measure/measure-streaming-sample.md
new file mode 100644
index 0000000..5c80576
--- /dev/null
+++ b/griffin-doc/measure/measure-streaming-sample.md
@@ -0,0 +1,256 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Measure Streaming Sample
+Measures consist of batch measure and streaming measure. This document is for the streaming measure sample.
+
+## Streaming Accuracy Sample
+```
+{
+  "name": "accu_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "src_group",
+              "auto.offset.reset": "largest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "tgt_group",
+              "auto.offset.reset": "largest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${t1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${t1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "name": "accu",
+        "rule": "source.name = target.name and source.age = target.age",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords",
+          "data.source.cache": "source"
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configuration file of the streaming accuracy job.  
+
+### Data source
+In this sample, we use kafka topics as source and target.  
+Currently, griffin supports kafka 0.8; support for 1.0 and later versions is under development.  
+In the current griffin implementation, only json strings are supported as kafka data, since the data can describe itself. In some other solutions there might be a schema proxy for kafka binary data; you can implement such a data source connector if you need it, and this is also under development on our side.
+In streaming cases, the data from topics usually needs some pre-processing first, which is configured in `pre.proc`. Just like the `rules`, griffin will not parse the sql content, so some patterns are used to mark your temporary tables: `${this}` means the origin data set, and the output table name should also be `${this}`.
+
+For example, you can create two topics in kafka for source and target data; the format could be json strings.
+Source data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "jhon", "age": 28}
+{"name": "steve", "age": 31}
+```
+Target data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "steve", "age": 20}
+```
+You need to input the source data and target data into these two topics; using the console producer might be a good choice for experimental purposes.
+
+### Evaluate rule
+In this accuracy sample, the rule describes the match condition: `source.name = target.name and source.age = target.age`.  
+The accuracy metrics will be persisted as metric, with the miss column named "miss_count", the total column named "total_count", and the matched column named "matched_count".  
+The miss records of the source will be persisted as records.  
+
+## Streaming Profiling Sample
+```
+{
+  "name": "prof_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "prof",
+        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "select name, count(*) as `cnt` from source group by name",
+        "metric": {
+          "name": "name_group",
+          "collect.type": "array"
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configuration file of the streaming profiling job.  
+
+### Data source
+In this sample, we use kafka topics as source.  
+
+### Evaluate rule
+In this profiling sample, the rules describe the profiling requests: ``select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source`` and ``select name, count(*) as `cnt` from source group by name``.  
+The profiling metrics will be persisted as metric, with these two results in one json.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index 85dfe62..aefd390 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -60,8 +60,8 @@ object TimeInfoCache extends Loggable with Serializable {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.map { k =>
-      getLong(result, k)
+    val time = keys.flatMap { k =>
+      getLongOpt(result, k)
     }.min
     val map = Map[String, String]((finalReadyTime -> time.toString))
     InfoCacheInstance.cacheInfo(map)
@@ -71,8 +71,8 @@ object TimeInfoCache extends Loggable with Serializable {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.map { k =>
-      getLong(result, k)
+    val time = keys.flatMap { k =>
+      getLongOpt(result, k)
     }.min
     val map = Map[String, String]((finalLastProcTime -> time.toString))
     InfoCacheInstance.cacheInfo(map)
@@ -82,8 +82,8 @@ object TimeInfoCache extends Loggable with Serializable {
     val subPath = InfoCacheInstance.listKeys(infoPath)
     val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
     val result = InfoCacheInstance.readInfo(keys)
-    val time = keys.map { k =>
-      getLong(result, k)
+    val time = keys.flatMap { k =>
+      getLongOpt(result, k)
     }.min
     val map = Map[String, String]((finalCleanTime -> time.toString))
     InfoCacheInstance.cacheInfo(map)
@@ -102,15 +102,15 @@ object TimeInfoCache extends Loggable with Serializable {
     cleanTime
   }
 
-  private def getLong(map: Map[String, String], key: String): Long = {
+  private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
     try {
-      map.get(key) match {
-        case Some(v) => v.toLong
-        case _ => -1
-      }
+      map.get(key).map(_.toLong)
     } catch {
-      case e: Throwable => -1
+      case e: Throwable => None
     }
   }
+  private def getLong(map: Map[String, String], key: String) = {
+    getLongOpt(map, key).getOrElse(-1L)
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
index 8b62fa4..ee99099 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala
@@ -117,7 +117,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
 
   def clearInfo(): Unit = {
 //    delete("/")
-    info("clear info")
+    println("clear info")
   }
 
   def listKeys(p: String): List[String] = {
@@ -138,7 +138,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
       client.getChildren().forPath(path).asScala.toList
     } catch {
       case e: Throwable => {
-        error(s"list ${path} error: ${e.getMessage}")
+        warn(s"list ${path} warn: ${e.getMessage}")
         Nil
       }
     }
@@ -182,7 +182,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
       Some(new String(client.getData().forPath(path), "utf-8"))
     } catch {
       case e: Throwable => {
-        error(s"read ${path} error: ${e.getMessage}")
+        warn(s"read ${path} warn: ${e.getMessage}")
         None
       }
     }
@@ -201,7 +201,7 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf
       client.checkExists().forPath(path) != null
     } catch {
       case e: Throwable => {
-        error(s"check exists ${path} error: ${e.getMessage}")
+        warn(s"check exists ${path} warn: ${e.getMessage}")
         false
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
index 326d3c8..c43ea70 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
@@ -31,4 +31,5 @@ case class DataSourceParam( @JsonProperty("name") name: String,
   def hasName: Boolean = (name != null)
   def isBaseLine: Boolean = if (baseline == null) false else baseline
   def falseBaselineClone: DataSourceParam = DataSourceParam(name, false, connectors, cache)
+  def getConnectors: List[DataConnectorParam] = if (connectors != null) connectors else Nil
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
index 6fafebf..1cf3f32 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
@@ -25,7 +25,7 @@ import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.process.{BatchDqProcess, BatchProcessType}
 import org.apache.griffin.measure.process.engine._
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.adaptor.{InternalColumns, PreProcPhase, RuleAdaptorGroup, RunPhase}
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
@@ -45,7 +45,7 @@ trait DataConnector extends Loggable with Serializable {
 
   def init(): Unit
 
-  def data(ms: Long): (Option[DataFrame], Set[Long])
+  def data(ms: Long): (Option[DataFrame], TimeRange)
 
   val dqEngines: DqEngines
 
@@ -72,11 +72,12 @@ trait DataConnector extends Loggable with Serializable {
         TableRegisters.registerRunTempTable(df, timeInfo.key, thisTable)
 
 //        val dsTmsts = Map[String, Set[Long]]((thisTable -> Set[Long](ms)))
-        val tmsts = Seq[Long](ms)
+//        val tmsts = Seq[Long](ms)
+        val dsTimeRanges = Map[String, TimeRange]((thisTable -> TimeRange(ms)))
 
         // generate rule steps
         val rulePlan = RuleAdaptorGroup.genRulePlan(
-          timeInfo, preProcRules, SparkSqlType, BatchProcessType)
+          timeInfo, preProcRules, SparkSqlType, BatchProcessType, dsTimeRanges)
 
         // run rules
         dqEngines.runRuleSteps(timeInfo, rulePlan.ruleSteps)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
index fb042c2..5a1c22c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
@@ -21,6 +21,7 @@ package org.apache.griffin.measure.data.connector.batch
 import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.result._
 import org.apache.griffin.measure.utils.HdfsUtil
 import org.apache.spark.rdd.RDD
@@ -51,7 +52,7 @@ case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
     HdfsUtil.existPath(concreteFileFullPath)
   }
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val dfOpt = try {
       val df = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
       val dfOpt = Some(df)
@@ -63,7 +64,8 @@ case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
         None
       }
     }
-    (dfOpt, readTmst(ms))
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
   }
 
 //  def available(): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
index 812d724..2c9747e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
@@ -21,6 +21,7 @@ package org.apache.griffin.measure.data.connector.batch
 import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.result._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.hive.HiveContext
@@ -60,7 +61,7 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
 //    if (arr.size > 0) Some(arr) else None
 //  }
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val dfOpt = try {
       val dtSql = dataSql
       info(dtSql)
@@ -74,7 +75,8 @@ case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines,
         None
       }
     }
-    (dfOpt, readTmst(ms))
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
   }
 
 //  def available(): Boolean = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
index 32be963..fe8d386 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
@@ -20,6 +20,7 @@ package org.apache.griffin.measure.data.connector.batch
 
 import org.apache.griffin.measure.config.params.user.DataConnectorParam
 import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.utils.HdfsUtil
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.griffin.measure.utils.ParamUtil._
@@ -46,7 +47,7 @@ case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngine
     HdfsUtil.existPath(dirPath)
   }
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val dfOpt = try {
       val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
       // touch done file for read dirs
@@ -68,7 +69,8 @@ case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngine
         None
       }
     }
-    (dfOpt, readTmst(ms))
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
   }
 
   private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
index f8d50be..f65b0d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
@@ -20,6 +20,7 @@ package org.apache.griffin.measure.data.connector.streaming
 
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.data.source.DataSourceCache
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.streaming.dstream.InputDStream
@@ -36,7 +37,7 @@ trait StreamingDataConnector extends DataConnector {
 
   def transform(rdd: RDD[(K, V)]): Option[DataFrame]
 
-  def data(ms: Long): (Option[DataFrame], Set[Long]) = (None, Set.empty[Long])
+  def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
 
   var dataSourceCacheOpt: Option[DataSourceCache] = None
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
index 1918e28..fc8c646 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
@@ -23,7 +23,7 @@ import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.data.connector.batch._
 import org.apache.griffin.measure.data.connector.streaming._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.plan.TimeInfo
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -49,7 +49,7 @@ case class DataSource(sqlContext: SQLContext,
     dataConnectors.map(_.tmstCache = tmstCache)
   }
 
-  def loadData(timeInfo: TimeInfo): Set[Long] = {
+  def loadData(timeInfo: TimeInfo): TimeRange = {
     val calcTime = timeInfo.calcTime
     println(s"load data [${name}]")
     val (dfOpt, tmsts) = data(calcTime)
@@ -65,11 +65,11 @@ case class DataSource(sqlContext: SQLContext,
     tmsts
   }
 
-  private def data(ms: Long): (Option[DataFrame], Set[Long]) = {
+  private def data(ms: Long): (Option[DataFrame], TimeRange) = {
     val batches = batchDataConnectors.flatMap { dc =>
-      val (dfOpt, tmsts) = dc.data(ms)
+      val (dfOpt, timeRange) = dc.data(ms)
       dfOpt match {
-        case Some(df) => Some((dfOpt, tmsts))
+        case Some(df) => Some((dfOpt, timeRange))
         case _ => None
       }
     }
@@ -81,10 +81,10 @@ case class DataSource(sqlContext: SQLContext,
 
     if (pairs.size > 0) {
       pairs.reduce { (a, b) =>
-        (unionDfOpts(a._1, b._1), a._2 ++ b._2)
+        (unionDfOpts(a._1, b._1), a._2.merge(b._2))
       }
     } else {
-      (None, Set.empty[Long])
+      (None, TimeRange.emptyTimeRange)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
index 9272f17..fff186f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
@@ -25,6 +25,7 @@ import org.apache.griffin.measure.cache.tmst.TmstCache
 import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -70,6 +71,14 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     }
   }
 
+//  val _WriteInfoPath = "write.info.path"
+//  val _ReadInfoPath = "read.info.path"
+//  val writeCacheInfoPath = param.getString(_WriteInfoPath, defInfoPath)
+//  val readCacheInfoPath = param.getString(_ReadInfoPath, defInfoPath)
+
+  val _ReadOnly = "read.only"
+  val readOnly = param.getBoolean(_ReadOnly, false)
+
   val rowSepLiteral = "\n"
   val partitionUnits: List[String] = List("hour", "min", "sec")
   val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last)
@@ -82,47 +91,50 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
   }
 
   def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
-    dfOpt match {
-      case Some(df) => {
-        val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-        if (newCacheLocked) {
-          try {
-            val ptns = getPartition(ms)
-            val ptnsPath = genPartitionHdfsPath(ptns)
-            val dirPath = s"${filePath}/${ptnsPath}"
-            val dataFileName = s"${ms}"
-            val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-            // transform data
-            val dataRdd: RDD[String] = df.toJSON
-
-            // save data
-//            val dumped = if (!dataRdd.isEmpty) {
-//              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
-//            } else false
-
-            if (!dataRdd.isEmpty) {
-              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+    if (!readOnly) {
+      dfOpt match {
+        case Some(df) => {
+          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (newCacheLocked) {
+            try {
+              val ptns = getPartition(ms)
+              val ptnsPath = genPartitionHdfsPath(ptns)
+              val dirPath = s"${filePath}/${ptnsPath}"
+              val dataFileName = s"${ms}"
+              val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+              // transform data
+              val dataRdd: RDD[String] = df.toJSON
+
+              // save data
+              //            val dumped = if (!dataRdd.isEmpty) {
+              //              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+              //            } else false
+
+              if (!dataRdd.isEmpty) {
+                HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+              }
+
+            } catch {
+              case e: Throwable => error(s"save data error: ${e.getMessage}")
+            } finally {
+              newCacheLock.unlock()
             }
-
-          } catch {
-            case e: Throwable => error(s"save data error: ${e.getMessage}")
-          } finally {
-            newCacheLock.unlock()
           }
         }
+        case _ => {
+          info(s"no data frame to save")
+        }
       }
-      case _ => {
-        info(s"no data frame to save")
-      }
-    }
 
-    // submit cache time and ready time
-    submitCacheTime(ms)
-    submitReadyTime(ms)
+      // submit cache time and ready time
+      submitCacheTime(ms)
+      submitReadyTime(ms)
+    }
   }
 
-  def readData(): (Option[DataFrame], Set[Long]) = {
+  // return: (data frame option, time range)
+  def readData(): (Option[DataFrame], TimeRange) = {
     val tr = TimeInfoCache.getTimeRange
     val timeRange = (tr._1 + minUnitTime, tr._2)
     submitLastProcTime(timeRange._2)
@@ -137,6 +149,7 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
 
     // list partition paths
     val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
+//    println(partitionPaths)
 
     val dfOpt = if (partitionPaths.isEmpty) {
       None
@@ -154,140 +167,152 @@ case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
     // from until tmst range
     val (from, until) = (reviseTimeRange._1, reviseTimeRange._2 + 1)
     val tmstSet = rangeTmsts(from, until)
-    (dfOpt, tmstSet)
+
+    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+    (dfOpt, retTimeRange)
   }
 
   def updateData(df: DataFrame, ms: Long): Unit = {
-    val ptns = getPartition(ms)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${filePath}/${ptnsPath}"
-    val dataFileName = s"${ms}"
-    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+    if (!readOnly) {
+      val ptns = getPartition(ms)
+      val ptnsPath = genPartitionHdfsPath(ptns)
+      val dirPath = s"${filePath}/${ptnsPath}"
+      val dataFileName = s"${ms}"
+      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
 
-    try {
-      val records = df.toJSON
-      val arr = records.collect
-      val needSave = !arr.isEmpty
-
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-      println(s"remove file path: ${dirPath}/${dataFileName}")
-
-      // save updated data
-      if (needSave) {
-        HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
-        println(s"update file path: ${dataFilePath}")
-      } else {
-        clearTmst(ms)
-        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      try {
+        val records = df.toJSON
+        val arr = records.collect
+        val needSave = !arr.isEmpty
+
+        // remove out time old data
+        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+        println(s"remove file path: ${dirPath}/${dataFileName}")
+
+        // save updated data
+        if (needSave) {
+          HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
+          println(s"update file path: ${dataFilePath}")
+        } else {
+          clearTmst(ms)
+          println(s"data source [${dsName}] timestamp [${ms}] cleared")
+        }
+      } catch {
+        case e: Throwable => error(s"update data error: ${e.getMessage}")
       }
-    } catch {
-      case e: Throwable => error(s"update data error: ${e.getMessage}")
     }
   }
 
   def updateData(rdd: RDD[String], ms: Long, cnt: Long): Unit = {
-    val ptns = getPartition(ms)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${filePath}/${ptnsPath}"
-    val dataFileName = s"${ms}"
-    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+    if (!readOnly) {
+      val ptns = getPartition(ms)
+      val ptnsPath = genPartitionHdfsPath(ptns)
+      val dirPath = s"${filePath}/${ptnsPath}"
+      val dataFileName = s"${ms}"
+      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
 
-    try {
-//      val needSave = !rdd.isEmpty
-
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-      println(s"remove file path: ${dirPath}/${dataFileName}")
-
-      // save updated data
-      if (cnt > 0) {
-        HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
-        println(s"update file path: ${dataFilePath}")
-      } else {
-        clearTmst(ms)
-        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      try {
+        //      val needSave = !rdd.isEmpty
+
+        // remove out time old data
+        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+        println(s"remove file path: ${dirPath}/${dataFileName}")
+
+        // save updated data
+        if (cnt > 0) {
+          HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
+          println(s"update file path: ${dataFilePath}")
+        } else {
+          clearTmst(ms)
+          println(s"data source [${dsName}] timestamp [${ms}] cleared")
+        }
+      } catch {
+        case e: Throwable => error(s"update data error: ${e.getMessage}")
+      } finally {
+        rdd.unpersist()
       }
-    } catch {
-      case e: Throwable => error(s"update data error: ${e.getMessage}")
-    } finally {
-      rdd.unpersist()
     }
   }
 
   def updateData(arr: Iterable[String], ms: Long): Unit = {
-    val ptns = getPartition(ms)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${filePath}/${ptnsPath}"
-    val dataFileName = s"${ms}"
-    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+    if (!readOnly) {
+      val ptns = getPartition(ms)
+      val ptnsPath = genPartitionHdfsPath(ptns)
+      val dirPath = s"${filePath}/${ptnsPath}"
+      val dataFileName = s"${ms}"
+      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
 
-    try {
-      val needSave = !arr.isEmpty
-
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-      println(s"remove file path: ${dirPath}/${dataFileName}")
-
-      // save updated data
-      if (needSave) {
-        HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
-        println(s"update file path: ${dataFilePath}")
-      } else {
-        clearTmst(ms)
-        println(s"data source [${dsName}] timestamp [${ms}] cleared")
+      try {
+        val needSave = !arr.isEmpty
+
+        // remove out time old data
+        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+        println(s"remove file path: ${dirPath}/${dataFileName}")
+
+        // save updated data
+        if (needSave) {
+          HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
+          println(s"update file path: ${dataFilePath}")
+        } else {
+          clearTmst(ms)
+          println(s"data source [${dsName}] timestamp [${ms}] cleared")
+        }
+      } catch {
+        case e: Throwable => error(s"update data error: ${e.getMessage}")
       }
-    } catch {
-      case e: Throwable => error(s"update data error: ${e.getMessage}")
     }
   }
 
   def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
-    val dataMap = dfMap.map { pair =>
-      val (t, recs) = pair
-      val rdd = recs.toJSON
-//      rdd.cache
-      (t, rdd, rdd.count)
-    }
+    if (!readOnly) {
+      val dataMap = dfMap.map { pair =>
+        val (t, recs) = pair
+        val rdd = recs.toJSON
+        //      rdd.cache
+        (t, rdd, rdd.count)
+      }
 
-    dataMap.foreach { pair =>
-      val (t, arr, cnt) = pair
-      updateData(arr, t, cnt)
+      dataMap.foreach { pair =>
+        val (t, arr, cnt) = pair
+        updateData(arr, t, cnt)
+      }
     }
   }
 
   def cleanOldData(): Unit = {
-    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-    if (oldCacheLocked) {
-      try {
-        val cleanTime = readCleanTime()
-        cleanTime match {
-          case Some(ct) => {
-            println(s"data source [${dsName}] old timestamps clear until [${ct}]")
-
-            // clear out date tmsts
-            clearTmstsUntil(ct)
-
-            // drop partitions
-            val bounds = getPartition(ct)
-
-            // list partition paths
-            val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
-
-            // delete out time data path
-            earlierPaths.foreach { path =>
-              println(s"delete hdfs path: ${path}")
-              HdfsUtil.deleteHdfsPath(path)
+    if (!readOnly) {
+      val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+      if (oldCacheLocked) {
+        try {
+          val cleanTime = readCleanTime()
+          cleanTime match {
+            case Some(ct) => {
+              println(s"data source [${dsName}] old timestamps clear until [${ct}]")
+
+              // clear out date tmsts
+              clearTmstsUntil(ct)
+
+              // drop partitions
+              val bounds = getPartition(ct)
+
+              // list partition paths
+              val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
+
+              // delete out time data path
+              earlierPaths.foreach { path =>
+                println(s"delete hdfs path: ${path}")
+                HdfsUtil.deleteHdfsPath(path)
+              }
+            }
+            case _ => {
+              // do nothing
             }
           }
-          case _ => {
-            // do nothing
-          }
+        } catch {
+          case e: Throwable => error(s"clean old data error: ${e.getMessage}")
+        } finally {
+          oldCacheLock.unlock()
         }
-      } catch {
-        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-      } finally {
-        oldCacheLock.unlock()
       }
     }
   }

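The DataSourceCache changes above wrap every mutating operation (updateData, updateDataMap, cleanOldData) in an `if (!readOnly)` guard, so a data source that only reads data from cache in streaming mode never rewrites or cleans its HDFS files. A minimal standalone sketch of the same guard idea; the names below are illustrative placeholders, not the project's API:

    // Sketch: skip every cache mutation when the cache is marked read-only.
    trait ReadOnlyGuard {
      def readOnly: Boolean

      protected def whenWritable(op: => Unit): Unit =
        if (!readOnly) op else println("cache is read-only, skipping mutation")
    }

    object DemoCache extends ReadOnlyGuard {
      val readOnly = true
      def update(ms: Long): Unit = whenWritable { println(s"update data at ${ms}") }
      def cleanOld(ct: Long): Unit = whenWritable { println(s"clean data until ${ct}") }
    }

With readOnly = true, DemoCache.update and DemoCache.cleanOld become no-ops, which matches how the guarded methods above behave for a cache-only source.
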
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
index 47ee368..b83e2fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
@@ -47,7 +47,7 @@ object DataSourceFactory extends Loggable {
                            ): Option[DataSource] = {
     val name = dataSourceParam.name
     val baseline = dataSourceParam.isBaseLine
-    val connectorParams = dataSourceParam.connectors
+    val connectorParams = dataSourceParam.getConnectors
     val cacheParam = dataSourceParam.cache
     val dataConnectors = connectorParams.flatMap { connectorParam =>
       DataConnectorFactory.getDataConnector(sqlContext, ssc, dqEngines, connectorParam) match {

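The only change here is reading connector params through `getConnectors` instead of the raw `connectors` field; the matching one-line addition to DataSourceParam.scala sits elsewhere in this patch, so the accessor body below is an assumption: presumably it returns an empty list when a data source (for example one that only reads from cache in streaming mode) declares no connectors.

    // Assumed sketch of a null-safe accessor on the JSON-mapped param class;
    // the real getConnectors lives in DataSourceParam.scala in this commit.
    case class DataSourceParamSketch(name: String,
                                     connectors: List[Map[String, Any]]) {
      def getConnectors: List[Map[String, Any]] =
        if (connectors != null) connectors else Nil
    }
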
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 7ed4717..950cd27 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -92,9 +92,9 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     dataSources.foreach(_.init)
 
     // init data sources
-    val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo)
+    val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
 
-    debug(s"data source timestamps: ${dsTmsts}")
+    println(s"data source timeRanges: ${dsTimeRanges}")
 
     // generate rule steps
 //    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(
@@ -103,7 +103,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
 //      CalcTimeInfo(appTime), userParam.evaluateRuleParam, dsTmsts)
 
     val rulePlan = RuleAdaptorGroup.genRulePlan(
-      calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType)
+      calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType, dsTimeRanges)
 
 //    rulePlan.ruleSteps.foreach(println)
 //    println("====")
@@ -116,11 +116,9 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
 
     // persist results
-    dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports,
-      BatchProcessType, persistFactory)
+    dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, persistFactory)
 
-    dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports,
-      BatchProcessType, persistFactory, dataSources)
+    dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, persistFactory, dataSources)
 //    dfs.foreach(_._2.cache())
 //
 //    dqEngines.persistAllRecords(dfs, persistFactory)

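loadData now hands back a TimeRange per data source instead of a bare Set[Long], and those time ranges are passed into rule-plan generation. The real TimeRange is added in measure/process/temp/TimeRange.scala elsewhere in this patch; the shape below is only an assumed sketch of what such a value could carry.

    // Assumed sketch only; see process/temp/TimeRange.scala in this commit
    // for the actual definition.
    case class TimeRangeSketch(begin: Long, end: Long, tmsts: Set[Long]) {
      def merge(other: TimeRangeSketch): TimeRangeSketch =
        TimeRangeSketch(math.min(begin, other.begin),
                        math.max(end, other.end),
                        tmsts ++ other.tmsts)
    }

    object TimeRangeSketch {
      val emptyTimeRange = TimeRangeSketch(Long.MaxValue, Long.MinValue, Set.empty[Long])
      def fromTmsts(ts: Set[Long]): TimeRangeSketch =
        if (ts.isEmpty) emptyTimeRange else TimeRangeSketch(ts.min, ts.max, ts)
    }
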
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala b/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
new file mode 100644
index 0000000..42aa92b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+sealed trait ExportMode {}
+
+object ExportMode {
+  def defaultMode(procType: ProcessType): ExportMode = {
+    procType match {
+      case BatchProcessType => SimpleMode
+      case StreamingProcessType => TimestampMode
+    }
+  }
+}
+
+final case object SimpleMode extends ExportMode {}
+
+final case object TimestampMode extends ExportMode {}
\ No newline at end of file

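ExportMode decouples how an export is collected from the process type: batch jobs default to SimpleMode, streaming jobs to TimestampMode, and the downstream collectors in this patch (persistAllRecords, collectMetrics) match on the mode carried by the export itself. A small usage illustration, assuming only the names visible in the new file:

    import org.apache.griffin.measure.process._

    // Resolve the default mode for each process type and branch on it.
    Seq(BatchProcessType, StreamingProcessType).map(ExportMode.defaultMode).foreach {
      case SimpleMode    => println("collect once, keyed by the default timestamp")
      case TimestampMode => println("collect per record timestamp")
    }
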
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index 39444cd..fcf9528 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngines
 import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
-import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.rule.adaptor.{ProcessDetailsKeys, RuleAdaptorGroup, RunPhase}
 import org.apache.griffin.measure.rule.plan._
 import org.apache.spark.sql.SQLContext
 
@@ -58,21 +58,25 @@ case class StreamingDqThread(sqlContext: SQLContext,
         TimeInfoCache.startTimeInfoCache
 
         // init data sources
-        val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo)
+        val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
 
-        println(s"data sources timestamps: ${dsTmsts}")
+        println(s"data source timeRanges: ${dsTimeRanges}")
 
         // generate rule steps
 //        val ruleSteps = RuleAdaptorGroup.genRuleSteps(
 //          CalcTimeInfo(st), evaluateRuleParam, dsTmsts)
         val rulePlan = RuleAdaptorGroup.genRulePlan(
-          calcTimeInfo, evaluateRuleParam, StreamingProcessType)
+          calcTimeInfo, evaluateRuleParam, StreamingProcessType, dsTimeRanges)
+
+        // optimize rule plan
+//        val optRulePlan = optimizeRulePlan(rulePlan, dsTmsts)
+        val optRulePlan = rulePlan
 
 //        ruleSteps.foreach(println)
 
         // run rules
 //        dqEngines.runRuleSteps(ruleSteps)
-        dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
+        dqEngines.runRuleSteps(calcTimeInfo, optRulePlan.ruleSteps)
 
         val ct = new Date().getTime
         val calculationTimeStr = s"calculation using time: ${ct - st} ms"
@@ -81,8 +85,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
 
         // persist results
 //        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
-        dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports,
-          StreamingProcessType, persistFactory)
+        dqEngines.persistAllMetrics(calcTimeInfo, optRulePlan.metricExports, persistFactory)
 //        println(s"--- timeGroups: ${timeGroups}")
 
         val rt = new Date().getTime
@@ -90,8 +93,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
         appPersist.log(rt, persistResultTimeStr)
 
         // persist records
-        dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports,
-          StreamingProcessType, persistFactory, dataSources)
+        dqEngines.persistAllRecords(calcTimeInfo, optRulePlan.recordExports, persistFactory, dataSources)
 
         val et = new Date().getTime
         val persistTimeStr = s"persist records using time: ${et - rt} ms"
@@ -167,54 +169,29 @@ case class StreamingDqThread(sqlContext: SQLContext,
     }
   }
 
-//  // calculate accuracy between source data and target data
-//  private def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-//               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-//               ruleAnalyzer: RuleAnalyzer) = {
-//    // 1. cogroup
-//    val allKvs = sourceData.cogroup(targetData)
-//
-//    // 2. accuracy calculation
-//    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-//
-//    (accuResult, missingRdd, matchedRdd)
-//  }
-//
-//  private def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
-//                      ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
-//    rdd.flatMap { row =>
-//      val (key, (value, info)) = row
-//      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
-//        case Some(t: Long) => Some((t, row))
-//        case _ => None
-//      }
-//      b
-//    }
-//  }
-//
-//  // convert data into a string
-//  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), dataPersist: Iterable[Expr], infoPersist: Iterable[Expr]): String = {
-//    val (key, (data, info)) = rec
-//    val persistData = getPersistMap(data, dataPersist)
-//    val persistInfo = info.mapValues { value =>
-//      value match {
-//        case vd: Map[String, Any] => getPersistMap(vd, infoPersist)
-//        case v => v
-//      }
-//    }.map(identity)
-//    s"${persistData} [${persistInfo}]"
-//  }
-//
-//  // get the expr value map of the persist expressions
-//  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-//    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-//    data.flatMap { pair =>
-//      val (k, v) = pair
-//      persistMap.get(k) match {
-//        case Some(d) => Some((d -> v))
-//        case _ => None
-//      }
-//    }
-//  }
+  private def optimizeRulePlan(rulePlan: RulePlan, dsTmsts: Map[String, Set[Long]]): RulePlan = {
+    val steps = rulePlan.ruleSteps
+    val optExports = rulePlan.ruleExports.flatMap { export =>
+      findRuleStepByName(steps, export.stepName).map { rs =>
+        rs.details.get(ProcessDetailsKeys._baselineDataSource) match {
+          case Some(dsname: String) => {
+            val defTmstOpt = (dsTmsts.get(dsname)).flatMap { set =>
+              try { Some(set.max) } catch { case _: Throwable => None }
+            }
+            defTmstOpt match {
+              case Some(t) => export.setDefTimestamp(t)
+              case _ => export
+            }
+          }
+          case _ => export
+        }
+      }
+    }
+    RulePlan(steps, optExports)
+  }
+
+  private def findRuleStepByName(steps: Seq[RuleStep], name: String): Option[RuleStep] = {
+    steps.filter(_.name == name).headOption
+  }
 
 }

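optimizeRulePlan (currently bypassed, since optRulePlan is just rulePlan) would pin each export's default timestamp to the newest timestamp of its step's baseline data source; the try/catch around set.max only guards the empty-set case. The same lookup can be written without the exception, shown here as a sketch rather than a change to the patch:

    // Sketch: newest timestamp for a named data source, if it has any.
    def latestTmst(dsTmsts: Map[String, Set[Long]], dsName: String): Option[Long] =
      dsTmsts.get(dsName).flatMap(ts => if (ts.isEmpty) None else Some(ts.max))

    latestTmst(Map("src" -> Set(1L, 5L, 3L)), "src")      // Some(5)
    latestTmst(Map("src" -> Set.empty[Long]), "src")      // None
    latestTmst(Map("src" -> Set(1L)), "other")            // None
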
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index a48c4d1..00c6ef4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -34,16 +34,14 @@ trait DqEngine extends Loggable with Serializable {
 
   protected def collectable(): Boolean = false
 
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
-                    ): Map[Long, Map[String, Any]]
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport): Map[Long, Map[String, Any]]
 
   //  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
   //
   //  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
 
 //  def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame]
-  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
-                    ): Map[Long, DataFrame]
+//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame]
 
 
   def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]]

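With the process type dropped from collectMetrics, the engine contract only needs the export and the time info; whether an engine produces anything at all is still gated by collectable(), which defaults to false. A compressed sketch of that guard-by-capability pattern, using placeholder signatures rather than the project's trait:

    // Sketch: only engines that declare themselves collectable yield metrics.
    trait MetricCollectorSketch {
      protected def collectable(): Boolean = false
      protected def doCollect(stepName: String): Map[Long, Map[String, Any]] = Map.empty

      def collectMetrics(stepName: String): Map[Long, Map[String, Any]] =
        if (collectable()) doCollect(stepName) else Map.empty
    }
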
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 03ee208..2163925 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -24,7 +24,8 @@ import org.apache.griffin.measure.config.params.user.DataSourceParam
 import org.apache.griffin.measure.data.source._
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
+import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
@@ -34,14 +35,14 @@ import org.apache.spark.sql.{DataFrame, Row}
 
 import scala.concurrent._
 import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 import ExecutionContext.Implicits.global
 
 case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 
   val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType)
 
-  def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, Set[Long]] = {
+  def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, TimeRange] = {
     dataSources.map { ds =>
       (ds.name, ds.loadData(timeInfo))
     }.toMap
@@ -53,12 +54,11 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     }
   }
 
-  def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport],
-                        procType: ProcessType, persistFactory: PersistFactory
+  def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport], persistFactory: PersistFactory
                        ): Unit = {
     val allMetrics: Map[Long, Map[String, Any]] = {
-      metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, step) =>
-        val metrics = collectMetrics(timeInfo, step, procType)
+      metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, metricExport) =>
+        val metrics = collectMetrics(timeInfo, metricExport)
         metrics.foldLeft(ret) { (total, pair) =>
           val (k, v) = pair
           total.get(k) match {
@@ -112,7 +112,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     Await.result(pro.future, Duration.Inf)
   }
 
-  def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport], procType: ProcessType,
+  def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport],
                         persistFactory: PersistFactory, dataSources: Seq[DataSource]
                        ): Unit = {
     // method 1: multi thread persist multi data frame
@@ -124,13 +124,13 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     // method 2: multi thread persist multi iterable
     recordExports.foreach { recordExport =>
 //      val records = collectRecords(timeInfo, recordExport, procType)
-      procType match {
-        case BatchProcessType => {
+      recordExport.mode match {
+        case SimpleMode => {
           collectBatchRecords(recordExport).foreach { rdd =>
             persistCollectedBatchRecords(timeInfo, recordExport, rdd, persistFactory)
           }
         }
-        case StreamingProcessType => {
+        case TimestampMode => {
           val (rddOpt, emptySet) = collectStreamingRecords(recordExport)
           persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources)
 //          collectStreamingRecords(recordExport).foreach { rddPair =>
@@ -282,21 +282,20 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 //      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
 //    }.headOption
 //  }
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
                     ): Map[Long, Map[String, Any]] = {
     val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport, procType)
+      if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport)
     }
     ret
   }
 
-  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
-                    ): Map[Long, DataFrame] = {
-    val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport, procType)
-    }
-    ret
-  }
+//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] = {
+//    val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) =>
+//      if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport)
+//    }
+//    ret
+//  }
 
   def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame] = {
 //    engines.flatMap { engine =>

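DqEngines.collectMetrics asks each underlying engine in turn and keeps the first non-empty result; once one engine has answered, the remaining engines are never invoked. The same first-hit fold, reduced to its essence with placeholder types:

    // Sketch: first non-empty result from a sequence of lazy handlers.
    def firstNonEmpty[K, V](handlers: Seq[() => Map[K, V]]): Map[K, V] =
      handlers.foldLeft(Map.empty[K, V]) { (acc, handler) =>
        if (acc.nonEmpty) acc else handler()
      }

    firstNonEmpty(Seq(
      () => Map.empty[Long, String],
      () => Map(1L -> "metric"),      // first non-empty answer wins
      () => sys.error("never called") // short-circuited by the fold
    ))
    // result: Map(1L -> "metric")
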
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index f1e12d2..3bcecdb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.process.engine
 
 import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
@@ -68,20 +68,20 @@ trait SparkDqEngine extends DqEngine {
     }
   }
 
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
                     ): Map[Long, Map[String, Any]] = {
     if (collectable) {
-      val MetricExport(name, stepName, collectType) = metricExport
+      val MetricExport(name, stepName, collectType, defTmst, mode) = metricExport
       try {
-        val metricMaps = getMetricMaps(stepName)
-        procType match {
-          case BatchProcessType => {
+        val metricMaps: Seq[Map[String, Any]] = getMetricMaps(stepName)
+        mode match {
+          case SimpleMode => {
             val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType)
-            emptyMetricMap + (timeInfo.calcTime -> metrics)
+            emptyMetricMap + (defTmst -> metrics)
           }
-          case StreamingProcessType => {
+          case TimestampMode => {
             val tmstMetrics = metricMaps.map { metric =>
-              val tmst = metric.getLong(InternalColumns.tmst, timeInfo.calcTime)
+              val tmst = metric.getLong(InternalColumns.tmst, defTmst)
               val pureMetric = metric.removeKeys(InternalColumns.columns)
               (tmst, pureMetric)
             }
@@ -103,44 +103,53 @@ trait SparkDqEngine extends DqEngine {
   }
 
 
-  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
-                    ): Map[Long, DataFrame] = {
-    if (collectable) {
-      val RecordExport(_, stepName, _, originDFOpt) = recordExport
-      val stepDf = sqlContext.table(s"`${stepName}`")
-      val recordsDf = originDFOpt match {
-        case Some(originName) => sqlContext.table(s"`${originName}`")
-        case _ => stepDf
-      }
-
-      procType match {
-        case BatchProcessType => {
-          val recordsDf = sqlContext.table(s"`${stepName}`")
-          emptyRecordMap + (timeInfo.calcTime -> recordsDf)
-        }
-        case StreamingProcessType => {
-          originDFOpt match {
-            case Some(originName) => {
-              val recordsDf = sqlContext.table(s"`${originName}`")
-              stepDf.collect.map { row =>
-                val tmst = row.getAs[Long](InternalColumns.tmst)
-                val trdf = recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}")
-                (tmst, trdf)
-              }.toMap
-            }
-            case _ => {
-              val recordsDf = sqlContext.table(s"`${stepName}`")
-              emptyRecordMap + (timeInfo.calcTime -> recordsDf)
-            }
-          }
-        }
-      }
-    } else emptyRecordMap
+  private def getTmst(row: Row, defTmst: Long): Long = {
+    try {
+      row.getAs[Long](InternalColumns.tmst)
+    } catch {
+      case _: Throwable => defTmst
+    }
   }
 
+//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] = {
+//    if (collectable) {
+//      val RecordExport(_, stepName, _, originDFOpt, defTmst, procType) = recordExport
+//      val stepDf = sqlContext.table(s"`${stepName}`")
+//      val recordsDf = originDFOpt match {
+//        case Some(originName) => sqlContext.table(s"`${originName}`")
+//        case _ => stepDf
+//      }
+//
+//      procType match {
+//        case BatchProcessType => {
+//          val recordsDf = sqlContext.table(s"`${stepName}`")
+//          emptyRecordMap + (defTmst -> recordsDf)
+//        }
+//        case StreamingProcessType => {
+//          originDFOpt match {
+//            case Some(originName) => {
+//              val recordsDf = sqlContext.table(s"`${originName}`")
+//              stepDf.map { row =>
+//                val tmst = getTmst(row, defTmst)
+//                val trdf = if (recordsDf.columns.contains(InternalColumns.tmst)) {
+//                  recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}")
+//                } else recordsDf
+//                (tmst, trdf)
+//              }.collect.toMap
+//            }
+//            case _ => {
+//              val recordsDf = stepDf
+//              emptyRecordMap + (defTmst -> recordsDf)
+//            }
+//          }
+//        }
+//      }
+//    } else emptyRecordMap
+//  }
+
   private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = {
     if (collectable) {
-      val RecordExport(_, stepName, _, _) = recordExport
+      val RecordExport(_, stepName, _, _, defTmst, procType) = recordExport
       val stepDf = sqlContext.table(s"`${stepName}`")
       Some(stepDf)
     } else None
@@ -151,14 +160,14 @@ trait SparkDqEngine extends DqEngine {
   }
 
   def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
-    val RecordExport(_, _, _, originDFOpt) = recordExport
+    val RecordExport(_, _, _, originDFOpt, defTmst, procType) = recordExport
     getRecordDataFrame(recordExport) match {
       case Some(stepDf) => {
         originDFOpt match {
           case Some(originName) => {
             val tmsts = (stepDf.collect.flatMap { row =>
               try {
-                val tmst = row.getAs[Long](InternalColumns.tmst)
+                val tmst = getTmst(row, defTmst)
                 val empty = row.getAs[Boolean](InternalColumns.empty)
                 Some((tmst, empty))
               } catch {
@@ -170,7 +179,7 @@ trait SparkDqEngine extends DqEngine {
             if (recordTmsts.size > 0) {
               val recordsDf = sqlContext.table(s"`${originName}`")
               val records = recordsDf.flatMap { row =>
-                val tmst = row.getAs[Long](InternalColumns.tmst)
+                val tmst = getTmst(row, defTmst)
                 if (recordTmsts.contains(tmst)) {
                   try {
                     val map = SparkRowFormatter.formatRow(row)
@@ -186,7 +195,7 @@ trait SparkDqEngine extends DqEngine {
           }
           case _ => {
             val records = stepDf.flatMap { row =>
-              val tmst = row.getAs[Long](InternalColumns.tmst)
+              val tmst = getTmst(row, defTmst)
               try {
                 val map = SparkRowFormatter.formatRow(row)
                 val str = JsonUtil.toJson(map)

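getTmst falls back to the export's default timestamp whenever a row carries no timestamp column, which lets rows without the internal timestamp reuse the collectors written for timestamped streaming rows. The same fallback on a plain Map instead of a Spark Row, with the column-name literal assumed for illustration:

    // Sketch: take the row's own timestamp when present, otherwise the default.
    def tmstOrDefault(row: Map[String, Any], defTmst: Long): Long =
      row.get("__tmst") match {
        case Some(t: Long) => t
        case _             => defTmst
      }

    tmstOrDefault(Map("__tmst" -> 1516089151000L, "value" -> 1), 0L)  // 1516089151000
    tmstOrDefault(Map("value" -> 1), 0L)                              // 0
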
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 9de7955..dcb02f6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -47,6 +47,9 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
             }
           } else sqlContext.sql(rule)
 
+//          println(name)
+//          rdf.show(10)
+
           if (rs.isGlobal) {
             if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)
             TableRegisters.registerRunGlobalTable(rdf, name)