Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/02 09:31:55 UTC

[6/6] incubator-griffin git commit: griffin-measure package modification

griffin-measure package modification

- add unit tests for the profile algorithm
- add license headers to the Scala code
- remove the griffin-measure-batch module layer

Author: Liu <ll...@ebay.com>
Author: Lionel Liu <bh...@163.com>
Author: Liu <ll...@lm-shc-16501428.corp.ebay.com>
Author: Liu <ll...@lm-shc-16501428.dhcp>
Author: bhlx3lyx7 <bh...@163.com>

Closes #51 from bhlx3lyx7/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/8d43a4c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/8d43a4c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/8d43a4c0

Branch: refs/heads/master
Commit: 8d43a4c0e98d8c4c582c486d61db3c492a1f5096
Parents: 6d8e526
Author: Liu <ll...@ebay.com>
Authored: Fri Jun 2 17:31:39 2017 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Fri Jun 2 17:31:39 2017 +0800

----------------------------------------------------------------------
 README.md                                       |   4 +-
 .../jar/griffin-measure-batch.jar.placeholder   |   0
 .../prep/jar/griffin-measure.jar.placeholder    |   0
 docker/griffin_demo/prep/prepare.sh             |   2 +-
 griffin-doc/dockerUIguide.md                    |  12 +-
 measure/measure-batch/pom.xml                   |  19 --
 .../src/main/resources/config-old.json          |  45 ---
 .../src/main/resources/config.json              |  29 --
 .../measure-batch/src/main/resources/env.json   |  29 --
 .../src/main/resources/log4j.properties         |   5 -
 .../griffin/measure/batch/Application.scala     |  95 ------
 .../measure/batch/algo/AccuracyAlgo.scala       |   6 -
 .../griffin/measure/batch/algo/Algo.scala       |  16 -
 .../measure/batch/algo/BatchAccuracyAlgo.scala  | 165 ----------
 .../measure/batch/algo/BatchProfileAlgo.scala   | 137 --------
 .../measure/batch/algo/MeasureType.scala        |   8 -
 .../measure/batch/algo/ProfileAlgo.scala        |   5 -
 .../measure/batch/algo/core/AccuracyCore.scala  |  83 -----
 .../measure/batch/algo/core/ProfileCore.scala   |  53 ----
 .../measure/batch/config/params/AllParam.scala  |  14 -
 .../measure/batch/config/params/Param.scala     |   7 -
 .../batch/config/params/env/CleanerParam.scala  |  10 -
 .../batch/config/params/env/EnvParam.scala      |  13 -
 .../batch/config/params/env/PersistParam.scala  |  12 -
 .../batch/config/params/env/SparkParam.scala    |  13 -
 .../config/params/user/DataConnectorParam.scala |  13 -
 .../config/params/user/EvaluateRuleParam.scala  |  12 -
 .../batch/config/params/user/UserParam.scala    |  15 -
 .../batch/config/reader/ParamFileReader.scala   |  20 --
 .../config/reader/ParamHdfsFileReader.scala     |  20 --
 .../config/reader/ParamRawStringReader.scala    |  17 -
 .../batch/config/reader/ParamReader.scala       |  12 -
 .../config/reader/ParamReaderFactory.scala      |  22 --
 .../config/validator/AllParamValidator.scala    |  16 -
 .../batch/config/validator/ParamValidator.scala |  12 -
 .../batch/connector/AvroDataConnector.scala     |  91 ------
 .../measure/batch/connector/DataConnector.scala |  16 -
 .../batch/connector/DataConnectorFactory.scala  |  31 --
 .../batch/connector/HiveDataConnector.scala     | 113 -------
 .../griffin/measure/batch/log/Loggable.scala    |  25 --
 .../measure/batch/persist/HdfsPersist.scala     | 153 ---------
 .../measure/batch/persist/HttpPersist.scala     |  60 ----
 .../measure/batch/persist/MultiPersists.scala   |  31 --
 .../griffin/measure/batch/persist/Persist.scala |  26 --
 .../measure/batch/persist/PersistFactory.scala  |  31 --
 .../measure/batch/result/AccuracyResult.scala   |  26 --
 .../measure/batch/result/ProfileResult.scala    |  26 --
 .../griffin/measure/batch/result/Result.scala   |  14 -
 .../measure/batch/result/ResultInfo.scala       |  39 ---
 .../measure/batch/rule/CalculationUtil.scala    | 297 ------------------
 .../measure/batch/rule/ExprValueUtil.scala      |  75 -----
 .../measure/batch/rule/RuleAnalyzer.scala       |  60 ----
 .../measure/batch/rule/RuleFactory.scala        |  34 --
 .../griffin/measure/batch/rule/RuleParser.scala | 220 -------------
 .../batch/rule/expr/AnalyzableExpr.scala        |   7 -
 .../measure/batch/rule/expr/Cacheable.scala     |  15 -
 .../measure/batch/rule/expr/Calculatable.scala  |   7 -
 .../batch/rule/expr/DataSourceable.scala        |  10 -
 .../measure/batch/rule/expr/Describable.scala   |  15 -
 .../griffin/measure/batch/rule/expr/Expr.scala  |  33 --
 .../measure/batch/rule/expr/ExprDescOnly.scala  |  22 --
 .../measure/batch/rule/expr/ExprIdCounter.scala |  42 ---
 .../measure/batch/rule/expr/FieldDescOnly.scala |  40 ---
 .../measure/batch/rule/expr/LiteralExpr.scala   |  78 -----
 .../measure/batch/rule/expr/LogicalExpr.scala   | 159 ----------
 .../measure/batch/rule/expr/MathExpr.scala      |  79 -----
 .../measure/batch/rule/expr/SelectExpr.scala    |  53 ----
 .../measure/batch/rule/expr/StatementExpr.scala |  52 ----
 .../griffin/measure/batch/utils/HdfsUtil.scala  |  62 ----
 .../griffin/measure/batch/utils/HttpUtil.scala  |  30 --
 .../griffin/measure/batch/utils/JsonUtil.scala  |  32 --
 .../src/test/resources/config-profile.json      |  17 -
 .../src/test/resources/config.json              |  25 --
 .../src/test/resources/config1.json             |  27 --
 .../measure-batch/src/test/resources/env.json   |  27 --
 .../measure-batch/src/test/resources/env1.json  |  21 --
 .../src/test/resources/log4j.properties         |   5 -
 .../src/test/resources/users_info_src.avro      | Bin 3850 -> 0 bytes
 .../src/test/resources/users_info_src.dat       |  50 ---
 .../src/test/resources/users_info_target.avro   | Bin 3852 -> 0 bytes
 .../src/test/resources/users_info_target.dat    |  50 ---
 .../batch/algo/BatchAccuracyAlgoTest.scala      | 174 -----------
 .../batch/algo/BatchProfileAlgoTest.scala       | 149 ---------
 .../batch/algo/core/AccuracyCoreTest.scala      |  71 -----
 .../reader/ParamRawStringReaderTest.scala       |  20 --
 .../validator/AllParamValidatorTest.scala       |  22 --
 .../measure/batch/persist/HdfsPersistTest.scala |  30 --
 .../measure/batch/persist/HttpPersistTest.scala |  24 --
 .../batch/result/AccuracyResultTest.scala       |  39 ---
 .../measure/batch/rule/RuleAnalyzerTest.scala   |  46 ---
 .../measure/batch/rule/RuleFactoryTest.scala    |  26 --
 .../measure/batch/rule/RuleParserTest.scala     | 177 -----------
 .../measure/batch/utils/JsonUtilTest.scala      |  42 ---
 measure/pom.xml                                 |  17 +-
 measure/src/main/resources/config-old.json      |  45 +++
 measure/src/main/resources/config.json          |  29 ++
 measure/src/main/resources/env.json             |  29 ++
 measure/src/main/resources/log4j.properties     |   5 +
 .../griffin/measure/batch/Application.scala     | 109 +++++++
 .../measure/batch/algo/AccuracyAlgo.scala       |  20 ++
 .../griffin/measure/batch/algo/Algo.scala       |  30 ++
 .../measure/batch/algo/BatchAccuracyAlgo.scala  | 179 +++++++++++
 .../measure/batch/algo/BatchProfileAlgo.scala   | 151 +++++++++
 .../measure/batch/algo/MeasureType.scala        |  22 ++
 .../measure/batch/algo/ProfileAlgo.scala        |  19 ++
 .../measure/batch/algo/core/AccuracyCore.scala  |  97 ++++++
 .../measure/batch/algo/core/ProfileCore.scala   |  69 ++++
 .../measure/batch/config/params/AllParam.scala  |  28 ++
 .../measure/batch/config/params/Param.scala     |  21 ++
 .../batch/config/params/env/CleanerParam.scala  |  24 ++
 .../batch/config/params/env/EnvParam.scala      |  27 ++
 .../batch/config/params/env/PersistParam.scala  |  26 ++
 .../batch/config/params/env/SparkParam.scala    |  27 ++
 .../config/params/user/DataConnectorParam.scala |  27 ++
 .../config/params/user/EvaluateRuleParam.scala  |  26 ++
 .../batch/config/params/user/UserParam.scala    |  29 ++
 .../batch/config/reader/ParamFileReader.scala   |  34 ++
 .../config/reader/ParamHdfsFileReader.scala     |  34 ++
 .../config/reader/ParamRawStringReader.scala    |  31 ++
 .../batch/config/reader/ParamReader.scala       |  26 ++
 .../config/reader/ParamReaderFactory.scala      |  36 +++
 .../config/validator/AllParamValidator.scala    |  30 ++
 .../batch/config/validator/ParamValidator.scala |  26 ++
 .../batch/connector/AvroDataConnector.scala     | 105 +++++++
 .../measure/batch/connector/DataConnector.scala |  30 ++
 .../batch/connector/DataConnectorFactory.scala  |  45 +++
 .../batch/connector/HiveDataConnector.scala     | 127 ++++++++
 .../griffin/measure/batch/log/Loggable.scala    |  39 +++
 .../measure/batch/persist/HdfsPersist.scala     | 167 ++++++++++
 .../measure/batch/persist/HttpPersist.scala     |  74 +++++
 .../measure/batch/persist/MultiPersists.scala   |  45 +++
 .../griffin/measure/batch/persist/Persist.scala |  40 +++
 .../measure/batch/persist/PersistFactory.scala  |  45 +++
 .../measure/batch/result/AccuracyResult.scala   |  40 +++
 .../measure/batch/result/ProfileResult.scala    |  40 +++
 .../griffin/measure/batch/result/Result.scala   |  28 ++
 .../measure/batch/result/ResultInfo.scala       |  53 ++++
 .../measure/batch/rule/CalculationUtil.scala    | 311 +++++++++++++++++++
 .../measure/batch/rule/ExprValueUtil.scala      |  89 ++++++
 .../measure/batch/rule/RuleAnalyzer.scala       |  74 +++++
 .../measure/batch/rule/RuleFactory.scala        |  48 +++
 .../griffin/measure/batch/rule/RuleParser.scala | 236 ++++++++++++++
 .../batch/rule/expr/AnalyzableExpr.scala        |  21 ++
 .../measure/batch/rule/expr/Cacheable.scala     |  29 ++
 .../measure/batch/rule/expr/Calculatable.scala  |  21 ++
 .../batch/rule/expr/DataSourceable.scala        |  24 ++
 .../measure/batch/rule/expr/Describable.scala   |  29 ++
 .../griffin/measure/batch/rule/expr/Expr.scala  |  47 +++
 .../measure/batch/rule/expr/ExprDescOnly.scala  |  36 +++
 .../measure/batch/rule/expr/ExprIdCounter.scala |  56 ++++
 .../measure/batch/rule/expr/FieldDescOnly.scala |  54 ++++
 .../measure/batch/rule/expr/LiteralExpr.scala   |  92 ++++++
 .../measure/batch/rule/expr/LogicalExpr.scala   | 173 +++++++++++
 .../measure/batch/rule/expr/MathExpr.scala      |  93 ++++++
 .../measure/batch/rule/expr/SelectExpr.scala    |  67 ++++
 .../measure/batch/rule/expr/StatementExpr.scala |  66 ++++
 .../griffin/measure/batch/utils/HdfsUtil.scala  |  76 +++++
 .../griffin/measure/batch/utils/HttpUtil.scala  |  44 +++
 .../griffin/measure/batch/utils/JsonUtil.scala  |  46 +++
 measure/src/test/resources/config-profile.json  |  17 +
 measure/src/test/resources/config.json          |  25 ++
 measure/src/test/resources/config1.json         |  27 ++
 measure/src/test/resources/env.json             |  27 ++
 measure/src/test/resources/env1.json            |  21 ++
 measure/src/test/resources/log4j.properties     |   5 +
 measure/src/test/resources/users_info_src.avro  | Bin 0 -> 3850 bytes
 measure/src/test/resources/users_info_src.dat   |  50 +++
 .../src/test/resources/users_info_target.avro   | Bin 0 -> 3852 bytes
 .../src/test/resources/users_info_target.dat    |  50 +++
 .../batch/algo/BatchAccuracyAlgoTest.scala      | 188 +++++++++++
 .../batch/algo/BatchProfileAlgoTest.scala       | 163 ++++++++++
 .../batch/algo/core/AccuracyCoreTest.scala      |  85 +++++
 .../batch/algo/core/ProfileCoreTest.scala       |  75 +++++
 .../reader/ParamRawStringReaderTest.scala       |  34 ++
 .../validator/AllParamValidatorTest.scala       |  36 +++
 .../measure/batch/persist/HdfsPersistTest.scala |  44 +++
 .../measure/batch/persist/HttpPersistTest.scala |  38 +++
 .../batch/result/AccuracyResultTest.scala       |  53 ++++
 .../batch/result/ProfileResultTest.scala        |  53 ++++
 .../measure/batch/rule/RuleAnalyzerTest.scala   |  60 ++++
 .../measure/batch/rule/RuleFactoryTest.scala    |  40 +++
 .../measure/batch/rule/RuleParserTest.scala     | 198 ++++++++++++
 .../measure/batch/utils/JsonUtilTest.scala      |  56 ++++
 pom.xml                                         |  13 -
 .../griffin/core/schedule/SparkSubmitJob.java   |   1 -
 service/src/main/resources/sparkJob.properties  |   2 +-
 186 files changed, 5223 insertions(+), 4108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index dceefda..7f9453e 100644
--- a/README.md
+++ b/README.md
@@ -68,7 +68,7 @@ Release:
     ```
     service/src/main/resources/sparkJob.properties
     ```
-    sparkJob.file = hdfs://<griffin measure path>/griffin-measure-batch.jar
+    sparkJob.file = hdfs://<griffin measure path>/griffin-measure.jar
     sparkJob.args_1 = hdfs://<griffin env path>/env.json
     sparkJob.jars_1 = hdfs://<datanucleus path>/datanucleus-api-jdo-3.2.6.jar
     sparkJob.jars_2 = hdfs://<datanucleus path>/datanucleus-core-3.2.10.jar
@@ -86,7 +86,7 @@ Release:
     ```
     Create a directory in Hdfs, and put our measure package into it.
     ```
-    hdfs dfs -put /measure/measure-batch/target/griffin-measure-batch.jar <griffin measure path>/
+    hdfs dfs -put /measure/target/griffin-measure.jar <griffin measure path>/
     ```
     After all our environment services startup, we can start our server.
     ```

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/docker/griffin_demo/prep/jar/griffin-measure-batch.jar.placeholder
----------------------------------------------------------------------
diff --git a/docker/griffin_demo/prep/jar/griffin-measure-batch.jar.placeholder b/docker/griffin_demo/prep/jar/griffin-measure-batch.jar.placeholder
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/docker/griffin_demo/prep/jar/griffin-measure.jar.placeholder
----------------------------------------------------------------------
diff --git a/docker/griffin_demo/prep/jar/griffin-measure.jar.placeholder b/docker/griffin_demo/prep/jar/griffin-measure.jar.placeholder
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/docker/griffin_demo/prep/prepare.sh
----------------------------------------------------------------------
diff --git a/docker/griffin_demo/prep/prepare.sh b/docker/griffin_demo/prep/prepare.sh
index bd95d55..e89e5e4 100755
--- a/docker/griffin_demo/prep/prepare.sh
+++ b/docker/griffin_demo/prep/prepare.sh
@@ -9,7 +9,7 @@ hadoop fs -mkdir /griffin/data
 hadoop fs -mkdir /griffin/data/batch
 
 #jar file
-hadoop fs -put jar/griffin-measure-batch.jar /griffin/
+hadoop fs -put jar/griffin-measure.jar /griffin/
 
 #data
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/griffin-doc/dockerUIguide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/dockerUIguide.md b/griffin-doc/dockerUIguide.md
index 3d2adf9..0c65b18 100644
--- a/griffin-doc/dockerUIguide.md
+++ b/griffin-doc/dockerUIguide.md
@@ -9,13 +9,13 @@ Follow the steps [here](https://github.com/apache/incubator-griffin#how-to-run-i
 1.  Click "Data Assets" at the top right corner, to watch all the exist data assets.  
     In docker, we've prepared two data asset in Hive, through this page, you can see all the table metadata in Hive.
 
-2.  Click "Models" button at the top left corner to watch all the measures here, and you can also create a new DQ measurement by following steps.  
-    1) Click "Create DQ Model" button at the top left corner, choose the top left block "Accuracy", at current we only support accuracy type.  
-    2)  Choose Source: find "demo_src" in the left tree, select some or all attributes in the right block, click "Next".  
-    3)  Choose Target: find "demo_tgt" in the left tree, select the matching attributes with source data asset in the right block, click "Next".  
-    4)  Mapping Source and Target: select "Source Fields" of each row, to match the corresponding field in target table, e.g. id maps to id, age maps to age, desc maps to desc.   
+2.  Click "Measures" button at the top left corner to watch all the measures here, and you can also create a new DQ measurement by following steps.  
+    1) Click "Create Measure" button at the top left corner, choose the top left block "Accuracy", at current we only support accuracy type.  
+    2) Choose Source: find "demo_src" in the left tree, select some or all attributes in the right block, click "Next".  
+    3) Choose Target: find "demo_tgt" in the left tree, select the matching attributes with source data asset in the right block, click "Next".  
+    4) Mapping Source and Target: select "Source Fields" of each row, to match the corresponding field in target table, e.g. id maps to id, age maps to age, desc maps to desc.   
     Finish all the mapping, click "Next".  
-    5)  Fill out the required table as required, "Organization" is the group of this measurement.  
+    5) Fill out the required table as required, "Organization" is the group of this measurement.  
     Submit and save, you can see your new DQ measurement created in the measures list.  
 
 3.  Now you've created a new DQ measurement, the measurement needs to be scheduled to run in the docker container.  

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/pom.xml
----------------------------------------------------------------------
diff --git a/measure/measure-batch/pom.xml b/measure/measure-batch/pom.xml
deleted file mode 100644
index ddb398b..0000000
--- a/measure/measure-batch/pom.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>measure</artifactId>
-        <groupId>org.apache.griffin</groupId>
-        <version>0.1.0-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>measure-batch</artifactId>
-    <packaging>jar</packaging>
-
-    <name>Apache Griffin :: Measures :: Measure Batch</name>
-    <url>http://maven.apache.org</url>
-
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/resources/config-old.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/resources/config-old.json b/measure/measure-batch/src/main/resources/config-old.json
deleted file mode 100644
index 63dee69..0000000
--- a/measure/measure-batch/src/main/resources/config-old.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
-  "name": "accu1",
-  "type": "accuracy",
-
-  "source": {
-    "connector": {
-      "type": "hive",
-      "version": "1.2",
-      "config": {
-        "table.name": "users_info_src",
-        "partitions": "dt=20170410, hour=14"
-      }
-    }
-  },
-
-  "target": {
-    "connector": {
-      "type": "hive",
-      "version": "1.2",
-      "config": {
-        "database": "default",
-        "table.name": "users_info_target",
-        "partitions": "dt=20170410, hour=14; dt=20170410, hour=15"
-      }
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 1,
-    "assertion": {
-      "type": "DSL-griffin",
-      "rules": [
-        {
-          "rule": "@Key ${source}['user_id'] === ${target}['user_id']"
-        },
-        {
-          "rule": "${source}['first_name'] === ${target}['first_name']; ${source}['last_name'] === ${target}['last_name']; ${source}['address'] === ${target}['address']"
-        },
-        {
-          "rule": "${source}['email'] === ${target}['email']; ${source}['phone'] === ${target}['phone']; ${source}['post_code'] === ${target}['post_code']"
-        }
-      ]
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/resources/config.json b/measure/measure-batch/src/main/resources/config.json
deleted file mode 100644
index edd2e6a..0000000
--- a/measure/measure-batch/src/main/resources/config.json
+++ /dev/null
@@ -1,29 +0,0 @@
-{
-  "name": "accu1",
-  "type": "accuracy",
-
-  "source": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "database": "default",
-      "table.name": "users_info_src",
-      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-    }
-  },
-
-  "target": {
-    "type": "hive",
-    "version": "1.2",
-    "config": {
-      "database": "default",
-      "table.name": "users_info_target",
-      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 0.2,
-    "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/resources/env.json b/measure/measure-batch/src/main/resources/env.json
deleted file mode 100644
index 57da895..0000000
--- a/measure/measure-batch/src/main/resources/env.json
+++ /dev/null
@@ -1,29 +0,0 @@
-{
-  "spark": {
-    "log.level": "INFO",
-    "checkpoint.dir": "hdfs:///griffin/batch/cp",
-    "config": {}
-  },
-
-  "persist": [
-    {
-      "type": "hdfs",
-      "config": {
-        "path": "hdfs:///griffin/streaming/persist",
-        "max.persist.lines": 10000,
-        "max.lines.per.file": 10000
-      }
-    },
-    {
-      "type": "http",
-      "config": {
-        "method": "post",
-        "api": "http://HOSTNAME:9200/griffin/accuracy"
-      }
-    }
-  ],
-
-  "cleaner": {
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/resources/log4j.properties b/measure/measure-batch/src/main/resources/log4j.properties
deleted file mode 100644
index bd31e15..0000000
--- a/measure/measure-batch/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-log4j.rootLogger=INFO, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
deleted file mode 100644
index 84fb1f3..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.griffin.measure.batch
-
-import org.apache.griffin.measure.batch.algo._
-import org.apache.griffin.measure.batch.config.params._
-import org.apache.griffin.measure.batch.config.params.env._
-import org.apache.griffin.measure.batch.config.params.user._
-import org.apache.griffin.measure.batch.config.reader._
-import org.apache.griffin.measure.batch.config.validator.AllParamValidator
-import org.apache.griffin.measure.batch.log.Loggable
-
-import scala.util.{Failure, Success, Try}
-
-object Application extends Loggable {
-
-  def main(args: Array[String]): Unit = {
-    info(args.toString)
-    if (args.length < 2) {
-      error("Usage: class <env-param> <user-param> [List of String split by comma: raw | local | hdfs(default)]")
-      sys.exit(-1)
-    }
-
-    val envParamFile = args(0)
-    val userParamFile = args(1)
-    val (envFsType, userFsType) = if (args.length > 2) {
-      val fsTypes = args(2).trim.split(",")
-      if (fsTypes.length == 1) (fsTypes(0).trim, fsTypes(0).trim)
-      else if (fsTypes.length >= 2) (fsTypes(0).trim, fsTypes(1).trim)
-      else ("hdfs", "hdfs")
-    } else ("hdfs", "hdfs")
-
-    info(envParamFile)
-    info(userParamFile)
-
-    // read param files
-    val envParam = readParamFile[EnvParam](envParamFile, envFsType) match {
-      case Success(p) => p
-      case Failure(ex) => {
-        error(ex.getMessage)
-        sys.exit(-2)
-      }
-    }
-    val userParam = readParamFile[UserParam](userParamFile, userFsType) match {
-      case Success(p) => p
-      case Failure(ex) => {
-        error(ex.getMessage)
-        sys.exit(-2)
-      }
-    }
-    val allParam: AllParam = AllParam(envParam, userParam)
-
-    // validate param files
-    validateParams(allParam) match {
-      case Failure(ex) => {
-        error(ex.getMessage)
-        sys.exit(-3)
-      }
-      case _ => {
-        info("params validation pass")
-      }
-    }
-
-    // choose algorithm
-    val dqType = allParam.userParam.dqType
-    val algo: Algo = dqType match {
-      case MeasureType.accuracy() => BatchAccuracyAlgo(allParam)
-      case MeasureType.profile() => BatchProfileAlgo(allParam)
-      case _ => {
-        error(s"${dqType} is unsupported dq type!")
-        sys.exit(-4)
-      }
-    }
-
-    // algorithm run
-    algo.run match {
-      case Failure(ex) => {
-        error(ex.getMessage)
-        sys.exit(-5)
-      }
-      case _ => {
-        info("calculation finished")
-      }
-    }
-  }
-
-  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-    paramReader.readConfig[T]
-  }
-
-  private def validateParams(allParam: AllParam): Try[Boolean] = {
-    val allParamValidator = AllParamValidator()
-    allParamValidator.validate(allParam)
-  }
-
-}
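
The usage string above accepts an optional third argument: a comma-separated pair of file-system types for the env and user param files. A small Scala sketch (not Griffin code; the object name is made up) of how that argument resolves to (envFsType, userFsType), with "hdfs" as the documented default:

object FsTypeSketch {
  // Mirrors the argument handling in Application.main above.
  def fsTypes(args: Array[String]): (String, String) =
    if (args.length > 2) {
      args(2).trim.split(",").map(_.trim) match {
        case Array(one)           => (one, one)
        case Array(env, user, _*) => (env, user)
        case _                    => ("hdfs", "hdfs")
      }
    } else ("hdfs", "hdfs")

  def main(args: Array[String]): Unit = {
    println(fsTypes(Array("env.json", "config.json")))              // (hdfs,hdfs)
    println(fsTypes(Array("env.json", "config.json", "raw,local"))) // (raw,local)
    println(fsTypes(Array("env.json", "config.json", "hdfs")))      // (hdfs,hdfs)
  }
}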

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
deleted file mode 100644
index d22add2..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.griffin.measure.batch.algo
-
-
-trait AccuracyAlgo extends Algo {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
deleted file mode 100644
index d7047ee..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.griffin.measure.batch.algo
-
-import org.apache.griffin.measure.batch.config.params.env._
-import org.apache.griffin.measure.batch.config.params.user._
-import org.apache.griffin.measure.batch.log.Loggable
-
-import scala.util.Try
-
-trait Algo extends Loggable with Serializable {
-
-  val envParam: EnvParam
-  val userParam: UserParam
-
-  def run(): Try[_]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
deleted file mode 100644
index 0fbb0d0..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-package org.apache.griffin.measure.batch.algo
-
-import java.util.Date
-
-import org.apache.griffin.measure.batch.algo.core.AccuracyCore
-import org.apache.griffin.measure.batch.config.params.AllParam
-import org.apache.griffin.measure.batch.connector._
-import org.apache.griffin.measure.batch.rule._
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.griffin.measure.batch.persist._
-import org.apache.griffin.measure.batch.result._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-// accuracy algorithm for batch mode
-case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
-  val envParam = allParam.envParam
-  val userParam = allParam.userParam
-
-  def run(): Try[_] = {
-    Try {
-      val metricName = userParam.name
-
-      val conf = new SparkConf().setAppName(metricName)
-      val sc = new SparkContext(conf)
-      val sqlContext = new HiveContext(sc)
-
-      // start time
-      val startTime = new Date().getTime()
-
-      // get persists to persist measure result
-      val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
-      // get spark application id
-      val applicationId = sc.applicationId
-
-      // persist start id
-      persist.start(applicationId)
-
-      // generate rule from rule param, generate rule analyzer
-      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-      val rule: StatementExpr = ruleFactory.generateRule()
-      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-      // const expr value map
-      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-
-      // data connector
-      val sourceDataConnector: DataConnector =
-        DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
-        ) match {
-          case Success(cntr) => {
-            if (cntr.available) cntr
-            else throw new Exception("source data connection error!")
-          }
-          case Failure(ex) => throw ex
-        }
-      val targetDataConnector: DataConnector =
-        DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
-          ruleAnalyzer.targetRuleExprs, finalConstExprValueMap
-        ) match {
-          case Success(cntr) => {
-            if (cntr.available) cntr
-            else throw new Exception("target data connection error!")
-          }
-          case Failure(ex) => throw ex
-        }
-
-      // get metadata
-//      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-//        case Success(md) => md
-//        case _ => throw new Exception("source metadata error!")
-//      }
-//      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-//        case Success(md) => md
-//        case _ => throw new Exception("target metadata error!")
-//      }
-
-      // get data
-      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-        case Success(dt) => dt
-        case Failure(ex) => throw ex
-      }
-      val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
-        case Success(dt) => dt
-        case Failure(ex) => throw ex
-      }
-
-      // accuracy algorithm
-      val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
-
-      // end time
-      val endTime = new Date().getTime
-      persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
-      // persist result
-      persist.result(endTime, accuResult)
-      val missingRecords = missingRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs))
-      persist.missRecords(missingRecords)
-
-      // persist end time
-      val persistEndTime = new Date().getTime
-      persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
-      // finish
-      persist.finish()
-
-      // context stop
-      sc.stop
-
-    }
-  }
-
-  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
-    (data, Map[String, Any]())
-  }
-
-  // calculate accuracy between source data and target data
-  def accuracy(sourceData: RDD[(Product, Map[String, Any])], targetData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
-              ): (AccuracyResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
-
-    // 1. wrap data
-    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
-    val targetWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetData.map(r => (r._1, wrapInitData(r._2)))
-
-    // 2. cogroup
-    val allKvs = sourceWrappedData.cogroup(targetWrappedData)
-
-    // 3. accuracy calculation
-    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-
-    (accuResult, missingRdd, matchedRdd)
-  }
-
-  // convert data into a string
-  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
-    val (key, (data, info)) = rec
-    val persistData = getPersistMap(data, sourcePersist)
-    val persistInfo = info.mapValues { value =>
-      value match {
-        case vd: Map[String, Any] => getPersistMap(vd, targetPersist)
-        case v => v
-      }
-    }
-    s"${persistData} [${persistInfo}]"
-  }
-
-  // get the expr value map of the persist expressions
-  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-    data.flatMap { pair =>
-      val (k, v) = pair
-      persistMap.get(k) match {
-        case Some(d) => Some((d -> v))
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
deleted file mode 100644
index 7ea8d12..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.apache.griffin.measure.batch.algo
-
-import java.util.Date
-
-import org.apache.griffin.measure.batch.algo.core.ProfileCore
-import org.apache.griffin.measure.batch.config.params._
-import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory}
-import org.apache.griffin.measure.batch.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.batch.result._
-import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-import org.apache.griffin.measure.batch.rule.expr.{Expr, StatementExpr}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-// profile algorithm for batch mode
-case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
-  val envParam = allParam.envParam
-  val userParam = allParam.userParam
-
-  def run(): Try[_] = {
-    Try {
-      val metricName = userParam.name
-
-      val conf = new SparkConf().setAppName(metricName)
-      val sc = new SparkContext(conf)
-      val sqlContext = new HiveContext(sc)
-
-      // start time
-      val startTime = new Date().getTime()
-
-      // get persists to persist measure result
-      val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
-      // get spark application id
-      val applicationId = sc.applicationId
-
-      // persist start id
-      persist.start(applicationId)
-
-      // generate rule from rule param, generate rule analyzer
-      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-      val rule: StatementExpr = ruleFactory.generateRule()
-      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-      // const expr value map
-      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-
-      // data connector
-      val sourceDataConnector: DataConnector =
-      DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-        ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
-      ) match {
-        case Success(cntr) => {
-          if (cntr.available) cntr
-          else throw new Exception("source data connection error!")
-        }
-        case Failure(ex) => throw ex
-      }
-
-      // get metadata
-      //      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-      //        case Success(md) => md
-      //        case _ => throw new Exception("source metadata error!")
-      //      }
-
-      // get data
-      val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-        case Success(dt) => dt
-        case Failure(ex) => throw ex
-      }
-
-      // profile algorithm
-      val (profileResult, missingRdd, matchedRdd) = profile(sourceData, ruleAnalyzer)
-
-      // end time
-      val endTime = new Date().getTime
-      persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
-      // persist result
-      persist.result(endTime, profileResult)
-      val matchedRecords = matchedRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs))
-      persist.matchRecords(matchedRecords)
-
-      // persist end time
-      val persistEndTime = new Date().getTime
-      persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
-      // finish
-      persist.finish()
-
-      // context stop
-      sc.stop
-    }
-  }
-
-  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
-    (data, Map[String, Any]())
-  }
-
-  // calculate profile from source data
-  def profile(sourceData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
-              ): (ProfileResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
-
-    // 1. wrap data
-    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
-
-    // 2. profile calculation
-    val (profileResult, missingRdd, matchedRdd) = ProfileCore.profile(sourceWrappedData, ruleAnalyzer)
-
-    (profileResult, missingRdd, matchedRdd)
-  }
-
-  // convert data into a string
-  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr]): String = {
-    val (key, (data, info)) = rec
-    val persistData = getPersistMap(data, sourcePersist)
-    val persistInfo = info
-    if (persistInfo.size > 0) s"${persistData} [${persistInfo}]" else s"${persistData}"
-  }
-
-  // get the expr value map of the persist expressions
-  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-    data.flatMap { pair =>
-      val (k, v) = pair
-      persistMap.get(k) match {
-        case Some(d) => Some((d -> v))
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
deleted file mode 100644
index 4f6924c..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.griffin.measure.batch.algo
-
-object MeasureType {
-
-  val accuracy = """^(?i)accuracy$""".r
-  val profile = """^(?i)profile""".r
-
-}
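
The two Regex values above are used as extractors in Application.scala's match on the dq type (case MeasureType.accuracy() => ...). A minimal Scala sketch (not Griffin code) of that idiom: a Regex with no capture groups matches in pattern position when the whole string matches it.

object MeasureTypeSketch {
  val accuracy = """^(?i)accuracy$""".r
  val profile = """^(?i)profile""".r

  // Same shape as the dqType match in Application.main.
  def pick(dqType: String): String = dqType match {
    case accuracy() => "BatchAccuracyAlgo"
    case profile()  => "BatchProfileAlgo"
    case _          => s"$dqType is unsupported dq type!"
  }

  def main(args: Array[String]): Unit = {
    println(pick("Accuracy")) // BatchAccuracyAlgo
    println(pick("profile"))  // BatchProfileAlgo
    println(pick("latency"))  // latency is unsupported dq type!
  }
}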

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
deleted file mode 100644
index fe64767..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.griffin.measure.batch.algo
-
-trait ProfileAlgo extends Algo {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
deleted file mode 100644
index 9e544e4..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.griffin.measure.batch.algo.core
-
-import org.apache.griffin.measure.batch.rule.RuleAnalyzer
-import org.apache.griffin.measure.batch.result._
-import org.apache.spark.rdd.RDD
-
-
-object AccuracyCore {
-
-  type V = Map[String, Any]
-  type T = Map[String, Any]
-
-  // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, targetInfo)]))
-  // output: accuracy result, missing source data rdd, matched source data rdd
-  def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], ruleAnalyzer: RuleAnalyzer
-              ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
-    val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, T))])] = allKvs.map { kv =>
-      val (key, (sourceDatas, targetDatas)) = kv
-
-      // result: (missCount, matchCount, missDataList, matchDataList)
-      val rslt = sourceDatas.foldLeft((0L, 0L, List[(Product, (V, T))](), List[(Product, (V, T))]())) { (sr, sourcePair) =>
-        val matchResult = if (targetDatas.isEmpty) {
-          (false, Map[String, Any](MismatchInfo.wrap("no target")))
-        } else {
-          targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) =>
-            if (tr._1) tr
-            else matchData(sourcePair, targetPair, ruleAnalyzer)
-          }
-        }
-
-        if (matchResult._1) {
-          val matchItem = (key, sourcePair)
-          (sr._1, sr._2 + 1, sr._3, sr._4 :+ matchItem)
-        } else {
-          val missItem = (key, (sourcePair._1, sourcePair._2 ++ matchResult._2))
-          (sr._1 + 1, sr._2, sr._3 :+ missItem, sr._4)
-        }
-      }
-
-      rslt
-    }
-
-    val missRdd = result.flatMap(_._3)
-    val matchRdd = result.flatMap(_._4)
-
-    def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
-      (cnt._1 + rcd._1, cnt._2 + rcd._2)
-    }
-    def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
-      (c1._1 + c2._1, c1._2 + c2._2)
-    }
-    val countPair = result.aggregate((0L, 0L))(seq, comb)
-
-    (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
-  }
-
-  // try to match source and target data, return true if matched, false if unmatched, also with some matching info
-  private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
-    // 1. merge source and target cached data
-    val mergedExprValueMap: Map[String, Any] = mergeExprValueMap(source, target)
-
-    // 2. check valid
-    if (ruleAnalyzer.rule.valid(mergedExprValueMap)) {
-      // 3. substitute the cached data into statement, get the statement value
-      val matched = ruleAnalyzer.rule.calculate(mergedExprValueMap) match {
-        case Some(b: Boolean) => b
-        case _ => false
-      }
-      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
-      if (matched) (matched, Map[String, Any]())
-      else (matched, Map[String, Any](MismatchInfo.wrap("not matched"), TargetInfo.wrap(target._1)))
-    } else {
-      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare"), TargetInfo.wrap(target._1)))
-    }
-
-  }
-
-  private def mergeExprValueMap(source: (V, T), target: (V, T)): Map[String, Any] = {
-    source._1 ++ target._1
-  }
-
-}
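
The cogroup/fold logic above reads as: for each source row, scan the co-grouped target rows until one matches, counting misses and matches, then aggregate the per-key counts; AccuracyCore builds AccuracyResult(missCount, missCount + matchCount) from those counts. A minimal Scala sketch of the same counting with plain collections and simple row equality in place of the rule engine (the names and the equality check are illustrative assumptions, not Griffin code):

object AccuracyCountSketch {
  type Row = Map[String, Any]

  // (missCount, matchCount) over source rows, matched by key and then by row equality.
  def accuracyCounts(source: Seq[(Any, Row)], target: Seq[(Any, Row)]): (Long, Long) = {
    val targetByKey = target.groupBy(_._1)
    source.foldLeft((0L, 0L)) { case ((miss, matched), (key, row)) =>
      val candidates = targetByKey.getOrElse(key, Seq.empty).map(_._2)
      if (candidates.contains(row)) (miss, matched + 1) else (miss + 1, matched)
    }
  }

  def main(args: Array[String]): Unit = {
    val src = Seq((1, Map("name" -> "a")), (2, Map("name" -> "b")))
    val tgt = Seq((1, Map("name" -> "a")))
    val (miss, matched) = accuracyCounts(src, tgt)
    println(s"miss=$miss, match=$matched, total=${miss + matched}") // miss=1, match=1, total=2
  }
}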

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
deleted file mode 100644
index c9fbcff..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.griffin.measure.batch.algo.core
-
-import org.apache.griffin.measure.batch.rule.RuleAnalyzer
-import org.apache.griffin.measure.batch.result._
-import org.apache.spark.rdd.RDD
-
-
-object ProfileCore {
-
-  type V = Map[String, Any]
-  type T = Map[String, Any]
-
-  // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, targetInfo)]))
-  // output: accuracy result, missing source data rdd, matched source data rdd
-  def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
-              ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
-
-    val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
-      val (key, (data, info)) = kv
-      val (matched, missInfo) = matchData(data, ruleAnalyzer)
-      ((key, (data, info ++ missInfo)), matched)
-    }
-
-    val totalCount = resultRdd.count
-    val matchRdd = resultRdd.filter(_._2).map(_._1)
-    val matchCount = matchRdd.count
-    val missRdd = resultRdd.filter(!_._2).map(_._1)
-    val missCount = missRdd.count
-
-    (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
-
-  }
-
-  // try to match data as rule, return true if matched, false if unmatched
-  private def matchData(data: V, ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
-    // 1. check valid
-    if (ruleAnalyzer.rule.valid(data)) {
-      // 2. substitute the cached data into statement, get the statement value
-      val matched = ruleAnalyzer.rule.calculate(data) match {
-        case Some(b: Boolean) => b
-        case _ => false
-      }
-      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
-      if (matched) (matched, Map[String, Any]())
-      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
-    } else {
-      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
deleted file mode 100644
index b5618a8..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.griffin.measure.batch.config.params
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.env._
-import org.apache.griffin.measure.batch.config.params.user._
-
-// simply composite of env and user params, for convenient usage
-@JsonInclude(Include.NON_NULL)
-case class AllParam( @JsonProperty("env") envParam: EnvParam,
-                     @JsonProperty("user") userParam: UserParam
-                   ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
deleted file mode 100644
index ce94c5c..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.griffin.measure.batch.config.params
-
-trait Param extends Serializable {
-
-  def validate(): Boolean = true
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
deleted file mode 100644
index a29004c..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class CleanerParam() extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
deleted file mode 100644
index 79e2575..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam,
-                     @JsonProperty("persist") persistParams: List[PersistParam],
-                     @JsonProperty("cleaner") cleanerParam: CleanerParam
-                   ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
deleted file mode 100644
index 655f19b..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class PersistParam( @JsonProperty("type") persistType: String,
-                         @JsonProperty("config") config: Map[String, Any]
-                       ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
deleted file mode 100644
index ba6f9a6..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class SparkParam( @JsonProperty("log.level") logLevel: String,
-                       @JsonProperty("checkpoint.dir") cpDir: String,
-                       @JsonProperty("config") config: Map[String, Any]
-                     ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
deleted file mode 100644
index 7a8b2b2..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.griffin.measure.batch.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class DataConnectorParam( @JsonProperty("type") conType: String,
-                               @JsonProperty("version") version: String,
-                               @JsonProperty("config") config: Map[String, Any]
-                             ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
deleted file mode 100644
index edf0a38..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.griffin.measure.batch.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double,
-                              @JsonProperty("rules") rules: String
-                            ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
deleted file mode 100644
index be0d3c8..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.griffin.measure.batch.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class UserParam(@JsonProperty("name") name: String,
-                     @JsonProperty("type") dqType: String,
-                     @JsonProperty("source") sourceParam: DataConnectorParam,
-                     @JsonProperty("target") targetParam: DataConnectorParam,
-                     @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
-                    ) extends Param {
-
-}
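
Taken together, the user-side params describe one measure: its name, dq type, source and target
connectors, and the evaluate rule. An illustrative user config string that binds onto
UserParam/DataConnectorParam/EvaluateRuleParam (property names follow the @JsonProperty
annotations above; the values, including the rule text, are made up for the example), reusing the
mapper from the sketch earlier:

    val userJson =
      """
        |{
        |  "name": "accuracy_demo",
        |  "type": "accuracy",
        |  "source": { "type": "hive", "version": "1.2", "config": { "table.name": "src_tbl" } },
        |  "target": { "type": "avro", "version": "1.7", "config": { "file.name": "target.avro" } },
        |  "evaluateRule": { "sampleRatio": 1.0, "rules": "$source.id = $target.id" }
        |}
      """.stripMargin

    val userParam = mapper.readValue(userJson, classOf[UserParam])
    // userParam.dqType == "accuracy"; userParam.sourceParam.conType == "hive"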

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
deleted file mode 100644
index e30513f..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.Param
-import org.apache.griffin.measure.batch.utils.JsonUtil
-
-import scala.util.Try
-
-case class ParamFileReader(file: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val source = scala.io.Source.fromFile(file)
-      val lines = source.mkString
-      val param = JsonUtil.fromJson[T](lines)
-      source.close
-      param
-    }
-  }
-
-}
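
In use, the readers all hand back a Try of the requested Param subtype, so a caller pattern-matches
on success or failure. For example (the path is hypothetical):

    import scala.util.{Failure, Success}

    ParamFileReader("conf/config.json").readConfig[UserParam] match {
      case Success(userParam) => println(s"loaded measure: ${userParam.name}")
      case Failure(ex)        => println(s"failed to read user config: ${ex.getMessage}")
    }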

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
deleted file mode 100644
index 473b428..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.Param
-import org.apache.griffin.measure.batch.utils.JsonUtil
-import org.apache.griffin.measure.batch.utils.HdfsUtil
-
-import scala.util.Try
-
-case class ParamHdfsFileReader(filePath: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val source = HdfsUtil.openFile(filePath)
-      val param = JsonUtil.fromJson[T](source)
-      source.close
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
deleted file mode 100644
index 1e931de..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.Param
-import org.apache.griffin.measure.batch.utils.JsonUtil
-
-import scala.util.Try
-
-case class ParamRawStringReader(rawString: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val param = JsonUtil.fromJson[T](rawString)
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
deleted file mode 100644
index 2f42b09..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.log.Loggable
-import org.apache.griffin.measure.batch.config.params.Param
-
-import scala.util.Try
-
-trait ParamReader extends Loggable with Serializable {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
deleted file mode 100644
index dd77e8c..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-
-
-object ParamReaderFactory {
-
-  val RawStringRegex = """^(?i)raw$""".r
-  val LocalFsRegex = """^(?i)local$""".r
-  val HdfsFsRegex = """^(?i)hdfs$""".r
-
-  def getParamReader(filePath: String, fsType: String): ParamReader = {
-    fsType match {
-      case RawStringRegex() => ParamRawStringReader(filePath)
-      case LocalFsRegex() => ParamFileReader(filePath)
-      case HdfsFsRegex() => ParamHdfsFileReader(filePath)
-      case _ => ParamHdfsFileReader(filePath)
-    }
-  }
-
-}
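
The factory keys purely off the fs type string, matched case-insensitively, and falls back to the
HDFS reader for anything it does not recognize; note that for "raw" the first argument is the JSON
payload itself rather than a path. For example:

    val rawReader   = ParamReaderFactory.getParamReader("""{"name":"demo"}""", "raw")     // ParamRawStringReader
    val localReader = ParamReaderFactory.getParamReader("conf/env.json", "LOCAL")         // ParamFileReader
    val hdfsReader  = ParamReaderFactory.getParamReader("hdfs:///conf/env.json", "hdfs")  // ParamHdfsFileReader
    val defaulted   = ParamReaderFactory.getParamReader("/conf/env.json", "other")        // defaults to ParamHdfsFileReader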

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
deleted file mode 100644
index ebcc74f..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.griffin.measure.batch.config.validator
-
-import org.apache.griffin.measure.batch.config.params.Param
-
-import scala.util.Try
-
-// need to validate params
-case class AllParamValidator() extends ParamValidator {
-
-  def validate[T <: Param](param: Param): Try[Boolean] = {
-    Try {
-      param.validate
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
deleted file mode 100644
index 9dc9e60..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.griffin.measure.batch.config.validator
-
-import org.apache.griffin.measure.batch.log.Loggable
-import org.apache.griffin.measure.batch.config.params.Param
-
-import scala.util.Try
-
-trait ParamValidator extends Loggable with Serializable {
-
-  def validate[T <: Param](param: Param): Try[Boolean]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
deleted file mode 100644
index ade993c..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import com.databricks.spark.avro._
-
-import scala.util.{Success, Try}
-import java.nio.file.{Files, Paths}
-
-import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
-import org.apache.griffin.measure.batch.utils.HdfsUtil
-
-// data connector for avro file
-case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                             ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
-                            ) extends DataConnector {
-
-  val FilePath = "file.path"
-  val FileName = "file.name"
-
-  val filePath = config.getOrElse(FilePath, "").toString
-  val fileName = config.getOrElse(FileName, "").toString
-
-  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
-
-  private def pathPrefix(): Boolean = {
-    filePath.nonEmpty
-  }
-
-  private def fileExist(): Boolean = {
-    HdfsUtil.existPath(concreteFileFullPath)
-  }
-
-  def available(): Boolean = {
-    (!concreteFileFullPath.isEmpty) && fileExist
-  }
-
-  def metaData(): Try[Iterable[(String, String)]] = {
-    Try {
-      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
-      st.fields.map(f => (f.name, f.dataType.typeName))
-    }
-  }
-
-  def data(): Try[RDD[(Product, Map[String, Any])]] = {
-    Try {
-      loadDataFile.flatMap { row =>
-        // generate cache data
-        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
-        }
-        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
-
-        // when clause filter data source
-        val whenResult = ruleExprs.whenClauseExprOpt match {
-          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-          case _ => None
-        }
-
-        // get groupby data
-        whenResult match {
-          case Some(false) => None
-          case _ => {
-            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-              expr.calculate(finalExprValueMap) match {
-                case Some(v) => Some(v.asInstanceOf[AnyRef])
-                case _ => None
-              }
-            }
-            val key = toTuple(groupbyData)
-
-            Some((key, finalExprValueMap))
-          }
-        }
-      }
-    }
-  }
-
-  private def loadDataFile() = {
-    sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
-  }
-
-  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-    if (as.size > 0) {
-      val tupleClass = Class.forName("scala.Tuple" + as.size)
-      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-    } else None
-  }
-
-}
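
One detail worth calling out in the Avro connector (and the Hive one below) is toTuple: the
group-by key is built reflectively by instantiating scala.TupleN for whatever arity the groupby
expressions produce, falling back to None when there are none. A standalone illustration of the
same trick:

    // build a TupleN from a Seq of values via reflection, as the connector's toTuple does
    def toTuple[A <: AnyRef](as: Seq[A]): Product = {
      if (as.nonEmpty) {
        val tupleClass = Class.forName("scala.Tuple" + as.size)
        tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
      } else None
    }

    toTuple(Seq("u001", "2017-06-02"))   // a Tuple2 ("u001", "2017-06-02"), usable as an RDD key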

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
deleted file mode 100644
index 9cc9be6..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-
-trait DataConnector extends Serializable {
-
-  def available(): Boolean
-
-  def metaData(): Try[Iterable[(String, String)]]
-
-  def data(): Try[RDD[(Product, Map[String, Any])]]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
deleted file mode 100644
index f637b86..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.griffin.measure.batch.config.params.user._
-import org.apache.griffin.measure.batch.rule.RuleExprs
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.sql.SQLContext
-
-import scala.util.Try
-
-object DataConnectorFactory {
-
-  val HiveRegex = """^(?i)hive$""".r
-  val AvroRegex = """^(?i)avro$""".r
-
-  def getDataConnector(sqlContext: SQLContext,
-                       dataConnectorParam: DataConnectorParam,
-                       ruleExprs: RuleExprs,
-                       globalFinalCacheMap: Map[String, Any]
-                      ): Try[DataConnector] = {
-    val conType = dataConnectorParam.conType
-    val version = dataConnectorParam.version
-    Try {
-      conType match {
-        case HiveRegex() => HiveDataConnector(sqlContext, dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
-        case AvroRegex() => AvroDataConnector(sqlContext, dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
-        case _ => throw new Exception("connector creation error!")
-      }
-    }
-  }
-
-}
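
Wiring a connector from config then reduces to passing the parsed DataConnectorParam through the
factory; any type other than hive or avro fails the Try. Roughly (sqlContext, ruleExprs and the
constant value map are assumed to have been built already by the caller):

    val connectorTry = DataConnectorFactory.getDataConnector(
      sqlContext, userParam.sourceParam, ruleExprs, constValueMap)

    connectorTry.foreach { connector =>
      if (connector.available()) {
        // print the source schema as (field name, type name) pairs
        connector.metaData().foreach(_.foreach { case (name, tpe) => println(s"$name: $tpe") })
      }
    }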

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
deleted file mode 100644
index 5d06210..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-
-import scala.util.{Success, Try}
-
-// data connector for hive
-case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                             ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
-                            ) extends DataConnector {
-
-  val Database = "database"
-  val TableName = "table.name"
-  val Partitions = "partitions"
-
-  val database = config.getOrElse(Database, "").toString
-  val tableName = config.getOrElse(TableName, "").toString
-  val partitionsString = config.getOrElse(Partitions, "").toString
-
-  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
-  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
-
-  private def dbPrefix(): Boolean = {
-    database.nonEmpty && !database.equals("default")
-  }
-
-  def available(): Boolean = {
-    (!tableName.isEmpty) && {
-      Try {
-        if (dbPrefix) {
-          sqlContext.tables(database).filter(tableExistsSql).collect.size
-        } else {
-          sqlContext.tables().filter(tableExistsSql).collect.size
-        }
-      } match {
-        case Success(s) => s > 0
-        case _ => false
-      }
-    }
-  }
-
-  def metaData(): Try[Iterable[(String, String)]] = {
-    Try {
-      val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
-      val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
-      if (partitionPos < 0) originRows
-      else originRows.take(partitionPos)
-    }
-  }
-
-  def data(): Try[RDD[(Product, Map[String, Any])]] = {
-    Try {
-      sqlContext.sql(dataSql).flatMap { row =>
-        // generate cache data
-        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
-        }
-        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
-
-        // when clause filter data source
-        val whenResult = ruleExprs.whenClauseExprOpt match {
-          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-          case _ => None
-        }
-
-        // get groupby data
-        whenResult match {
-          case Some(false) => None
-          case _ => {
-            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-              expr.calculate(finalExprValueMap) match {
-                case Some(v) => Some(v.asInstanceOf[AnyRef])
-                case _ => None
-              }
-            }
-            val key = toTuple(groupbyData)
-
-            Some((key, finalExprValueMap))
-          }
-        }
-      }
-    }
-  }
-
-  private def tableExistsSql(): String = {
-//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
-    s"tableName LIKE '${tableName}'"
-  }
-
-  private def metaDataSql(): String = {
-    s"DESCRIBE ${concreteTableName}"
-  }
-
-  private def dataSql(): String = {
-    val clauses = partitions.map { prtn =>
-      val cls = prtn.mkString(" AND ")
-      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
-      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
-    }
-    clauses.mkString(" UNION ALL ")
-  }
-
-  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-    if (as.size > 0) {
-      val tupleClass = Class.forName("scala.Tuple" + as.size)
-      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-    } else None
-  }
-
-}
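
The partitions setting of the Hive connector is a two-level split: ';' separates alternative
partition clauses, ',' separates predicates within one clause, and each clause becomes a SELECT
that dataSql unions together. A worked example with illustrative values:

    config: "table.name" -> "demo_tbl", "partitions" -> "dt='20170601',hour='10';dt='20170601',hour='11'"

    dataSql() then yields the single statement:
      SELECT * FROM demo_tbl WHERE dt='20170601' AND hour='10'
      UNION ALL SELECT * FROM demo_tbl WHERE dt='20170601' AND hour='11'

    with no partitions configured, the split produces one empty clause and dataSql falls back to:
      SELECT * FROM demo_tbl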

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
deleted file mode 100644
index f73e86c..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.apache.griffin.measure.batch.log
-
-import org.slf4j.LoggerFactory
-
-trait Loggable {
-
-  @transient private lazy val logger = LoggerFactory.getLogger(getClass)
-
-  protected def info(msg: String): Unit = {
-    logger.info(msg)
-  }
-
-  protected def debug(msg: String): Unit = {
-    logger.debug(msg)
-  }
-
-  protected def warn(msg: String): Unit = {
-    logger.warn(msg)
-  }
-
-  protected def error(msg: String): Unit = {
-    logger.error(msg)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
deleted file mode 100644
index 03955d5..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.apache.griffin.measure.batch.persist
-
-import java.util.Date
-
-import org.apache.griffin.measure.batch.result._
-import org.apache.griffin.measure.batch.utils.HdfsUtil
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-// persist result and data to hdfs
-case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
-
-  val Path = "path"
-  val MaxPersistLines = "max.persist.lines"
-  val MaxLinesPerFile = "max.lines.per.file"
-
-  val path = config.getOrElse(Path, "").toString
-  val maxPersistLines = try { config.getOrElse(MaxPersistLines, -1).toString.toInt } catch { case _ => -1 }
-  val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 10000).toString.toLong } catch { case _ => 10000 }
-
-  val separator = "/"
-
-  val StartFile = filePath("_START")
-  val FinishFile = filePath("_FINISH")
-  val ResultFile = filePath("_RESULT")
-
-  val MissRecFile = filePath("_MISSREC")      // optional
-  val MatchRecFile = filePath("_MATCHREC")    // optional
-
-  val LogFile = filePath("_LOG")
-
-  var _init = true
-  private def isInit = {
-    val i = _init
-    _init = false
-    i
-  }
-
-  def available(): Boolean = {
-    (path.nonEmpty) && (maxPersistLines < Int.MaxValue)
-  }
-
-  private def persistHead: String = {
-    val dt = new Date(timeStamp)
-    s"================ log of ${dt} ================\n"
-  }
-
-  private def timeHead(rt: Long): String = {
-    val dt = new Date(rt)
-    s"--- ${dt} ---\n"
-  }
-
-  protected def getFilePath(parentPath: String, fileName: String): String = {
-    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName
-  }
-
-  protected def filePath(file: String): String = {
-    getFilePath(path, s"${metricName}/${timeStamp}/${file}")
-  }
-
-  protected def withSuffix(path: String, suffix: String): String = {
-    s"${path}.${suffix}"
-  }
-
-  def start(msg: String): Unit = {
-    try {
-      HdfsUtil.writeContent(StartFile, msg)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-  def finish(): Unit = {
-    try {
-      HdfsUtil.createEmptyFile(FinishFile)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  def result(rt: Long, result: Result): Unit = {
-    try {
-      val resStr = result match {
-        case ar: AccuracyResult => {
-          s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
-        }
-        case pr: ProfileResult => {
-          s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
-        }
-        case _ => {
-          s"result: ${result}"
-        }
-      }
-      HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
-      log(rt, resStr)
-
-      info(resStr)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  // need to avoid string too long
-  private def rddRecords(records: RDD[String], path: String): Unit = {
-    try {
-      val recordCount = records.count
-      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
-      if (count > 0) {
-        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-        if (groupCount <= 1) {
-          val recs = records.take(count.toInt)
-          persistRecords(path, recs)
-        } else {
-          val groupedRecords: RDD[(Long, Iterable[String])] =
-            records.zipWithIndex.flatMap { r =>
-              val gid = r._2 / maxLinesPerFile
-              if (gid < groupCount) Some((gid, r._1)) else None
-            }.groupByKey()
-          groupedRecords.foreach { group =>
-            val (gid, recs) = group
-            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
-            persistRecords(hdfsPath, recs)
-          }
-        }
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  def missRecords(records: RDD[String]): Unit = {
-    rddRecords(records, MissRecFile)
-  }
-
-  def matchRecords(records: RDD[String]): Unit = {
-    rddRecords(records, MatchRecFile)
-  }
-
-  private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
-    val recStr = records.mkString("\n")
-    HdfsUtil.appendContent(hdfsPath, recStr)
-  }
-
-  def log(rt: Long, msg: String): Unit = {
-    try {
-      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n"
-      HdfsUtil.appendContent(LogFile, logStr)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-}
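
For the record files, rddRecords caps output at max.persist.lines and shards it into chunks of
max.lines.per.file rows: the first chunk keeps the base path, later ones get a numeric suffix. A
quick worked example of the arithmetic (values illustrative):

    max.persist.lines = 30000, max.lines.per.file = 10000, records.count = 45000
      count      = min(30000, 45000) = 30000
      groupCount = (30000 - 1) / 10000 + 1 = 3
      output     = <path>/<metricName>/<timeStamp>/_MISSREC      (rows 0 .. 9999)
                   <path>/<metricName>/<timeStamp>/_MISSREC.1    (rows 10000 .. 19999)
                   <path>/<metricName>/<timeStamp>/_MISSREC.2    (rows 20000 .. 29999)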

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
deleted file mode 100644
index fed4878..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.griffin.measure.batch.persist
-
-import org.apache.griffin.measure.batch.result._
-import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-// persist result by http way
-case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
-
-  val Api = "api"
-  val Method = "method"
-
-  val api = config.getOrElse(Api, "").toString
-  val method = config.getOrElse(Method, "post").toString
-
-  def available(): Boolean = {
-    api.nonEmpty
-  }
-
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
-
-  def result(rt: Long, result: Result): Unit = {
-    result match {
-      case ar: AccuracyResult => {
-        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
-        httpResult(dataMap)
-      }
-      case pr: ProfileResult => {
-        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
-        httpResult(dataMap)
-      }
-      case _ => {
-        info(s"result: ${result}")
-      }
-    }
-  }
-
-  private def httpResult(dataMap: Map[String, Any]) = {
-    try {
-      val data = JsonUtil.toJson(dataMap)
-      // post
-      val params = Map[String, Object]()
-      val header = Map[String, Object]()
-      val status = HttpUtil.httpRequest(api, method, params, header, data)
-      info(s"${method} to ${api} response status: ${status}")
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-
-  }
-
-  def missRecords(records: RDD[String]): Unit = {}
-  def matchRecords(records: RDD[String]): Unit = {}
-
-  def log(rt: Long, msg: String): Unit = {}
-
-}
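
So for an accuracy or profile run, what gets posted to the configured api is just the dataMap
serialized by JsonUtil.toJson: the metric name, the run timestamp, and the total/matched counts.
With illustrative numbers, the request body would look something like:

    { "name": "accuracy_demo", "tmst": 1496392295000, "total": 100000, "matched": 99950 }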

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8d43a4c0/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
deleted file mode 100644
index 1e5fbed..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.griffin.measure.batch.persist
-
-import org.apache.griffin.measure.batch.result._
-import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-// persist result and data by multiple persists
-case class MultiPersists(persists: Iterable[Persist]) extends Persist {
-
-  val timeStamp: Long = persists match {
-    case Nil => 0
-    case _ => persists.head.timeStamp
-  }
-
-  val config: Map[String, Any] = Map[String, Any]()
-
-  def available(): Boolean = { persists.exists(_.available()) }
-
-  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
-  def finish(): Unit = { persists.foreach(_.finish()) }
-
-  def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
-
-  def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
-  def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) }
-
-  def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
-
-}
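
MultiPersists simply fans every call out to its members, so persisting one run both to HDFS and
over HTTP is a matter of building the individual persists with the same metric name and timestamp
and wrapping them (config values illustrative):

    val timeStamp   = System.currentTimeMillis
    val hdfsPersist = HdfsPersist(Map("path" -> "hdfs:///griffin/persist"), "accuracy_demo", timeStamp)
    val httpPersist = HttpPersist(Map("api" -> "http://api-host:8080/metrics", "method" -> "post"), "accuracy_demo", timeStamp)

    val persist = MultiPersists(hdfsPersist :: httpPersist :: Nil)
    persist.start("calculation started")
    // ... run the measure, calling persist.result(...) / persist.missRecords(...) as results arrive ...
    persist.finish()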