You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:26 UTC

[11/11] incubator-griffin git commit: Dsl modify

Dsl modify

dsl modified in measure module, with document in griffin-doc/dsl-guide.md

Author: Lionel Liu <bh...@163.com>

Closes #123 from bhlx3lyx7/dsl-modify.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/4aa6f779
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/4aa6f779
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/4aa6f779

Branch: refs/heads/master
Commit: 4aa6f779969650ee6d1871d342ad7e114b7ed4ce
Parents: ac8351f
Author: Lionel Liu <bh...@163.com>
Authored: Sat Sep 30 16:34:53 2017 +0800
Committer: William Guo <gu...@icloud.com>
Committed: Sat Sep 30 16:34:53 2017 +0800

----------------------------------------------------------------------
 griffin-doc/dsl-guide.md                        |  83 +++
 measure/derby.log                               |  13 +
 measure/src/main/resources/config-old.json      |  31 ++
 measure/src/main/resources/config-sql.json      |  54 ++
 measure/src/main/resources/config.json          |  71 ++-
 .../apache/griffin/measure/Application.scala    | 102 +++-
 .../griffin/measure/algo/AccuracyAlgo.scala     |  24 -
 .../org/apache/griffin/measure/algo/Algo.scala  |  34 --
 .../griffin/measure/algo/MeasureType.scala      |  26 -
 .../griffin/measure/algo/ProcessType.scala      |  26 -
 .../griffin/measure/algo/ProfileAlgo.scala      |  23 -
 .../measure/algo/batch/BatchAccuracyAlgo.scala  | 190 -------
 .../measure/algo/batch/BatchProfileAlgo.scala   | 162 ------
 .../measure/algo/core/AccuracyCore.scala        | 103 ----
 .../griffin/measure/algo/core/ProfileCore.scala |  73 ---
 .../algo/streaming/StreamingAccuracyAlgo.scala  | 358 -------------
 .../streaming/StreamingAccuracyProcess.scala    | 234 --------
 .../measure/algo/streaming/TimingProcess.scala  |  46 --
 .../measure/cache/info/TimeInfoCache.scala      |   2 +-
 .../cache/result/CacheResultProcesser.scala     |   2 +-
 .../config/params/user/DataCacheParam.scala     |  31 --
 .../config/params/user/DataConnectorParam.scala |   6 +-
 .../config/params/user/DataSourceParam.scala    |  31 ++
 .../config/params/user/EvaluateRuleParam.scala  |   4 +-
 .../measure/config/params/user/UserParam.scala  |  10 +-
 .../measure/connector/DataConnector.scala       |  32 --
 .../connector/DataConnectorFactory.scala        | 139 -----
 .../connector/cache/CacheDataConnector.scala    |  33 --
 .../measure/connector/cache/DataCacheable.scala |  86 ---
 .../measure/connector/cache/DataUpdatable.scala |  30 --
 .../cache/HiveCacheDataConnector.scala          | 351 ------------
 .../cache/TextCacheDataConnector.scala          | 311 -----------
 .../direct/AvroDirectDataConnector.scala        | 132 -----
 .../connector/direct/DirectDataConnector.scala  |  34 --
 .../direct/HiveDirectDataConnector.scala        | 158 ------
 .../direct/KafkaCacheDirectDataConnector.scala  | 125 -----
 .../StreamingCacheDirectDataConnector.scala     |  60 ---
 .../streaming/KafkaStreamingDataConnector.scala |  58 --
 .../streaming/StreamingDataConnector.scala      |  34 --
 .../measure/data/connector/DataConnector.scala  | 114 ++++
 .../data/connector/DataConnectorFactory.scala   | 150 ++++++
 .../batch/AvroBatchDataConnector.scala          | 112 ++++
 .../connector/batch/BatchDataConnector.scala    |  35 ++
 .../batch/HiveBatchDataConnector.scala          | 149 ++++++
 .../batch/KafkaCacheDirectDataConnector.scala   | 125 +++++
 .../StreamingCacheDirectDataConnector.scala     |  60 +++
 .../batch/TextDirBatchDataConnector.scala       | 136 +++++
 .../connector/cache/CacheDataConnector.scala    |  33 ++
 .../data/connector/cache/DataCacheable.scala    |  86 +++
 .../data/connector/cache/DataUpdatable.scala    |  30 ++
 .../cache/HiveCacheDataConnector.scala          | 351 ++++++++++++
 .../cache/TextCacheDataConnector.scala          | 311 +++++++++++
 .../streaming/KafkaStreamingDataConnector.scala |  70 +++
 .../KafkaStreamingStringDataConnector.scala     |  65 +++
 .../streaming/StreamingDataConnector.scala      |  43 ++
 .../measure/data/source/DataCacheable.scala     |  76 +++
 .../measure/data/source/DataSource.scala        | 109 ++++
 .../measure/data/source/DataSourceCache.scala   | 347 ++++++++++++
 .../measure/data/source/DataSourceFactory.scala |  80 +++
 .../griffin/measure/persist/HdfsPersist.scala   | 240 ++++++---
 .../griffin/measure/persist/HttpPersist.scala   |  64 ++-
 .../griffin/measure/persist/LoggerPersist.scala | 173 +++---
 .../griffin/measure/persist/MultiPersists.scala |  14 +-
 .../measure/persist/OldHttpPersist.scala        | 174 +++---
 .../griffin/measure/persist/Persist.scala       |  24 +-
 .../measure/persist/PersistFactory.scala        |   4 +-
 .../apache/griffin/measure/process/Algo.scala   |  34 ++
 .../measure/process/BatchDqProcess.scala        | 117 ++++
 .../griffin/measure/process/DqProcess.scala     |  40 ++
 .../griffin/measure/process/ProcessType.scala   |  47 ++
 .../measure/process/StreamingDqProcess.scala    | 157 ++++++
 .../measure/process/StreamingDqThread.scala     | 185 +++++++
 .../griffin/measure/process/TimingProcess.scala |  46 ++
 .../measure/process/check/DataChecker.scala     |  29 +
 .../process/engine/DataFrameOprEngine.scala     | 165 ++++++
 .../measure/process/engine/DqEngine.scala       |  41 ++
 .../process/engine/DqEngineFactory.scala        |  47 ++
 .../measure/process/engine/DqEngines.scala      | 208 ++++++++
 .../measure/process/engine/SparkDqEngine.scala  | 167 ++++++
 .../process/engine/SparkRowFormatter.scala      |  62 +++
 .../measure/process/engine/SparkSqlEngine.scala |  58 ++
 .../griffin/measure/rule/CalculationUtil.scala  | 315 -----------
 .../measure/rule/DataTypeCalculationUtil.scala  | 159 ------
 .../griffin/measure/rule/ExprValueUtil.scala    | 263 ---------
 .../griffin/measure/rule/RuleAnalyzer.scala     |  72 ---
 .../griffin/measure/rule/RuleFactory.scala      |  52 --
 .../griffin/measure/rule/RuleParser.scala       | 244 ---------
 .../measure/rule/SchemaValueCombineUtil.scala   | 187 -------
 .../measure/rule/adaptor/AdaptPhase.scala       |  25 +
 .../rule/adaptor/DataFrameOprAdaptor.scala      |  44 ++
 .../rule/adaptor/GriffinDslAdaptor.scala        | 349 ++++++++++++
 .../measure/rule/adaptor/RuleAdaptor.scala      |  72 +++
 .../measure/rule/adaptor/RuleAdaptorGroup.scala | 105 ++++
 .../measure/rule/adaptor/SparkSqlAdaptor.scala  |  54 ++
 .../griffin/measure/rule/dsl/DqType.scala       |  58 ++
 .../griffin/measure/rule/dsl/DslType.scala      |  58 ++
 .../griffin/measure/rule/dsl/PersistType.scala  |  58 ++
 .../rule/dsl/analyzer/AccuracyAnalyzer.scala    |  41 ++
 .../rule/dsl/analyzer/BasicAnalyzer.scala       |  53 ++
 .../rule/dsl/analyzer/ProfilingAnalyzer.scala   |  52 ++
 .../measure/rule/dsl/expr/AliasableExpr.scala   |  25 +
 .../rule/dsl/expr/ClauseExpression.scala        | 150 ++++++
 .../griffin/measure/rule/dsl/expr/Expr.scala    |  29 +
 .../measure/rule/dsl/expr/FunctionExpr.scala    |  29 +
 .../measure/rule/dsl/expr/LiteralExpr.scala     |  72 +++
 .../measure/rule/dsl/expr/LogicalExpr.scala     | 170 ++++++
 .../measure/rule/dsl/expr/MathExpr.scala        |  80 +++
 .../measure/rule/dsl/expr/SelectExpr.scala      | 115 ++++
 .../measure/rule/dsl/expr/TreeNode.scala        |  45 ++
 .../measure/rule/dsl/parser/BasicParser.scala   | 337 ++++++++++++
 .../rule/dsl/parser/GriffinDslParser.scala      |  50 ++
 .../measure/rule/expr/AnalyzableExpr.scala      |  24 -
 .../griffin/measure/rule/expr/Cacheable.scala   |  33 --
 .../measure/rule/expr/Calculatable.scala        |  25 -
 .../griffin/measure/rule/expr/ClauseExpr.scala  | 109 ----
 .../measure/rule/expr/DataSourceable.scala      |  28 -
 .../griffin/measure/rule/expr/Describable.scala |  33 --
 .../apache/griffin/measure/rule/expr/Expr.scala |  53 --
 .../measure/rule/expr/ExprDescOnly.scala        |  40 --
 .../measure/rule/expr/ExprIdCounter.scala       |  60 ---
 .../measure/rule/expr/FieldDescOnly.scala       |  58 --
 .../griffin/measure/rule/expr/LiteralExpr.scala |  83 ---
 .../griffin/measure/rule/expr/LogicalExpr.scala | 178 -------
 .../griffin/measure/rule/expr/MathExpr.scala    |  99 ----
 .../griffin/measure/rule/expr/SelectExpr.scala  |  88 ---
 .../rule/func/DefaultFunctionDefine.scala       |  36 --
 .../measure/rule/func/FunctionDefine.scala      |  25 -
 .../measure/rule/func/FunctionUtil.scala        |  75 ---
 .../rule/preproc/PreProcRuleGenerator.scala     |  72 +++
 .../measure/rule/step/ConcreteRuleStep.scala    |  37 ++
 .../griffin/measure/rule/step/DfOprStep.scala   |  29 +
 .../measure/rule/step/GriffinDslStep.scala      |  28 +
 .../griffin/measure/rule/step/RuleStep.scala    |  31 ++
 .../measure/rule/step/SparkSqlStep.scala        |  30 ++
 .../griffin/measure/rule/udf/GriffinUdfs.scala  |  33 ++
 .../measure/utils/HdfsFileDumpUtil.scala        |   2 +-
 .../apache/griffin/measure/utils/HdfsUtil.scala |  71 ++-
 .../griffin/measure/utils/ParamUtil.scala       | 164 ++++++
 .../apache/griffin/measure/utils/TimeUtil.scala |   4 +-
 .../config-test-accuracy-streaming-multids.json | 144 +++++
 .../config-test-accuracy-streaming.json         | 119 +++++
 .../test/resources/config-test-accuracy.json    |  56 ++
 .../config-test-profiling-streaming.json        |  68 +++
 .../test/resources/config-test-profiling.json   |  37 ++
 measure/src/test/resources/config-test.json     |  55 ++
 measure/src/test/resources/config-test1.json    |  96 ++++
 measure/src/test/resources/config.json          |   2 +-
 measure/src/test/resources/env-streaming.json   |   1 +
 measure/src/test/resources/env-test.json        |  38 ++
 measure/src/test/resources/input.msg            |   1 +
 measure/src/test/resources/output.msg           |   1 +
 measure/src/test/resources/test-data.jsonFile   |   3 +
 measure/src/test/resources/test-data0.json      |  56 ++
 measure/src/test/resources/test-data1.jsonFile  |  31 ++
 .../algo/batch/BatchAccuracyAlgoTest.scala      | 198 -------
 .../algo/batch/BatchProfileAlgoTest.scala       | 173 ------
 .../measure/algo/batch/DataFrameSaveTest.scala  | 172 ------
 .../measure/algo/core/AccuracyCoreTest.scala    |  89 ----
 .../measure/algo/core/ProfileCoreTest.scala     |  79 ---
 .../streaming/StreamingAccuracyAlgoTest.scala   | 267 ----------
 .../reader/ParamRawStringReaderTest.scala       |   3 +-
 .../measure/connector/ConnectorTest.scala       |  70 ---
 .../measure/data/connector/ConnectorTest.scala  |  71 +++
 .../measure/process/BatchProcessTest.scala      | 146 +++++
 .../griffin/measure/process/JsonParseTest.scala | 531 +++++++++++++++++++
 .../griffin/measure/process/JsonToStructs.scala |  85 +++
 .../measure/process/StreamingProcessTest.scala  | 147 +++++
 .../measure/rule/ExprValueUtilTest.scala        |  86 ---
 .../griffin/measure/rule/RuleAnalyzerTest.scala |  60 ---
 .../griffin/measure/rule/RuleFactoryTest.scala  |  44 --
 .../griffin/measure/rule/RuleParserTest.scala   | 213 --------
 .../rule/adaptor/GriffinDslAdaptorTest.scala    |  65 +++
 .../rule/dsl/parser/BasicParserTest.scala       | 205 +++++++
 .../apache/griffin/measure/sql/SqlTest.scala    | 125 +++++
 .../griffin/measure/utils/HdfsUtilTest.scala    | 132 +++++
 .../griffin/measure/utils/JsonUtilTest.scala    | 120 ++---
 .../griffin/measure/utils/ParamUtilTest.scala   |  50 ++
 177 files changed, 9536 insertions(+), 7114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/griffin-doc/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/dsl-guide.md b/griffin-doc/dsl-guide.md
new file mode 100644
index 0000000..6a7b3f8
--- /dev/null
+++ b/griffin-doc/dsl-guide.md
@@ -0,0 +1,83 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Apache Griffin DSL Guide
+Griffin DSL is designed for DQ measurement, as a SQL-like language, trying to describe the DQ domain request.
+
+## Griffin DSL Syntax Description
+Griffin DSL is SQL-like, case insensitive, and easy to learn.
+
+### Supporting process
+- logical operation: not, and, or, in, between, like, is null, is nan, =, !=, <=, >=, <, >
+- mathematical operation: +, -, *, /, %
+- sql statement: as, where, group by, having, order by, limit
+
+
+### Keywords
+- `null, nan, true, false`
+- `not, and, or`
+- `in, between, like, is`
+- `as, where, group, by, having, order, desc, asc, limit`
+
+### Operators
+- `!, &&, ||, =, !=, <, >, <=, >=, <>`
+- `+, -, *, /, %`
+- `(, )`
+- `., [, ]`
+
+### Literals
+- **string**: any string surrounded with a pair of " or ', with escape charactor \ if any request.  
+	e.g. `"test", 'string 1', "hello \" world \" "`
+- **number**: double or integer number.  
+	e.g. `123, 33.5`
+- **time**: a integer with unit in a string, will be translated to a integer number in millisecond.  
+	e.g. `3d, 5h, 4ms`
+- **boolean**: boolean value directly.  
+	e.g. `true, false`
+
+### Selections
+- **selection head**: data source name.  
+	e.g. `source, target`
+- **all field selection**: * or with data source name ahead.  
+	e.g. `*, source.*, target.*`
+- **field selection**: field name or with data source name ahead.  
+	e.g. `source.age, target.name, user_id`
+- **index selection**: interget between square brackets "[]" with field name ahead.  
+	e.g. `source.attributes[3]`
+- **function selection**: function name with brackets "()", with field name ahead or not.  
+	e.g. `count(*), *.count(), source.user_id.count(), max(source.age)`
+- **alias**: declare an alias after a selection.  
+	e.g. `source.user_id as id, target.user_name as name`
+
+### Math expressions
+- **math factor**: literal or function or selection or math exression with brackets.  
+	e.g. `123, max(1, 2, 3, 4), source.age, (source.age + 13)`
+- **unary math expression**: unary math operator with factor.  
+	e.g. `-(100 - source.score)`
+- **binary math expression**: math factors with binary math operators.  
+	e.g. `source.age + 13, score * 2 + ratio`
+
+### Logical expression
+- **in**: in clause like sql.  
+	e.g. `source.country in ("USA", "CHN", "RSA")`
+- **between**: between clause like sql.  
+	e.g. `source.age between 3 and 30, source.age between (3, 30)`
+- **like**: like clause like sql.  
+	e.g. `source.name like "%abc%"`
+- **logical factor**: 
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/derby.log
----------------------------------------------------------------------
diff --git a/measure/derby.log b/measure/derby.log
new file mode 100644
index 0000000..4b93055
--- /dev/null
+++ b/measure/derby.log
@@ -0,0 +1,13 @@
+----------------------------------------------------------------
+Fri Sep 29 15:53:18 CST 2017:
+Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-015e-cca0-1a8b-00000f890648 
+on database directory /private/var/folders/p0/462y3wrn4lv1fptxx5bwy7b839572r/T/spark-890ab6e2-ee56-4d73-8c6a-0dcce204322e/metastore with class loader sun.misc.Launcher$AppClassLoader@18b4aac2 
+Loaded from file:/Users/lliu13/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
+java.vendor=Oracle Corporation
+java.runtime.version=1.8.0_101-b13
+user.dir=/Users/lliu13/git/incubator-griffin/measure
+os.name=Mac OS X
+os.arch=x86_64
+os.version=10.12.6
+derby.system.home=null
+Database Class Loader started - derby.database.classpath=''

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/resources/config-old.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-old.json b/measure/src/main/resources/config-old.json
new file mode 100644
index 0000000..ab32b75
--- /dev/null
+++ b/measure/src/main/resources/config-old.json
@@ -0,0 +1,31 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "process.type": "batch",
+
+  "source": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "database": "default",
+      "table.name": "users_info_src",
+      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+    }
+  },
+
+  "target": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "database": "default",
+      "table.name": "users_info_target",
+      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/resources/config-sql.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-sql.json b/measure/src/main/resources/config-sql.json
new file mode 100644
index 0000000..aad9584
--- /dev/null
+++ b/measure/src/main/resources/config-sql.json
@@ -0,0 +1,54 @@
+{
+  "name": "accu1",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+            "database": "default",
+            "table.name": "users_info_src",
+            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+          }
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+            "database": "default",
+            "table.name": "users_info_target",
+            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "dsl.type": "spark-sql",
+    "rules": [
+      {
+        "name": "miss.record",
+        "rule": "SELECT source.name FROM source LEFT JOIN target ON coalesce(source.name, 'null') = coalesce(target.name, 'null') WHERE (NOT (source.name IS NULL)) AND (target.name IS NULL)",
+        "persist.type": "record"
+      }, {
+        "name": "miss.count",
+        "rule": "SELECT COUNT(*) FROM miss",
+        "persist.type": "metric"
+      }, {
+        "name": "total.count",
+        "rule": "SELECT COUNT(*) FROM source",
+        "persist.type": "metric"
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config.json b/measure/src/main/resources/config.json
index ab32b75..b6e5af9 100644
--- a/measure/src/main/resources/config.json
+++ b/measure/src/main/resources/config.json
@@ -1,31 +1,60 @@
 {
   "name": "accu1",
-  "type": "accuracy",
 
   "process.type": "batch",
 
-  "source": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "database": "default",
-      "table.name": "users_info_src",
-      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+            "database": "default",
+            "table.name": "users_info_src",
+            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+          }
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+            "database": "default",
+            "table.name": "users_info_target",
+            "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+          }
+        }
+      ]
     }
-  },
-
-  "target": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "database": "default",
-      "table.name": "users_info_target",
-      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-    }
-  },
+  ],
 
   "evaluateRule": {
-    "sampleRatio": 0.2,
-    "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "source.user_id = target.user_id AND source.first_name = target.first_name AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
+        "details": {
+          "source": "source",
+          "miss.record": {
+            "name": "miss.record",
+            "persist.type": "record"
+          },
+          "miss.count": {
+            "name": "miss.count",
+            "persist.type": "metric"
+          },
+          "total.count": {
+            "name": "total.count",
+            "persist.type": "metric"
+          }
+        }
+      }
+    ]
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index af8c830..edbb552 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,9 +18,6 @@ under the License.
 */
 package org.apache.griffin.measure
 
-import org.apache.griffin.measure.algo._
-import org.apache.griffin.measure.algo.batch._
-import org.apache.griffin.measure.algo.streaming._
 import org.apache.griffin.measure.config.params._
 import org.apache.griffin.measure.config.params.env._
 import org.apache.griffin.measure.config.params.user._
@@ -28,6 +25,7 @@ import org.apache.griffin.measure.config.reader._
 import org.apache.griffin.measure.config.validator.AllParamValidator
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.PersistThreadPool
+import org.apache.griffin.measure.process._
 
 import scala.util.{Failure, Success, Try}
 
@@ -81,39 +79,91 @@ object Application extends Loggable {
     }
 
     // choose algorithm
-    val dqType = allParam.userParam.dqType
-    val procType = allParam.userParam.procType
-    val algo: Algo = (dqType, procType) match {
-      case (MeasureType.accuracy(), ProcessType.batch()) => BatchAccuracyAlgo(allParam)
-      case (MeasureType.profile(), ProcessType.batch()) => BatchProfileAlgo(allParam)
-      case (MeasureType.accuracy(), ProcessType.streaming()) => StreamingAccuracyAlgo(allParam)
-//      case (MeasureType.profile(), ProcessType.streaming()) => StreamingProfileAlgo(allParam)
+//    val dqType = allParam.userParam.dqType
+    val procType = ProcessType(allParam.userParam.procType)
+    val proc: DqProcess = procType match {
+      case BatchProcessType => BatchDqProcess(allParam)
+      case StreamingProcessType => StreamingDqProcess(allParam)
       case _ => {
-        error(s"${dqType} with ${procType} is unsupported dq type!")
+        error(s"${procType} is unsupported process type!")
         sys.exit(-4)
       }
     }
 
-    // algorithm run
-    algo.run match {
+    // process init
+    proc.init match {
+      case Success(_) => {
+        info("process init success")
+      }
       case Failure(ex) => {
-        error(s"app error: ${ex.getMessage}")
-
-        procType match {
-          case ProcessType.streaming() => {
-            // streaming need to attempt more times by spark streaming itself
-            throw ex
-          }
-          case _ => {
-            shutdown
-            sys.exit(-5)
-          }
+        error(s"process init error: ${ex.getMessage}")
+        shutdown
+        sys.exit(-5)
+      }
+    }
+
+    // process run
+    proc.run match {
+      case Success(_) => {
+        info("process run success")
+      }
+      case Failure(ex) => {
+        error(s"process run error: ${ex.getMessage}")
+
+        if (proc.retriable) {
+          throw ex
+        } else {
+          shutdown
+          sys.exit(-5)
         }
       }
-      case _ => {
-        info("app finished and success")
+    }
+
+    // process end
+    proc.end match {
+      case Success(_) => {
+        info("process end success")
+      }
+      case Failure(ex) => {
+        error(s"process end error: ${ex.getMessage}")
+        shutdown
+        sys.exit(-5)
       }
     }
+
+    shutdown
+
+//    val algo: Algo = (dqType, procType) match {
+//      case (MeasureType.accuracy(), ProcessType.batch()) => BatchAccuracyAlgo(allParam)
+//      case (MeasureType.profile(), ProcessType.batch()) => BatchProfileAlgo(allParam)
+//      case (MeasureType.accuracy(), ProcessType.streaming()) => StreamingAccuracyAlgo(allParam)
+////      case (MeasureType.profile(), ProcessType.streaming()) => StreamingProfileAlgo(allParam)
+//      case _ => {
+//        error(s"${dqType} with ${procType} is unsupported dq type!")
+//        sys.exit(-4)
+//      }
+//    }
+
+    // algorithm run
+//    algo.run match {
+//      case Failure(ex) => {
+//        error(s"app error: ${ex.getMessage}")
+//
+//        procType match {
+//          case ProcessType.streaming() => {
+//            // streaming need to attempt more times by spark streaming itself
+//            throw ex
+//          }
+//          case _ => {
+//            shutdown
+//            sys.exit(-5)
+//          }
+//        }
+//      }
+//      case _ => {
+//        info("app finished and success")
+//      }
+//    }
   }
 
   private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
deleted file mode 100644
index 7e0a563..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-
-trait AccuracyAlgo extends Algo {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
deleted file mode 100644
index 82b71f1..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.log.Loggable
-
-import scala.util.Try
-
-trait Algo extends Loggable with Serializable {
-
-  val envParam: EnvParam
-  val userParam: UserParam
-
-  def run(): Try[_]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
deleted file mode 100644
index 23d4dac..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-object MeasureType {
-
-  val accuracy = """^(?i)accuracy$""".r
-  val profile = """^(?i)profile$""".r
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
deleted file mode 100644
index 5a85c7c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-object ProcessType {
-
-  val batch = """^(?i)batch$""".r
-  val streaming = """^(?i)streaming$""".r
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
deleted file mode 100644
index 6ffc87a..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-trait ProfileAlgo extends Algo {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
deleted file mode 100644
index 241f456..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.batch
-
-import java.util.Date
-
-import org.apache.griffin.measure.algo.AccuracyAlgo
-import org.apache.griffin.measure.algo.core.AccuracyCore
-import org.apache.griffin.measure.config.params.AllParam
-import org.apache.griffin.measure.connector._
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.persist._
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule._
-import org.apache.griffin.measure.rule.expr._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-// accuracy algorithm for batch mode
-case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
-  val envParam = allParam.envParam
-  val userParam = allParam.userParam
-
-  def run(): Try[_] = {
-    Try {
-      val metricName = userParam.name
-
-      val sparkParam = envParam.sparkParam
-
-      val conf = new SparkConf().setAppName(metricName)
-      conf.setAll(sparkParam.config)
-      val sc = new SparkContext(conf)
-      sc.setLogLevel(sparkParam.logLevel)
-      val sqlContext = new HiveContext(sc)
-
-      // start time
-      val startTime = new Date().getTime()
-
-      // get persists to persist measure result
-      val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
-      // get spark application id
-      val applicationId = sc.applicationId
-
-      // persist start id
-      persist.start(applicationId)
-
-      // generate rule from rule param, generate rule analyzer
-      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-      val rule: StatementExpr = ruleFactory.generateRule()
-      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-      // const expr value map
-      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-      val finalConstMap = finalConstExprValueMap.headOption match {
-        case Some(m) => m
-        case _ => Map[String, Any]()
-      }
-
-      // data connector
-      val sourceDataConnector: DirectDataConnector =
-        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-          ruleAnalyzer.sourceRuleExprs, finalConstMap
-        ) match {
-          case Success(cntr) => {
-            if (cntr.available) cntr
-            else throw new Exception("source data connection error!")
-          }
-          case Failure(ex) => throw ex
-        }
-      val targetDataConnector: DirectDataConnector =
-        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.targetParam,
-          ruleAnalyzer.targetRuleExprs, finalConstMap
-        ) match {
-          case Success(cntr) => {
-            if (cntr.available) cntr
-            else throw new Exception("target data connection error!")
-          }
-          case Failure(ex) => throw ex
-        }
-
-      // get metadata
-//      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-//        case Success(md) => md
-//        case _ => throw new Exception("source metadata error!")
-//      }
-//      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-//        case Success(md) => md
-//        case _ => throw new Exception("target metadata error!")
-//      }
-
-      // get data
-      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-        case Success(dt) => dt
-        case Failure(ex) => throw ex
-      }
-      val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataConnector.data() match {
-        case Success(dt) => dt
-        case Failure(ex) => throw ex
-      }
-
-      // accuracy algorithm
-      val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
-
-      // end time
-      val endTime = new Date().getTime
-      persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
-      // persist result
-      persist.result(endTime, accuResult)
-      val missingRecords = missingRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs))
-//      persist.missRecords(missingRecords)
-      persist.records(missingRecords, PersistType.MISS)
-
-      // persist end time
-      val persistEndTime = new Date().getTime
-      persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
-      // finish
-      persist.finish()
-
-      // context stop
-      sc.stop
-
-    }
-  }
-
-  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
-    (data, Map[String, Any]())
-  }
-
-  // calculate accuracy between source data and target data
-  def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-               ruleAnalyzer: RuleAnalyzer) = {
-    // 1. cogroup
-    val allKvs = sourceData.cogroup(targetData)
-
-    // 2. accuracy calculation
-    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-
-    (accuResult, missingRdd, matchedRdd)
-  }
-
-  // convert data into a string
-  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
-    val (key, (data, info)) = rec
-    val persistData = getPersistMap(data, sourcePersist)
-    val persistInfo = info.mapValues { value =>
-      value match {
-        case vd: Map[String, Any] => getPersistMap(vd, targetPersist)
-        case v => v
-      }
-    }.map(identity)
-    s"${persistData} [${persistInfo}]"
-  }
-
-  // get the expr value map of the persist expressions
-  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-    data.flatMap { pair =>
-      val (k, v) = pair
-      persistMap.get(k) match {
-        case Some(d) => Some((d -> v))
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
deleted file mode 100644
index 163a0b7..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.batch
-
-import java.util.Date
-
-import org.apache.griffin.measure.algo.ProfileAlgo
-import org.apache.griffin.measure.algo.core.ProfileCore
-import org.apache.griffin.measure.config.params._
-import org.apache.griffin.measure.connector._
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-// profile algorithm for batch mode
-case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
-  val envParam = allParam.envParam
-  val userParam = allParam.userParam
-
-  def run(): Try[_] = {
-    Try {
-      val metricName = userParam.name
-
-      val sparkParam = envParam.sparkParam
-
-      val conf = new SparkConf().setAppName(metricName)
-      conf.setAll(sparkParam.config)
-      val sc = new SparkContext(conf)
-      sc.setLogLevel(sparkParam.logLevel)
-      val sqlContext = new HiveContext(sc)
-
-      // start time
-      val startTime = new Date().getTime()
-
-      // get persists to persist measure result
-      val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
-      // get spark application id
-      val applicationId = sc.applicationId
-
-      // persist start id
-      persist.start(applicationId)
-
-      // generate rule from rule param, generate rule analyzer
-      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-      val rule: StatementExpr = ruleFactory.generateRule()
-      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-      // const expr value map
-      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-      val finalConstMap = finalConstExprValueMap.headOption match {
-        case Some(m) => m
-        case _ => Map[String, Any]()
-      }
-
-      // data connector
-      val sourceDataConnector: DirectDataConnector =
-      DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-        ruleAnalyzer.sourceRuleExprs, finalConstMap
-      ) match {
-        case Success(cntr) => {
-          if (cntr.available) cntr
-          else throw new Exception("source data connection error!")
-        }
-        case Failure(ex) => throw ex
-      }
-
-      // get metadata
-      //      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-      //        case Success(md) => md
-      //        case _ => throw new Exception("source metadata error!")
-      //      }
-
-      // get data
-      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-        case Success(dt) => dt
-        case Failure(ex) => throw ex
-      }
-
-      // profile algorithm
-      val (profileResult, missingRdd, matchedRdd) = profile(sourceData, ruleAnalyzer)
-
-      // end time
-      val endTime = new Date().getTime
-      persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
-      // persist result
-      persist.result(endTime, profileResult)
-      val matchedRecords = matchedRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs))
-//      persist.matchRecords(matchedRecords)
-      persist.records(matchedRecords, PersistType.MATCH)
-
-      // persist end time
-      val persistEndTime = new Date().getTime
-      persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
-      // finish
-      persist.finish()
-
-      // context stop
-      sc.stop
-    }
-  }
-
-  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
-    (data, Map[String, Any]())
-  }
-
-  // calculate profile from source data
-  def profile(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))], ruleAnalyzer: RuleAnalyzer
-              ) = {
-    // 1. profile calculation
-    val (profileResult, missingRdd, matchedRdd) = ProfileCore.profile(sourceData, ruleAnalyzer)
-
-    (profileResult, missingRdd, matchedRdd)
-  }
-
-  // convert data into a string
-  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr]): String = {
-    val (key, (data, info)) = rec
-    val persistData = getPersistMap(data, sourcePersist)
-    val persistInfo = info
-    if (persistInfo.size > 0) s"${persistData} [${persistInfo}]" else s"${persistData}"
-  }
-
-  // get the expr value map of the persist expressions
-  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-    data.flatMap { pair =>
-      val (k, v) = pair
-      persistMap.get(k) match {
-        case Some(d) => Some((d -> v))
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
deleted file mode 100644
index 4ec6505..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.rule.RuleAnalyzer
-import org.apache.griffin.measure.result._
-import org.apache.spark.rdd.RDD
-
-
-object AccuracyCore {
-
-  type V = Map[String, Any]
-  type T = Map[String, Any]
-
-  // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, targetInfo)]))
-  // output: accuracy result, missing source data rdd, matched source data rdd
-  def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], ruleAnalyzer: RuleAnalyzer
-              ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
-    val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, T))])] = allKvs.map { kv =>
-      val (key, (sourceDatas, targetDatas)) = kv
-
-      // result: (missCount, matchCount, missDataList, matchDataList)
-      val rslt = sourceDatas.foldLeft((0L, 0L, List[(Product, (V, T))](), List[(Product, (V, T))]())) { (sr, sourcePair) =>
-        val matchResult = if (targetDatas.isEmpty) {
-          (false, Map[String, Any](MismatchInfo.wrap("no target")))
-        } else {
-          targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) =>
-            if (tr._1) tr
-            else matchData(sourcePair, targetPair, ruleAnalyzer)
-          }
-        }
-
-        if (matchResult._1) {
-          val matchItem = (key, sourcePair)
-          (sr._1, sr._2 + 1, sr._3, sr._4 :+ matchItem)
-        } else {
-          val missItem = (key, (sourcePair._1, sourcePair._2 ++ matchResult._2))
-          (sr._1 + 1, sr._2, sr._3 :+ missItem, sr._4)
-        }
-      }
-
-      rslt
-    }
-
-    val missRdd = result.flatMap(_._3)
-    val matchRdd = result.flatMap(_._4)
-
-    def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
-      (cnt._1 + rcd._1, cnt._2 + rcd._2)
-    }
-    def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
-      (c1._1 + c2._1, c1._2 + c2._2)
-    }
-    val countPair = result.aggregate((0L, 0L))(seq, comb)
-
-    (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
-  }
-
-  // try to match source and target data, return true if matched, false if unmatched, also with some matching info
-  private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
-    // 1. merge source and target cached data
-    val mergedExprValueMap: Map[String, Any] = mergeExprValueMap(source, target)
-
-    // 2. check valid
-    if (ruleAnalyzer.rule.valid(mergedExprValueMap)) {
-      // 3. substitute the cached data into statement, get the statement value
-      val matched = ruleAnalyzer.rule.calculate(mergedExprValueMap) match {
-        case Some(b: Boolean) => b
-        case _ => false
-      }
-      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
-      if (matched) (matched, Map[String, Any]())
-      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
-    } else {
-      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
-    }
-
-  }
-
-//  private def when
-
-  private def mergeExprValueMap(source: (V, T), target: (V, T)): Map[String, Any] = {
-    source._1 ++ target._1
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
deleted file mode 100644
index 2987f2f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.rule.RuleAnalyzer
-import org.apache.griffin.measure.result._
-import org.apache.spark.rdd.RDD
-
-
-object ProfileCore {
-
-  type V = Map[String, Any]
-  type T = Map[String, Any]
-
-  // dataRdd: rdd of (key, (sourceData, sourceInfo))
-  // output: accuracy result, missing source data rdd, matched source data rdd
-  def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
-              ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
-
-    val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
-      val (key, (data, info)) = kv
-      val (matched, missInfo) = matchData((data, info), ruleAnalyzer)
-      ((key, (data, info ++ missInfo)), matched)
-    }
-
-    val totalCount = resultRdd.count
-    val matchRdd = resultRdd.filter(_._2).map(_._1)
-    val matchCount = matchRdd.count
-    val missRdd = resultRdd.filter(!_._2).map(_._1)
-    val missCount = missRdd.count
-
-    (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
-
-  }
-
-  // try to match data as rule, return true if matched, false if unmatched
-  private def matchData(dataPair: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
-    val data: Map[String, Any] = dataPair._1
-
-    // 1. check valid
-    if (ruleAnalyzer.rule.valid(data)) {
-      // 2. substitute the cached data into statement, get the statement value
-      val matched = ruleAnalyzer.rule.calculate(data) match {
-        case Some(b: Boolean) => b
-        case _ => false
-      }
-      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
-      if (matched) (matched, Map[String, Any]())
-      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
-    } else {
-      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
deleted file mode 100644
index bdac64e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.streaming
-
-import java.util.Date
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-
-import org.apache.griffin.measure.algo.AccuracyAlgo
-import org.apache.griffin.measure.algo.core.AccuracyCore
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.cache.result.CacheResultProcesser
-import org.apache.griffin.measure.config.params.AllParam
-import org.apache.griffin.measure.connector._
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
-import org.apache.griffin.measure.result.{AccuracyResult, MismatchInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.utils.TimeUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-
-case class StreamingAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
-  val envParam = allParam.envParam
-  val userParam = allParam.userParam
-
-  def run(): Try[_] = {
-    Try {
-      val metricName = userParam.name
-
-      val sparkParam = envParam.sparkParam
-
-      val conf = new SparkConf().setAppName(metricName)
-      conf.setAll(sparkParam.config)
-      val sc = new SparkContext(conf)
-      sc.setLogLevel(sparkParam.logLevel)
-      val sqlContext = new HiveContext(sc)
-//      val sqlContext = new SQLContext(sc)
-
-      val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
-        case Some(interval) => Milliseconds(interval)
-        case _ => throw new Exception("invalid batch interval")
-      }
-      val ssc = new StreamingContext(sc, batchInterval)
-      ssc.checkpoint(sparkParam.cpDir)
-
-      // init info cache instance
-      InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
-      InfoCacheInstance.init
-
-      // start time
-      val startTime = new Date().getTime()
-
-      val persistFactory = PersistFactory(envParam.persistParams, metricName)
-
-      // get persists to persist measure result
-      val appPersist: Persist = persistFactory.getPersists(startTime)
-
-      // get spark application id
-      val applicationId = sc.applicationId
-
-      // persist start id
-      appPersist.start(applicationId)
-
-      // generate rule from rule param, generate rule analyzer
-      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-      val rule: StatementExpr = ruleFactory.generateRule()
-      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-      // const expr value map
-      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-      val finalConstMap = finalConstExprValueMap.headOption match {
-        case Some(m) => m
-        case _ => Map[String, Any]()
-      }
-
-      // data connector
-      val sourceDataConnector: DirectDataConnector =
-      DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.sourceParam,
-        ruleAnalyzer.sourceRuleExprs, finalConstMap
-      ) match {
-        case Success(cntr) => {
-          if (cntr.available) cntr
-          else throw new Exception("source data connection error!")
-        }
-        case Failure(ex) => throw ex
-      }
-      val targetDataConnector: DirectDataConnector =
-        DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.targetParam,
-          ruleAnalyzer.targetRuleExprs, finalConstMap
-        ) match {
-          case Success(cntr) => {
-            if (cntr.available) cntr
-            else throw new Exception("target data connection error!")
-          }
-          case Failure(ex) => throw ex
-        }
-
-      val cacheResultProcesser = CacheResultProcesser()
-
-      // init data stream
-      sourceDataConnector.init()
-      targetDataConnector.init()
-
-      val streamingAccuracyProcess = StreamingAccuracyProcess(
-        sourceDataConnector, targetDataConnector,
-        ruleAnalyzer, cacheResultProcesser, persistFactory, appPersist)
-
-      // process thread
-//      case class Process() extends Runnable {
-//        val lock = InfoCacheInstance.genLock("process")
-//        def run(): Unit = {
-//          val updateTime = new Date().getTime
-//          val locked = lock.lock(5, TimeUnit.SECONDS)
-//          if (locked) {
-//            try {
-//              val st = new Date().getTime
-//
-//              TimeInfoCache.startTimeInfoCache
-//
-//              // get data
-//              val sourceData = sourceDataConnector.data match {
-//                case Success(dt) => dt
-//                case Failure(ex) => throw ex
-//              }
-//              val targetData = targetDataConnector.data match {
-//                case Success(dt) => dt
-//                case Failure(ex) => throw ex
-//              }
-//
-//              sourceData.cache
-//              targetData.cache
-//
-//              println(s"sourceData.count: ${sourceData.count}")
-//              println(s"targetData.count: ${targetData.count}")
-//
-//              // accuracy algorithm
-//              val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
-//              println(s"accuResult: ${accuResult}")
-//
-//              val ct = new Date().getTime
-//              appPersist.log(ct, s"calculation using time: ${ct - st} ms")
-//
-//              sourceData.unpersist()
-//              targetData.unpersist()
-//
-//              // result of every group
-//              val matchedGroups = reorgByTimeGroup(matchedRdd)
-//              val matchedGroupCount = matchedGroups.count
-//              println(s"===== matchedGroupCount: ${matchedGroupCount} =====")
-//
-//              // get missing results
-//              val missingGroups = reorgByTimeGroup(missingRdd)
-//              val missingGroupCount = missingGroups.count
-//              println(s"===== missingGroupCount: ${missingGroupCount} =====")
-//
-//              val groups = matchedGroups.cogroup(missingGroups)
-//              val groupCount = groups.count
-//              println(s"===== groupCount: ${groupCount} =====")
-//
-//              val updateResults = groups.flatMap { group =>
-//                val (t, (matchData, missData)) = group
-//
-//                val matchSize = matchData.size
-//                val missSize = missData.size
-//                val res = AccuracyResult(missSize, matchSize + missSize)
-//
-//                val updatedCacheResultOpt = cacheResultProcesser.genUpdateCacheResult(t, updateTime, res)
-//
-//                updatedCacheResultOpt.flatMap { updatedCacheResult =>
-//                  Some((updatedCacheResult, (t, missData)))
-//                }
-//              }
-//
-//              updateResults.cache
-//
-//              val updateResultsPart =  updateResults.map(_._1)
-//              val updateDataPart =  updateResults.map(_._2)
-//
-//              val updateResultsArray = updateResultsPart.collect()
-//
-//              // update results cache (in driver)
-//              // collect action is traversable once action, it will make rdd updateResults empty
-//              updateResultsArray.foreach { updateResult =>
-//                println(s"update result: ${updateResult}")
-//                cacheResultProcesser.update(updateResult)
-//                // persist result
-//                val persist: Persist = persistFactory.getPersists(updateResult.timeGroup)
-//                persist.result(updateTime, updateResult.result)
-//              }
-//
-//              // record missing data and update old data (in executor)
-//              updateDataPart.foreach { grp =>
-//                val (t, datas) = grp
-//                val persist: Persist = persistFactory.getPersists(t)
-//                // persist missing data
-//                val missStrings = datas.map { row =>
-//                  val (_, (value, info)) = row
-//                  s"${value} [${info.getOrElse(MismatchInfo.key, "unknown")}]"
-//                }
-//                persist.records(missStrings, PersistType.MISS)
-//                // data connector update old data
-//                val dumpDatas = datas.map { r =>
-//                  val (_, (v, i)) = r
-//                  v ++ i
-//                }
-//
-//                println(t)
-//                dumpDatas.foreach(println)
-//
-//                sourceDataConnector.updateOldData(t, dumpDatas)
-//                targetDataConnector.updateOldData(t, dumpDatas)    // not correct
-//              }
-//
-//              updateResults.unpersist()
-//
-//              // dump missing rdd   (this part not need for future version, only for current df cache data version)
-//              val dumpRdd: RDD[Map[String, Any]] = missingRdd.map { r =>
-//                val (_, (v, i)) = r
-//                v ++ i
-//              }
-//              sourceDataConnector.updateAllOldData(dumpRdd)
-//              targetDataConnector.updateAllOldData(dumpRdd)    // not correct
-//
-//              TimeInfoCache.endTimeInfoCache
-//
-//              val et = new Date().getTime
-//              appPersist.log(et, s"persist using time: ${et - ct} ms")
-//
-//            } catch {
-//              case e: Throwable => error(s"process error: ${e.getMessage}")
-//            } finally {
-//              lock.unlock()
-//            }
-//          }
-//        }
-//      }
-
-      val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
-        case Some(interval) => interval
-        case _ => throw new Exception("invalid batch interval")
-      }
-      val process = TimingProcess(processInterval, streamingAccuracyProcess)
-
-      // clean thread
-//    case class Clean() extends Runnable {
-//      val lock = InfoCacheInstance.genLock("clean")
-//      def run(): Unit = {
-//        val locked = lock.lock(5, TimeUnit.SECONDS)
-//        if (locked) {
-//          try {
-//            sourceDataConnector.cleanData
-//            targetDataConnector.cleanData
-//          } finally {
-//            lock.unlock()
-//          }
-//        }
-//      }
-//    }
-//    val cleanInterval = TimeUtil.milliseconds(cleanerParam.cleanInterval) match {
-//      case Some(interval) => interval
-//      case _ => throw new Exception("invalid batch interval")
-//    }
-//    val clean = TimingProcess(cleanInterval, Clean())
-
-      process.startup()
-//    clean.startup()
-
-      ssc.start()
-      ssc.awaitTermination()
-      ssc.stop(stopSparkContext=true, stopGracefully=true)
-
-      // context stop
-      sc.stop
-
-      InfoCacheInstance.close
-
-      appPersist.finish()
-
-      process.shutdown()
-//    clean.shutdown()
-    }
-  }
-
-  // calculate accuracy between source data and target data
-//  def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-//               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-//               ruleAnalyzer: RuleAnalyzer) = {
-//    // 1. cogroup
-//    val allKvs = sourceData.cogroup(targetData)
-//
-//    // 2. accuracy calculation
-//    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-//
-//    (accuResult, missingRdd, matchedRdd)
-//  }
-
-//  // convert data into a string
-//  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
-//    val (key, (data, info)) = rec
-//    val persistData = getPersistMap(data, sourcePersist)
-//    val persistInfo = info.mapValues { value =>
-//      value match {
-//        case vd: Map[String, Any] => getPersistMap(vd, targetPersist)
-//        case v => v
-//      }
-//    }.map(identity)
-//    s"${persistData} [${persistInfo}]"
-//  }
-//
-//  // get the expr value map of the persist expressions
-//  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-//    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-//    data.flatMap { pair =>
-//      val (k, v) = pair
-//      persistMap.get(k) match {
-//        case Some(d) => Some((d -> v))
-//        case _ => None
-//      }
-//    }
-//  }
-
-//  def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
-//                      ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
-//    rdd.flatMap { row =>
-//      val (key, (value, info)) = row
-//      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
-//        case Some(t: Long) => Some((t, row))
-//        case _ => None
-//      }
-//      b
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
deleted file mode 100644
index be1f846..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.streaming
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.algo.core.AccuracyCore
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.cache.result.CacheResultProcesser
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist._
-import org.apache.griffin.measure.result.{AccuracyResult, MismatchInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule._
-import org.apache.griffin.measure.rule.expr._
-import org.apache.spark.rdd.RDD
-
-import scala.util.{Failure, Success}
-
-case class StreamingAccuracyProcess(sourceDataConnector: DirectDataConnector,
-                                    targetDataConnector: DirectDataConnector,
-                                    ruleAnalyzer: RuleAnalyzer,
-                                    cacheResultProcesser: CacheResultProcesser,
-                                    persistFactory: PersistFactory,
-                                    appPersist: Persist
-                                   ) extends Runnable with Loggable {
-
-  val lock = InfoCacheInstance.genLock("process")
-
-  def run(): Unit = {
-//    println(s"cache count: ${cacheResultProcesser.cacheGroup.size}")
-    val updateTimeDate = new Date()
-    val updateTime = updateTimeDate.getTime
-    println(s"===== [${updateTimeDate}] process begins =====")
-    val locked = lock.lock(5, TimeUnit.SECONDS)
-    if (locked) {
-      try {
-        val st = new Date().getTime
-
-        TimeInfoCache.startTimeInfoCache
-
-        // get data
-        val sourceData = sourceDataConnector.data match {
-          case Success(dt) => dt
-          case Failure(ex) => throw ex
-        }
-        val targetData = targetDataConnector.data match {
-          case Success(dt) => dt
-          case Failure(ex) => throw ex
-        }
-
-        sourceData.cache
-        targetData.cache
-
-        println(s"sourceData.count: ${sourceData.count}")
-        println(s"targetData.count: ${targetData.count}")
-
-        // accuracy algorithm
-        val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
-//        println(s"accuResult: ${accuResult}")
-
-        val ct = new Date().getTime
-        appPersist.log(ct, s"calculation using time: ${ct - st} ms")
-
-        sourceData.unpersist()
-        targetData.unpersist()
-
-        // result of every group
-        val matchedGroups = reorgByTimeGroup(matchedRdd)
-//        val matchedGroupCount = matchedGroups.count
-//        println(s"===== matchedGroupCount: ${matchedGroupCount} =====")
-
-        // get missing results
-        val missingGroups = reorgByTimeGroup(missingRdd)
-//        val missingGroupCount = missingGroups.count
-//        println(s"===== missingGroupCount: ${missingGroupCount} =====")
-
-        val groups = matchedGroups.cogroup(missingGroups)
-//        val groupCount = groups.count
-//        println(s"===== groupCount: ${groupCount} =====")
-
-        val updateResults = groups.flatMap { group =>
-          val (t, (matchData, missData)) = group
-
-          val matchSize = matchData.size
-          val missSize = missData.size
-          val res = AccuracyResult(missSize, matchSize + missSize)
-
-          val updatedCacheResultOpt = cacheResultProcesser.genUpdateCacheResult(t, updateTime, res)
-
-          updatedCacheResultOpt.flatMap { updatedCacheResult =>
-            Some((updatedCacheResult, (t, missData)))
-          }
-        }
-
-        updateResults.cache
-
-        val updateResultsPart =  updateResults.map(_._1)
-        val updateDataPart =  updateResults.map(_._2)
-
-        val updateResultsArray = updateResultsPart.collect()
-
-        // update results cache (in driver)
-        // collect action is traversable once action, it will make rdd updateResults empty
-        updateResultsArray.foreach { updateResult =>
-//          println(s"update result: ${updateResult}")
-          cacheResultProcesser.update(updateResult)
-          // persist result
-          val persist: Persist = persistFactory.getPersists(updateResult.timeGroup)
-          persist.result(updateTime, updateResult.result)
-        }
-
-        // record missing data and dump old data (in executor)
-        updateDataPart.foreach { grp =>
-          val (t, datas) = grp
-          val persist: Persist = persistFactory.getPersists(t)
-          // persist missing data
-          val missStrings = datas.map { row =>
-            record2String(row, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)
-          }
-          persist.records(missStrings, PersistType.MISS)
-
-          // data connector update old data
-          val dumpDatas = datas.map { r =>
-            val (_, (v, i)) = r
-            v ++ i
-          }
-          sourceDataConnector.updateOldData(t, dumpDatas)
-//          targetDataConnector.updateOldData(t, dumpDatas)    // not correct
-        }
-
-        updateResults.unpersist()
-
-        TimeInfoCache.endTimeInfoCache
-
-        // clean old data
-        cleanData()
-
-        val et = new Date().getTime
-        appPersist.log(et, s"persist using time: ${et - ct} ms")
-
-      } catch {
-        case e: Throwable => error(s"process error: ${e.getMessage}")
-      } finally {
-        lock.unlock()
-      }
-    } else {
-      println(s"===== [${updateTimeDate}] process ignores =====")
-    }
-    val endTime = new Date().getTime
-    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
-  }
-
-  // clean old data and old result cache
-  def cleanData(): Unit = {
-    try {
-      sourceDataConnector.cleanOldData
-      targetDataConnector.cleanOldData
-
-      val cleanTime = TimeInfoCache.getCleanTime
-      cacheResultProcesser.refresh(cleanTime)
-    } catch {
-      case e: Throwable => error(s"clean data error: ${e.getMessage}")
-    }
-  }
-
-  // calculate accuracy between source data and target data
-  private def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-               ruleAnalyzer: RuleAnalyzer) = {
-    // 1. cogroup
-    val allKvs = sourceData.cogroup(targetData)
-
-    // 2. accuracy calculation
-    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-
-    (accuResult, missingRdd, matchedRdd)
-  }
-
-  private def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
-                      ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
-    rdd.flatMap { row =>
-      val (key, (value, info)) = row
-      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
-        case Some(t: Long) => Some((t, row))
-        case _ => None
-      }
-      b
-    }
-  }
-
-  // convert data into a string
-  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), dataPersist: Iterable[Expr], infoPersist: Iterable[Expr]): String = {
-    val (key, (data, info)) = rec
-    val persistData = getPersistMap(data, dataPersist)
-    val persistInfo = info.mapValues { value =>
-      value match {
-        case vd: Map[String, Any] => getPersistMap(vd, infoPersist)
-        case v => v
-      }
-    }.map(identity)
-    s"${persistData} [${persistInfo}]"
-  }
-
-  // get the expr value map of the persist expressions
-  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-    data.flatMap { pair =>
-      val (k, v) = pair
-      persistMap.get(k) match {
-        case Some(d) => Some((d -> v))
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
deleted file mode 100644
index e5bd7de..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.streaming
-
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-import java.util.{Timer, TimerTask}
-
-case class TimingProcess(interval: Long, runnable: Runnable) {
-
-  val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-
-  val timer = new Timer("process", true)
-
-  val timerTask = new TimerTask() {
-    override def run(): Unit = {
-      pool.submit(runnable)
-    }
-  }
-
-  def startup(): Unit = {
-    timer.schedule(timerTask, interval, interval)
-  }
-
-  def shutdown(): Unit = {
-    timer.cancel()
-    pool.shutdown()
-    pool.awaitTermination(10, TimeUnit.SECONDS)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index ac0acff..b581a58 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -109,7 +109,7 @@ object TimeInfoCache extends Loggable with Serializable {
         case _ => -1
       }
     } catch {
-      case _ => -1
+      case e: Throwable => -1
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
index 50d3ada..9916e92 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
@@ -23,7 +23,7 @@ import org.apache.griffin.measure.result._
 
 import scala.collection.mutable.{Map => MutableMap}
 
-case class CacheResultProcesser() extends Loggable {
+object CacheResultProcesser extends Loggable {
 
   val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
deleted file mode 100644
index 9c60755..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class DataCacheParam( @JsonProperty("type") cacheType: String,
-                           @JsonProperty("config") config: Map[String, Any],
-                           @JsonProperty("time.range") timeRange: List[String]
-                         ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
index dbc2e0b..a819997 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
@@ -26,12 +26,8 @@ import org.apache.griffin.measure.config.params.Param
 case class DataConnectorParam( @JsonProperty("type") conType: String,
                                @JsonProperty("version") version: String,
                                @JsonProperty("config") config: Map[String, Any],
-                               @JsonProperty("cache") cache: DataCacheParam,
-                               @JsonProperty("match.once") matchOnce: Boolean
+                               @JsonProperty("pre.proc") preProc: List[Map[String, Any]]
                              ) extends Param {
 
-  def getMatchOnce(): Boolean = {
-    if (matchOnce == null) false else matchOnce
-  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
new file mode 100644
index 0000000..b638234
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class DataSourceParam( @JsonProperty("name") name: String,
+                            @JsonProperty("connectors") connectors: List[DataConnectorParam],
+                            @JsonProperty("cache") cache: Map[String, Any]
+                          ) extends Param {
+
+}