Posted to commits@griffin.apache.org by gu...@apache.org on 2017/05/26 09:17:32 UTC

[2/9] incubator-griffin git commit: [GRIFFIN-19] update document of docker

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/mysql/bind_0.cnf
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/mysql/bind_0.cnf b/docker/griffin_env/conf/mysql/bind_0.cnf
new file mode 100644
index 0000000..84e7bbf
--- /dev/null
+++ b/docker/griffin_env/conf/mysql/bind_0.cnf
@@ -0,0 +1,2 @@
+[mysqld]
+bind-address=0.0.0.0
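
This override replaces MySQL's default loopback-only binding so that the Hive metastore, the griffin service, and anything outside the MySQL process can reach the server on port 3306. A minimal connectivity sketch, assuming the mysql client is installed and using a placeholder hostname (the hive account is created by hive-metastore-init.sql below):

```
# <container-host> is a placeholder; substitute the container's hostname or IP
mysql -h <container-host> -P 3306 -u hive -p123456 -e "SELECT 1;"
```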

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/mysql/hive-metastore-init.sql
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/mysql/hive-metastore-init.sql b/docker/griffin_env/conf/mysql/hive-metastore-init.sql
new file mode 100644
index 0000000..94c6c16
--- /dev/null
+++ b/docker/griffin_env/conf/mysql/hive-metastore-init.sql
@@ -0,0 +1,7 @@
+CREATE DATABASE metastore;
+USE metastore;
+SOURCE /apache/hive/scripts/metastore/upgrade/mysql/hive-schema-1.2.0.mysql.sql;
+
+CREATE USER 'hive'@'%' IDENTIFIED BY '123456';
+GRANT ALL PRIVILEGES ON metastore.* TO 'hive'@'%';
+FLUSH PRIVILEGES;
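
The script above creates the metastore database, loads the Hive 1.2.0 schema shipped with Hive, and grants the hive user access from any host. A quick sanity check once it has run, sketched here rather than part of the commit:

```
# the metastore database should now contain Hive's bookkeeping tables (DBS, TBLS, SDS, ...)
mysql -u hive -p123456 -D metastore -e "SHOW TABLES;"
```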

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/mysql/mysql-init.sh
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/mysql/mysql-init.sh b/docker/griffin_env/conf/mysql/mysql-init.sh
new file mode 100755
index 0000000..785ac49
--- /dev/null
+++ b/docker/griffin_env/conf/mysql/mysql-init.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+service mysql start
+
+sed -i.bak s/^.*"hive-txn-schema-0.13.0.mysql.sql".*/"SOURCE \/apache\/hive\/scripts\/metastore\/upgrade\/mysql\/hive-txn-schema-0.13.0.mysql.sql;"/ /apache/hive/scripts/metastore/upgrade/mysql/hive-schema-1.2.0.mysql.sql
+
+mysql -u root < hive-metastore-init.sql
+
+mysql -u root < quartz-metastore-init.sql
+mysql -u griffin -p123456 -D quartz < quartz-table-init.sql
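
Note that the script reads hive-metastore-init.sql, quartz-metastore-init.sql and quartz-table-init.sql by relative path, so it is expected to run from the directory containing them; the sed line rewrites the relative SOURCE of hive-txn-schema-0.13.0.mysql.sql inside the Hive schema file into an absolute path so it resolves regardless of the working directory. A minimal post-run check, sketched here and not part of the commit:

```
# the transaction schema reference should now be an absolute path
grep "hive-txn-schema-0.13.0.mysql.sql" /apache/hive/scripts/metastore/upgrade/mysql/hive-schema-1.2.0.mysql.sql

# the quartz database should exist with the QRTZ_ tables and be accessible to the griffin user
mysql -u griffin -p123456 -D quartz -e "SHOW TABLES LIKE 'QRTZ_%';"
```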

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/mysql/quartz-metastore-init.sql
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/mysql/quartz-metastore-init.sql b/docker/griffin_env/conf/mysql/quartz-metastore-init.sql
new file mode 100644
index 0000000..dd75c3b
--- /dev/null
+++ b/docker/griffin_env/conf/mysql/quartz-metastore-init.sql
@@ -0,0 +1,6 @@
+CREATE DATABASE quartz;
+USE quartz;
+
+CREATE USER 'griffin'@'%' IDENTIFIED BY '123456';
+GRANT ALL PRIVILEGES ON quartz.* TO 'griffin'@'%';
+FLUSH PRIVILEGES;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/mysql/quartz-table-init.sql
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/mysql/quartz-table-init.sql b/docker/griffin_env/conf/mysql/quartz-table-init.sql
new file mode 100644
index 0000000..3d7be12
--- /dev/null
+++ b/docker/griffin_env/conf/mysql/quartz-table-init.sql
@@ -0,0 +1,180 @@
+#
+# In your Quartz properties file, you'll need to set
+# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
+#
+#
+# By: Ron Cordell - roncordell
+#  I didn't see this anywhere, so I thought I'd post it here. This is the script from Quartz to create the tables in a MySQL database, modified to use INNODB instead of MYISAM.
+
+DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
+DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
+DROP TABLE IF EXISTS QRTZ_LOCKS;
+DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_TRIGGERS;
+DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
+DROP TABLE IF EXISTS QRTZ_CALENDARS;
+
+CREATE TABLE QRTZ_JOB_DETAILS(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  JOB_NAME VARCHAR(200) NOT NULL,
+  JOB_GROUP VARCHAR(200) NOT NULL,
+  DESCRIPTION VARCHAR(250) NULL,
+  JOB_CLASS_NAME VARCHAR(250) NOT NULL,
+  IS_DURABLE VARCHAR(1) NOT NULL,
+  IS_NONCONCURRENT VARCHAR(1) NOT NULL,
+  IS_UPDATE_DATA VARCHAR(1) NOT NULL,
+  REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
+  JOB_DATA BLOB NULL,
+  PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_TRIGGERS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  JOB_NAME VARCHAR(200) NOT NULL,
+  JOB_GROUP VARCHAR(200) NOT NULL,
+  DESCRIPTION VARCHAR(250) NULL,
+  NEXT_FIRE_TIME BIGINT(13) NULL,
+  PREV_FIRE_TIME BIGINT(13) NULL,
+  PRIORITY INTEGER NULL,
+  TRIGGER_STATE VARCHAR(16) NOT NULL,
+  TRIGGER_TYPE VARCHAR(8) NOT NULL,
+  START_TIME BIGINT(13) NOT NULL,
+  END_TIME BIGINT(13) NULL,
+  CALENDAR_NAME VARCHAR(200) NULL,
+  MISFIRE_INSTR SMALLINT(2) NULL,
+  JOB_DATA BLOB NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+  REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  REPEAT_COUNT BIGINT(7) NOT NULL,
+  REPEAT_INTERVAL BIGINT(12) NOT NULL,
+  TIMES_TRIGGERED BIGINT(10) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_CRON_TRIGGERS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  CRON_EXPRESSION VARCHAR(120) NOT NULL,
+  TIME_ZONE_ID VARCHAR(80),
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_SIMPROP_TRIGGERS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  STR_PROP_1 VARCHAR(512) NULL,
+  STR_PROP_2 VARCHAR(512) NULL,
+  STR_PROP_3 VARCHAR(512) NULL,
+  INT_PROP_1 INT NULL,
+  INT_PROP_2 INT NULL,
+  LONG_PROP_1 BIGINT NULL,
+  LONG_PROP_2 BIGINT NULL,
+  DEC_PROP_1 NUMERIC(13,4) NULL,
+  DEC_PROP_2 NUMERIC(13,4) NULL,
+  BOOL_PROP_1 VARCHAR(1) NULL,
+  BOOL_PROP_2 VARCHAR(1) NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_BLOB_TRIGGERS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  BLOB_DATA BLOB NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_CALENDARS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  CALENDAR_NAME VARCHAR(200) NOT NULL,
+  CALENDAR BLOB NOT NULL,
+  PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_FIRED_TRIGGERS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  ENTRY_ID VARCHAR(95) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  INSTANCE_NAME VARCHAR(200) NOT NULL,
+  FIRED_TIME BIGINT(13) NOT NULL,
+  SCHED_TIME BIGINT(13) NOT NULL,
+  PRIORITY INTEGER NOT NULL,
+  STATE VARCHAR(16) NOT NULL,
+  JOB_NAME VARCHAR(200) NULL,
+  JOB_GROUP VARCHAR(200) NULL,
+  IS_NONCONCURRENT VARCHAR(1) NULL,
+  REQUESTS_RECOVERY VARCHAR(1) NULL,
+  PRIMARY KEY (SCHED_NAME,ENTRY_ID))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_SCHEDULER_STATE (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  INSTANCE_NAME VARCHAR(200) NOT NULL,
+  LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
+  CHECKIN_INTERVAL BIGINT(13) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
+  ENGINE=InnoDB;
+
+CREATE TABLE QRTZ_LOCKS (
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  LOCK_NAME VARCHAR(40) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,LOCK_NAME))
+  ENGINE=InnoDB;
+
+CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
+CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
+
+CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
+CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
+CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
+CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
+CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
+CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
+CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
+CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
+CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
+
+CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
+CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
+CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
+CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
+CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
+CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
+
+commit;
+
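As the header comment notes, the scheduler that uses these tables must point its JDBC job store at this database. A hypothetical quartz.properties fragment wired to the griffin account created above; only driverDelegateClass comes from the comment in this file, the other keys are standard Quartz JDBC-JobStore settings, and the data source name is a placeholder:

```
# hypothetical fragment; "quartzDS" is a placeholder data source name
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.dataSource = quartzDS
org.quartz.dataSource.quartzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.quartzDS.URL = jdbc:mysql://localhost:3306/quartz
org.quartz.dataSource.quartzDS.user = griffin
org.quartz.dataSource.quartzDS.password = 123456
```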

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/spark/slaves
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/spark/slaves b/docker/griffin_env/conf/spark/slaves
new file mode 100644
index 0000000..e13d73d
--- /dev/null
+++ b/docker/griffin_env/conf/spark/slaves
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A Spark Worker will be started on each of the machines listed below.
+localhost

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/spark/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/spark/spark-defaults.conf b/docker/griffin_env/conf/spark/spark-defaults.conf
new file mode 100644
index 0000000..d89a2f0
--- /dev/null
+++ b/docker/griffin_env/conf/spark/spark-defaults.conf
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default system properties included when running spark-submit.
+# This is useful for setting default environmental settings.
+
+# Example:
+# spark.master                     spark://master:7077
+# spark.eventLog.enabled           true
+# spark.eventLog.dir               hdfs://namenode:8021/directory
+# spark.serializer                 org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory              5g
+# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
+
+#spark.master                    spark://10.9.246.187:7077
+spark.master                    yarn-cluster
+spark.serializer                org.apache.spark.serializer.KryoSerializer
+spark.yarn.jar			hdfs:///home/spark_lib/spark-assembly-1.6.0-hadoop2.6.0.jar
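
With these defaults, spark-submit runs in yarn-cluster mode with Kryo serialization and picks up the pre-staged assembly jar from HDFS without extra flags. A hedged usage sketch; the jar path and its arguments are placeholders, only the main class appears later in this diff:

```
# placeholder jar path and arguments; Application is the batch measure entry point
spark-submit --class org.apache.griffin.measure.batch.Application \
  /path/to/measure-batch.jar <env-param-file> <user-param-file>
```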

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/conf/spark/spark-env.sh
----------------------------------------------------------------------
diff --git a/docker/griffin_env/conf/spark/spark-env.sh b/docker/griffin_env/conf/spark/spark-env.sh
new file mode 100755
index 0000000..46de554
--- /dev/null
+++ b/docker/griffin_env/conf/spark/spark-env.sh
@@ -0,0 +1,75 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is sourced when running various Spark programs.
+# Copy it as spark-env.sh and edit that to configure Spark for your site.
+
+# Options read when launching programs locally with
+# ./bin/run-example or ./bin/spark-submit
+# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
+# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
+# - SPARK_CLASSPATH, default classpath entries to append
+
+# Options read by executors and drivers running inside the cluster
+# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
+# - SPARK_CLASSPATH, default classpath entries to append
+# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
+# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
+
+# Options read in YARN client mode
+# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
+# - SPARK_EXECUTOR_INSTANCES, Number of executors to start (Default: 2)
+# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
+# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
+# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)
+# - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
+# - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: ‘default’)
+# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
+# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.
+
+# Options for the daemons used in the standalone deploy mode
+# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
+# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
+# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
+# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
+# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
+# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
+# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
+# - SPARK_WORKER_DIR, to set the working directory of worker processes
+# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
+# - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
+# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
+# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
+# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
+# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
+
+# Generic options for the daemons used in the standalone deploy mode
+# - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
+# - SPARK_LOG_DIR       Where log files are stored.  (Default: ${SPARK_HOME}/logs)
+# - SPARK_PID_DIR       Where the pid file is stored. (Default: /tmp)
+# - SPARK_IDENT_STRING  A string representing this instance of spark. (Default: $USER)
+# - SPARK_NICENESS      The scheduling priority for daemons. (Default: 0)
+
+JAVA_HOME=/apache/jdk
+SPARK_JAVA_OPTS=-Dspark.driver.port=53411
+HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
+SPARK_MASTER_IP=localhost
+SPARK_MASTER_WEBUI_PORT=8082

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/prep/dir.sh
----------------------------------------------------------------------
diff --git a/docker/griffin_env/prep/dir.sh b/docker/griffin_env/prep/dir.sh
new file mode 100755
index 0000000..3699e66
--- /dev/null
+++ b/docker/griffin_env/prep/dir.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+mkdir /data
+chmod 777 /data
+mkdir -p /data/hadoop-data/nn
+mkdir -p /data/hadoop-data/snn
+mkdir -p /data/hadoop-data/dn
+mkdir -p /data/hadoop-data/tmp
+mkdir -p /data/hadoop-data/mapred/system
+mkdir -p /data/hadoop-data/mapred/local
+
+mkdir -p /tmp/logs
+chmod 777 /tmp/logs
+
+#elasticsearch
+mkdir /data/elasticsearch
+chmod 777 /data/elasticsearch

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/prep/hdfs_file.sh
----------------------------------------------------------------------
diff --git a/docker/griffin_env/prep/hdfs_file.sh b/docker/griffin_env/prep/hdfs_file.sh
new file mode 100755
index 0000000..dc8a06d
--- /dev/null
+++ b/docker/griffin_env/prep/hdfs_file.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+# spark dir
+hadoop fs -mkdir -p /home/spark_lib
+hadoop fs -put $SPARK_HOME/lib/spark-assembly-1.6.0-hadoop2.6.0.jar /home/spark_lib/
+hadoop fs -chmod 755 /home/spark_lib/spark-assembly-1.6.0-hadoop2.6.0.jar
+
+# yarn dir
+hadoop fs -mkdir -p /yarn-logs/logs
+hadoop fs -chmod g+w /yarn-logs/logs
+
+# hive dir
+hadoop fs -mkdir /tmp
+hadoop fs -mkdir /user
+hadoop fs -mkdir /user/hive
+hadoop fs -mkdir /user/hive/warehouse
+hadoop fs -chmod g+w /tmp
+hadoop fs -chmod g+w /user/hive/warehouse
+
+# livy dir
+hadoop fs -mkdir /livy
+
+wget http://central.maven.org/maven2/org/datanucleus/datanucleus-rdbms/3.2.9/datanucleus-rdbms-3.2.9.jar -O /apache/datanucleus-rdbms-3.2.9.jar
+wget http://central.maven.org/maven2/org/datanucleus/datanucleus-core/3.2.10/datanucleus-core-3.2.10.jar -O /apache/datanucleus-core-3.2.10.jar
+wget http://central.maven.org/maven2/org/datanucleus/datanucleus-api-jdo/3.2.6/datanucleus-api-jdo-3.2.6.jar -O /apache/datanucleus-api-jdo-3.2.6.jar
+
+hadoop fs -put /apache/datanucleus-rdbms-3.2.9.jar /livy/
+hadoop fs -put /apache/datanucleus-core-3.2.10.jar /livy/
+hadoop fs -put /apache/datanucleus-api-jdo-3.2.6.jar /livy/
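
The files staged here back the spark.yarn.jar setting in spark-defaults.conf and give Livy the DataNucleus jars it needs for Hive access. A quick way to confirm the uploads, sketched here rather than part of the script:

```
hadoop fs -ls /home/spark_lib /livy
hadoop fs -ls /user/hive/warehouse /yarn-logs/logs
```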

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/prep/software-config.sh
----------------------------------------------------------------------
diff --git a/docker/griffin_env/prep/software-config.sh b/docker/griffin_env/prep/software-config.sh
new file mode 100755
index 0000000..fc563e6
--- /dev/null
+++ b/docker/griffin_env/prep/software-config.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+
+# java 8
+
+# hadoop
+
+cp conf/hadoop/* hadoop/etc/hadoop/
+sed s/HOSTNAME/localhost/ hadoop/etc/hadoop/core-site.xml.template > hadoop/etc/hadoop/core-site.xml
+sed s/HOSTNAME/localhost/ hadoop/etc/hadoop/yarn-site.xml.template > hadoop/etc/hadoop/yarn-site.xml
+sed s/HOSTNAME/localhost/ hadoop/etc/hadoop/mapred-site.xml.template > hadoop/etc/hadoop/mapred-site.xml
+
+chmod 755 hadoop/etc/hadoop/hadoop-env.sh
+
+# scala
+
+# spark
+
+cp conf/spark/* spark/conf/
+
+# hive
+
+cp conf/hive/* hive/conf/
+sed s/HOSTNAME/localhost/ hive/conf/hive-site.xml.template > hive/conf/hive-site.xml
+echo "export HADOOP_HOME=/apache/hadoop" >> hive/bin/hive-config.sh
+
+# livy
+
+cp conf/livy/* livy/conf/
+mkdir livy/logs
+
+# elasticsearch
+
+cp conf/elasticsearch/elasticsearch.yml /etc/elasticsearch/
+cp conf/elasticsearch/elasticsearch /etc/init.d/
+chmod 755 /etc/init.d/elasticsearch

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/prep/software-install.sh
----------------------------------------------------------------------
diff --git a/docker/griffin_env/prep/software-install.sh b/docker/griffin_env/prep/software-install.sh
new file mode 100755
index 0000000..601347e
--- /dev/null
+++ b/docker/griffin_env/prep/software-install.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# java 8
+wget --no-cookies --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" \
+http://download.oracle.com/otn-pub/java/jdk/8-b132/jdk-8-linux-x64.tar.gz \
+-O jdk8-linux-x64.tar.gz
+tar -xvzf jdk8-linux-x64.tar.gz
+ln -s jdk1.8.0 jdk
+
+# hadoop
+wget http://mirror.cogentco.com/pub/apache/hadoop/common/hadoop-2.6.5/hadoop-2.6.5.tar.gz
+tar -xvf hadoop-2.6.5.tar.gz
+ln -s hadoop-2.6.5 hadoop
+
+# scala
+wget http://downloads.lightbend.com/scala/2.10.6/scala-2.10.6.tgz
+tar -xvf scala-2.10.6.tgz
+ln -s scala-2.10.6 scala
+
+# spark
+wget http://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
+tar -xvf spark-1.6.0-bin-hadoop2.6.tgz
+ln -s spark-1.6.0-bin-hadoop2.6 spark
+
+# hive
+wget https://www.apache.org/dist/hive/hive-1.2.2/apache-hive-1.2.2-bin.tar.gz
+tar -xvf apache-hive-1.2.2-bin.tar.gz
+ln -s apache-hive-1.2.2-bin hive
+
+# livy
+wget http://archive.cloudera.com/beta/livy/livy-server-0.3.0.zip
+unzip livy-server-0.3.0.zip
+ln -s livy-server-0.3.0 livy
+
+#elasticsearch
+wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.0.deb
+dpkg -i elasticsearch-5.4.0.deb
+update-rc.d elasticsearch defaults
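
The mirror URLs and versions above may drift over time. Once the archives are unpacked and symlinked, a quick verification might look like the following, assuming everything lands under /apache as the other scripts in this image suggest:

```
/apache/jdk/bin/java -version
/apache/hadoop/bin/hadoop version
/apache/spark/bin/spark-submit --version
/apache/hive/bin/hive --version
service elasticsearch status
```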

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/docker/griffin_env/ssh_config
----------------------------------------------------------------------
diff --git a/docker/griffin_env/ssh_config b/docker/griffin_env/ssh_config
new file mode 100644
index 0000000..535f9d3
--- /dev/null
+++ b/docker/griffin_env/ssh_config
@@ -0,0 +1,5 @@
+Host *
+  UserKnownHostsFile /dev/null
+  StrictHostKeyChecking no
+  LogLevel quiet
+  Port 2122

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/griffin-doc/dockerUIguide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/dockerUIguide.md b/griffin-doc/dockerUIguide.md
index 24d76e5..467681e 100644
--- a/griffin-doc/dockerUIguide.md
+++ b/griffin-doc/dockerUIguide.md
@@ -2,62 +2,32 @@
 
 ### Preparatory work
 
-Follow the steps [here](https://github.com/eBay/DQSolution/blob/master/README.md#how-to-run-in-docker), prepare your docker container of griffin, and get your webUI ready.
+Follow the steps [here](https://github.com/apache/incubator-griffin#how-to-run-in-docker) to prepare your griffin docker container and get the web UI ready.
 
 ### webUI test case guide
 
-1.  Click "Data Assets" at the top right corner, to watch all the exist data assets.
-
-2.  Click "Register Data Asset" button at the top left corner, fill out the "Required Information" table as the following data, then submit and save to finish the creation of a new data asset.
-    ```
-    Asset Name:     users_info_src
-    Asset Type:     hivetable
-    HDFS Path:      /user/hive/warehouse/users_info_src
-    Organization:   <any>
-    Schema:         user_id     bigint
-                    first_name  string
-                    last_name   string
-                    address     string
-                    email       string
-                    phone       string
-                    post_code   string
-    ```
-    The data asset "users_info_src" has been prepared in our docker image already, and the information is shown above.  
-    "Asset Name" item needs to be the same with the Hive table name, "HDFS Path" is exactly the path in HDFS, they should be filled as real.  
-    "Asset Type" item has only one selection "hivetable" at current, and you need to choose it, while the "Organization" item could be set as any one you like.  
-    "Schema" item lists the schema of the data asset, the names and types are better to be exactly the same with the hive table while it's not required now, but the number and order of schema items need to be.  
-
-    Repeat the above step, create another new data asset by filling out as following.
+1.  Click "Data Assets" at the top right corner to view all existing data assets.  
+    In the docker image, we've prepared two data assets in Hive; through this page you can see all the table metadata in Hive.
+
+2.  Click the "Models" button at the top left corner to view all the measures, and create a new DQ measurement by following these steps.  
+    1)  Click the "Create DQ Model" button at the top left corner and choose the top left block "Accuracy"; currently only the accuracy type is supported.  
+    2)  Choose Source: find "demo_src" in the left tree, select some or all attributes in the right block, then click "Next".  
+    3)  Choose Target: find "demo_tgt" in the left tree, select the attributes matching the source data asset in the right block, then click "Next".  
+    4)  Mapping Source and Target: select the "Source Fields" of each row to match the corresponding field in the target table, e.g. id maps to id, age maps to age, desc maps to desc.  
+    Finish all the mappings, then click "Next".  
+    5)  Fill out the required fields; "Organization" is the group this measurement belongs to.  
+    Submit and save; you can see your new DQ measurement in the measures list.  
+
+3.  Now that you've created a new DQ measurement, it needs to be scheduled to run in the docker container.  
+    The default scheduling UI is not implemented yet, so you can call the REST API we provide to schedule this measurement.
+    Assume that your measure name is "demo_accu", and you want to start the job immediately and run it every 5 minutes.  
     ```
-    Asset Name:     users_info_target
-    Asset Type:     hivetable
-    HDFS Path:      /user/hive/warehouse/users_info_target
-    Organization:   <any>
-    Schema:         user_id     bigint
-                    first_name  string
-                    last_name   string
-                    address     string
-                    email       string
-                    phone       string
-                    post_code   string
+    curl -X POST -d '{
+      "sourcePat":"YYYYMMdd-HH",
+      "targetPat":"YYYYMMdd-HH",
+      "jobStartTime":"0",
+      "periodTime":"300"
+    }' "http://<you local ip>:38080/jobs/add/BA/<job id>/demo_accu"
     ```  
-    "users_info_target" is also prepared in our docker image with the information above.
-    If you want to test your own data assets, it's necessary to put them into Hive in the docker container first.  
-
-3.  Click "Models" at the top left corner to watch all the models here, now there has been two new models named "TotalCount_users_info_src" and "TotalCount_users_info_target" created automatically by the new data asset creation.  
-    You can create a new accuracy model for the two new data assets registered just now.  
-    Click "Create DQ Model" button at the top left corner, choose the top left block "Accuracy", follow the steps below.  
-    1)  Choose Source: find "users_info_src" in the left tree, select some or all attributes in the right block, click "Next".  
-    2)  Choose Target: find "users_info_target" in the left tree, select the matching attributes with previous ones in the right block, click "Next".  
-    3)  Mapping Source and Target: choose the first row "user_id" as "PK" which means "Primary Key", and select "Source Fields" of each row, to match the same item in source table, e.g. user_id maps to user_id, first_name maps to first_name.   
-        Finish all the mapping, click "Next".  
-    4)  Fill out the required table freely, "Schedule Type" is the calculation period.  
-        Submit and save, you can see your new DQ model created in the models list.  
-
-4.  Now you've created two data assets and three DQ models, the models are calculated automatically at background in the docker container.  
-    Wait for about 20 minutes, results would be published to web UI. Then you can see the dashboards of your new models in "My Dashboard" page.  
-    View the accuracy model, there will be a "Deploy" button when the result comes out, click "Deploy" button to enable the periodically calculation of it, then you can get your dashboard growing by the period as you set.
-
-### User data case guide
-
-You can follow the steps [here](https://github.com/eBay/griffin/blob/master/griffin-doc/userDataCaseGuide.md) to use your own data for test.
\ No newline at end of file
+    Port 38080 is mapped to port 8080 of the docker container, the port of the griffin service; each time you submit a new schedule, you need to use a new <job id>.  
+    Wait for about 1 minute; after the calculation, results will be published to the web UI, and you can view the dashboard by clicking "DQ Metrics" at the top right corner.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/griffin-doc/userDataCaseGuide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/userDataCaseGuide.md b/griffin-doc/userDataCaseGuide.md
deleted file mode 100644
index d8824c4..0000000
--- a/griffin-doc/userDataCaseGuide.md
+++ /dev/null
@@ -1,58 +0,0 @@
-## User Data Case Guide
-
-Assuming that you've ran our docker example, here we will show you how to insert your own data and use griffin to calculate its data quality.  
-As an example, our environment here is the docker container, which is built following [the docker guide](https://github.com/eBay/griffin/blob/master/README.md#how-to-run-in-docker), you can ssh into it and follow our steps. 
-
-### Data Prepare
-
-First of all, you might have got your initial data and schema. As we only support hive table now, you need to create a hive table.  
-
-Here lists our [sample data](https://github.com/eBay/griffin/tree/master/docker/griffin-base/griffin/dataFile), we've put them into our docker image, 
-and also created hive tables for them using [hql file](https://github.com/eBay/griffin/blob/master/docker/griffin-base/griffin/hive-input.hql).  
-
-### Data Insert
-
-Here we also use these two data files and their schemas but for new hive tables.
-```
-CREATE TABLE demo_source (
-  user_id bigint,
-  first_name string,
-  last_name string,
-  address string,
-  email string,
-  phone string,
-  post_code string)
-ROW FORMAT DELIMITED
-FIELDS TERMINATED BY '|'
-STORED AS TEXTFILE;
-
-CREATE TABLE demo_target (
-  user_id bigint,
-  first_name string,
-  last_name string,
-  address string,
-  email string,
-  phone string,
-  post_code string)
-ROW FORMAT DELIMITED
-FIELDS TERMINATED BY '|'
-STORED AS TEXTFILE;
-```
-There will be new path in HDFS for data, you can put the data files directly using hdfs command or load them through hive.
-```
-LOAD DATA LOCAL INPATH 'dataFile/users_info_src.dat' OVERWRITE INTO TABLE demo_source;
-LOAD DATA LOCAL INPATH 'dataFile/users_info_target.dat' OVERWRITE INTO TABLE demo_target;
-```
-Do not forget to touch a success file for your hdfs path.
-```
-hadoop fs -touchz /user/hive/warehouse/demo_source/_SUCCESS
-hadoop fs -touchz /user/hive/warehouse/demo_target/_SUCCESS
-```
-Now we have inserted initial data, you can also modify or append data items later. Till now the data preparation is done.
-
-### UI Operation
-
-Switch to our UI part, we can follow [UI guide](https://github.com/eBay/griffin/blob/master/griffin-doc/dockerUIguide.md#webui-test-case-guide) to create our data assets inserted just now.  
-You need to modify some items such as "Asset Name" and "HDFS Path" to your own ones set as above.  
-
-After your model created, you just need to wait for the results, when your data is modified, the result would change at the next calculation.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
index 6fe6bcb..fcfb34a 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
@@ -59,14 +59,21 @@ object Application extends Loggable {
       }
     }
 
-    // choose algorithm   // fixme: not done, need to choose algorithm by param
+    // choose algorithm
     val dqType = allParam.userParam.dqType
-    val algo: Algo = BatchAccuracyAlgo(allParam)
+    val algo: Algo = dqType match {
+      case "accuracy" => BatchAccuracyAlgo(allParam)
+      case _ => {
+        error(s"${dqType} is unsupported dq type!")
+        sys.exit(-4)
+      }
+    }
 
+    // algorithm run
     algo.run match {
       case Failure(ex) => {
         error(ex.getMessage)
-        sys.exit(-4)
+        sys.exit(-5)
       }
       case _ => {
         info("calculation finished")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
index e4ed616..0fbb0d0 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
@@ -15,7 +15,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 
 import scala.util.{Failure, Success, Try}
 
-
+// accuracy algorithm for batch mode
 case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
   val envParam = allParam.envParam
   val userParam = allParam.userParam
@@ -31,30 +31,28 @@ case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
       // start time
       val startTime = new Date().getTime()
 
-      // get persists
+      // get persists to persist measure result
       val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
 
       // get spark application id
       val applicationId = sc.applicationId
 
-      // start
+      // persist start id
       persist.start(applicationId)
 
-      // rules
+      // generate rule from rule param, generate rule analyzer
       val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
       val rule: StatementExpr = ruleFactory.generateRule()
       val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
 
-      // global cache data
-      val globalCachedData = CacheDataUtil.genCachedMap(None, ruleAnalyzer.globalCacheExprs, Map[String, Any]())
-      val globalFinalCachedData = CacheDataUtil.filterCachedMap(ruleAnalyzer.globalFinalCacheExprs, globalCachedData)
+      // const expr value map
+      val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+      val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
 
       // data connector
       val sourceDataConnector: DataConnector =
         DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-          ruleAnalyzer.sourceGroupbyExprs, ruleAnalyzer.sourceCacheExprs,
-          ruleAnalyzer.sourceFinalCacheExprs, globalFinalCachedData,
-          ruleAnalyzer.whenClauseExpr
+          ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
         ) match {
           case Success(cntr) => {
             if (cntr.available) cntr
@@ -64,9 +62,7 @@ case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
         }
       val targetDataConnector: DataConnector =
         DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
-          ruleAnalyzer.targetGroupbyExprs, ruleAnalyzer.targetCacheExprs,
-          ruleAnalyzer.targetFinalCacheExprs, globalFinalCachedData,
-          ruleAnalyzer.whenClauseExpr
+          ruleAnalyzer.targetRuleExprs, finalConstExprValueMap
         ) match {
           case Success(cntr) => {
             if (cntr.available) cntr
@@ -96,7 +92,7 @@ case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
       }
 
       // accuracy algorithm
-      val (accuResult, missingRdd, matchingRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
+      val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
 
       // end time
       val endTime = new Date().getTime
@@ -104,7 +100,7 @@ case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
 
       // persist result
       persist.result(endTime, accuResult)
-      val missingRecords = missingRdd.map(record2String(_, ruleAnalyzer.sourcePersistExprs, ruleAnalyzer.targetPersistExprs))
+      val missingRecords = missingRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs))
       persist.missRecords(missingRecords)
 
       // persist end time
@@ -124,6 +120,7 @@ case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
     (data, Map[String, Any]())
   }
 
+  // calculate accuracy between source data and target data
   def accuracy(sourceData: RDD[(Product, Map[String, Any])], targetData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
               ): (AccuracyResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
 
@@ -135,11 +132,12 @@ case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
     val allKvs = sourceWrappedData.cogroup(targetWrappedData)
 
     // 3. accuracy calculation
-    val (accuResult, missingRdd, matchingRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
+    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
 
-    (accuResult, missingRdd, matchingRdd)
+    (accuResult, missingRdd, matchedRdd)
   }
 
+  // convert data into a string
   def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
     val (key, (data, info)) = rec
     val persistData = getPersistMap(data, sourcePersist)
@@ -152,6 +150,7 @@ case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
     s"${persistData} [${persistInfo}]"
   }
 
+  // get the expr value map of the persist expressions
   private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
     val persistMap = persist.map(e => (e._id, e.desc)).toMap
     data.flatMap { pair =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
index 639dd8a..9e544e4 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
@@ -10,6 +10,8 @@ object AccuracyCore {
   type V = Map[String, Any]
   type T = Map[String, Any]
 
+  // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, targetInfo)]))
+  // output: accuracy result, missing source data rdd, matched source data rdd
   def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], ruleAnalyzer: RuleAnalyzer
               ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
     val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, T))])] = allKvs.map { kv =>
@@ -52,26 +54,29 @@ object AccuracyCore {
     (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
   }
 
+  // try to match source and target data, return true if matched, false if unmatched, also with some matching info
   private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
 
     // 1. merge source and target cached data
-    val mergedData: Map[String, Any] = mergeData(source, target)
+    val mergedExprValueMap: Map[String, Any] = mergeExprValueMap(source, target)
 
     // 2. check valid
-    if (ruleAnalyzer.rule.valid(mergedData)) {
+    if (ruleAnalyzer.rule.valid(mergedExprValueMap)) {
       // 3. substitute the cached data into statement, get the statement value
-      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
-      ((ruleAnalyzer.rule.calculate(mergedData) match {
+      val matched = ruleAnalyzer.rule.calculate(mergedExprValueMap) match {
         case Some(b: Boolean) => b
         case _ => false
-      }), Map[String, Any](MismatchInfo.wrap("not matched"), TargetInfo.wrap(target._1)))
+      }
+      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
+      if (matched) (matched, Map[String, Any]())
+      else (matched, Map[String, Any](MismatchInfo.wrap("not matched"), TargetInfo.wrap(target._1)))
     } else {
       (false, Map[String, Any](MismatchInfo.wrap("invalid to compare"), TargetInfo.wrap(target._1)))
     }
 
   }
 
-  private def mergeData(source: (V, T), target: (V, T)): Map[String, Any] = {
+  private def mergeExprValueMap(source: (V, T), target: (V, T)): Map[String, Any] = {
     source._1 ++ target._1
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
index b0ab8b5..ebcc74f 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
@@ -4,12 +4,12 @@ import org.apache.griffin.measure.batch.config.params.Param
 
 import scala.util.Try
 
+// need to validate params
 case class AllParamValidator() extends ParamValidator {
 
   def validate[T <: Param](param: Param): Try[Boolean] = {
     Try {
-      // fixme: not done, need to validate param
-      true
+      param.validate
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
index 4de22fb..ade993c 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
@@ -8,12 +8,12 @@ import com.databricks.spark.avro._
 import scala.util.{Success, Try}
 import java.nio.file.{Files, Paths}
 
+import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
 import org.apache.griffin.measure.batch.utils.HdfsUtil
 
+// data connector for avro file
 case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                             groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
-                             finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any],
-                             whenClauseOpt: Option[LogicalExpr]
+                             ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
                             ) extends DataConnector {
 
   val FilePath = "file.path"
@@ -47,14 +47,14 @@ case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
     Try {
       loadDataFile.flatMap { row =>
         // generate cache data
-        val cacheData: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (cachedMap, expr) =>
-          CacheDataUtil.genCachedMap(Some(row), expr, cachedMap)
+        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
+          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
         }
-        val finalCacheData = CacheDataUtil.filterCachedMap(finalCacheExprs, cacheData)
+        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
 
         // when clause filter data source
-        val whenResult = whenClauseOpt match {
-          case Some(whenClause) => whenClause.calculate(finalCacheData)
+        val whenResult = ruleExprs.whenClauseExprOpt match {
+          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
           case _ => None
         }
 
@@ -62,15 +62,15 @@ case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
         whenResult match {
           case Some(false) => None
           case _ => {
-            val groupbyData: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
-              expr.calculate(finalCacheData) match {
+            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+              expr.calculate(finalExprValueMap) match {
                 case Some(v) => Some(v.asInstanceOf[AnyRef])
                 case _ => None
               }
             }
             val key = toTuple(groupbyData)
 
-            Some((key, finalCacheData))
+            Some((key, finalExprValueMap))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
deleted file mode 100644
index 884df5d..0000000
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.sql.Row
-
-import scala.util.{Success, Try}
-
-object CacheDataUtil {
-
-  // for now, one expr only get one value, not supporting one expr get multiple values
-  private def getCacheData(data: Option[Any], expr: Expr, cachedMap: Map[String, Any]): Option[Any] = {
-    Try {
-      expr match {
-        case selection: SelectionExpr => {
-          selection.selectors.foldLeft(data) { (dt, selector) =>
-            getCacheData(dt, selector, cachedMap)
-          }
-        }
-        case selector: IndexFieldRangeSelectExpr => {
-          data match {
-            case Some(row: Row) => {
-              if (selector.fields.size == 1) {
-                selector.fields.head match {
-                  case i: IndexDesc => Some(row.getAs[Any](i.index))
-                  case f: FieldDesc => Some(row.getAs[Any](f.field))
-                  case _ => None
-                }
-              } else None
-            }
-            case _ => None
-          }
-        }
-        case _ => expr.calculate(cachedMap)
-      }
-    } match {
-      case Success(v) => v
-      case _ => None
-    }
-  }
-
-  def genCachedMap(data: Option[Any], expr: Expr, initialCachedMap: Map[String, Any]): Map[String, Any] = {
-    val valueOpt = getCacheData(data, expr, initialCachedMap)
-    if (valueOpt.nonEmpty) {
-      initialCachedMap + (expr._id -> valueOpt.get)
-    } else initialCachedMap
-  }
-
-  def genCachedMap(data: Option[Any], exprs: Iterable[Expr], initialCachedMap: Map[String, Any]): Map[String, Any] = {
-    exprs.foldLeft(initialCachedMap) { (cachedMap, expr) =>
-      CacheDataUtil.genCachedMap(None, expr, cachedMap)
-    }
-  }
-
-  def filterCachedMap(exprs: Iterable[Expr], cachedMap: Map[String, Any]): Map[String, Any] = {
-    exprs.foldLeft(Map[String, Any]()) { (newMap, expr) =>
-      val valueOpt = expr.calculate(cachedMap)
-      if (valueOpt.nonEmpty) {
-        newMap + (expr._id -> valueOpt.get)
-      } else newMap
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
index 8f636c1..f637b86 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
@@ -1,6 +1,7 @@
 package org.apache.griffin.measure.batch.connector
 
 import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.rule.RuleExprs
 import org.apache.griffin.measure.batch.rule.expr._
 import org.apache.spark.sql.SQLContext
 
@@ -13,20 +14,15 @@ object DataConnectorFactory {
 
   def getDataConnector(sqlContext: SQLContext,
                        dataConnectorParam: DataConnectorParam,
-                       groupbyExprs: Seq[MathExpr],
-                       cacheExprs: Iterable[Expr],
-                       finalCacheExprs: Iterable[Expr],
-                       globalFinalCacheMap: Map[String, Any],
-                       whenClauseOpt: Option[LogicalExpr]
+                       ruleExprs: RuleExprs,
+                       globalFinalCacheMap: Map[String, Any]
                       ): Try[DataConnector] = {
     val conType = dataConnectorParam.conType
     val version = dataConnectorParam.version
     Try {
       conType match {
-        case HiveRegex() => HiveDataConnector(sqlContext, dataConnectorParam.config,
-          groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap, whenClauseOpt)
-        case AvroRegex() => AvroDataConnector(sqlContext, dataConnectorParam.config,
-          groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap, whenClauseOpt)
+        case HiveRegex() => HiveDataConnector(sqlContext, dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
+        case AvroRegex() => AvroDataConnector(sqlContext, dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
         case _ => throw new Exception("connector creation error!")
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
index a0fd414..5d06210 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
@@ -1,15 +1,15 @@
 package org.apache.griffin.measure.batch.connector
 
+import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
 import org.apache.griffin.measure.batch.rule.expr._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 import scala.util.{Success, Try}
 
+// data connector for hive
 case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                             groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
-                             finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any],
-                             whenClauseOpt: Option[LogicalExpr]
+                             ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
                             ) extends DataConnector {
 
   val Database = "database"
@@ -55,14 +55,14 @@ case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
     Try {
       sqlContext.sql(dataSql).flatMap { row =>
         // generate cache data
-        val cacheData: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (cachedMap, expr) =>
-          CacheDataUtil.genCachedMap(Some(row), expr, cachedMap)
+        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
+          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
         }
-        val finalCacheData = CacheDataUtil.filterCachedMap(finalCacheExprs, cacheData)
+        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
 
         // when clause filter data source
-        val whenResult = whenClauseOpt match {
-          case Some(whenClause) => whenClause.calculate(finalCacheData)
+        val whenResult = ruleExprs.whenClauseExprOpt match {
+          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
           case _ => None
         }
 
@@ -70,15 +70,15 @@ case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
         whenResult match {
           case Some(false) => None
           case _ => {
-            val groupbyData: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
-              expr.calculate(finalCacheData) match {
+            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+              expr.calculate(finalExprValueMap) match {
                 case Some(v) => Some(v.asInstanceOf[AnyRef])
                 case _ => None
               }
             }
             val key = toTuple(groupbyData)
 
-            Some((key, finalCacheData))
+            Some((key, finalExprValueMap))
           }
         }
       }
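
A note on the filtering step in the hunk above: a row is dropped only when the when clause evaluates to Some(false); a None result (undecidable) keeps the row. A minimal sketch of that decision, reusing the ruleExprs and finalExprValueMap names from the patch (row data assumed):

  // keep the row unless the when clause explicitly evaluates to false
  val keep: Boolean = ruleExprs.whenClauseExprOpt match {
    case Some(whenClause) => whenClause.calculate(finalExprValueMap) != Some(false)
    case _ => true
  }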

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
index 49ebcea..de05eb3 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
@@ -8,7 +8,7 @@ import org.apache.spark.rdd.RDD
 
 import scala.util.Try
 
-
+// persist result and data to HDFS
 case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
 
   val Path = "path"
@@ -16,8 +16,8 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp:
   val MaxLinesPerFile = "max.lines.per.file"
 
   val path = config.getOrElse(Path, "").toString
-  val maxPersistLines = config.getOrElse(MaxPersistLines, -1).toString.toLong
-  val maxLinesPerFile = config.getOrElse(MaxLinesPerFile, 10000).toString.toLong
+  val maxPersistLines = try { config.getOrElse(MaxPersistLines, -1).toString.toInt } catch { case _ => -1 }
+  val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 10000).toString.toLong } catch { case _ => 10000 }
 
   val separator = "/"
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
index fa2078b..bb0ea6f 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
@@ -6,6 +6,7 @@ import org.apache.spark.rdd.RDD
 
 import scala.util.Try
 
+// persist result via HTTP
 case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
 
   val Api = "api"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
index 33f4a08..11973e9 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
@@ -6,6 +6,7 @@ import org.apache.spark.rdd.RDD
 
 import scala.util.Try
 
+// persist result and data through multiple persist targets
 case class MultiPersists(persists: Iterable[Persist]) extends Persist {
 
   val timeStamp: Long = persists match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
index 84dc4ce..356934b 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
@@ -14,6 +14,7 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str
     MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
   }
 
+  // get the persist instance configured by the given persist param
   private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
     val persistConfig = persistParam.config
     val persistTry = persistParam.persistType match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
index 55505ac..3ee7544 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
@@ -1,6 +1,6 @@
 package org.apache.griffin.measure.batch.result
 
-
+// result for accuracy: miss count, total count
 case class AccuracyResult(miss: Long, total: Long) extends Result {
 
   type T = AccuracyResult
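
A minimal sketch (hypothetical numbers, reading "miss" as unmatched records) of constructing this result:

  // hypothetical: 5 of the 100 total records are missing
  val accuracy = AccuracyResult(miss = 5, total = 100)
  // matched records: accuracy.total - accuracy.miss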

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
new file mode 100644
index 0000000..e4e7bbf
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/CalculationUtil.scala
@@ -0,0 +1,272 @@
+package org.apache.griffin.measure.batch.rule
+
+import scala.util.{Success, Try}
+
+
+object CalculationUtil {
+
+  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)
+
+  // redefine the calculation methods of the DSL operators on optional values
+  case class CalculationValue(value: Option[_]) extends Serializable {
+
+    def + (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble)
+          case (None, Some(v2)) => other
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def - (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def * (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2)
+          case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def / (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def % (other: Option[_]): Option[_] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte)
+          case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort)
+          case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt)
+          case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong)
+          case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat)
+          case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble)
+          case _ => value
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => value
+      }
+    }
+
+    def unary_- (): Option[_] = {
+      value match {
+        case Some(v: String) => Some(v.reverse.toString)
+        case Some(v: Boolean) => Some(!v)
+        case Some(v: Byte) => Some(-v)
+        case Some(v: Short) => Some(-v)
+        case Some(v: Int) => Some(-v)
+        case Some(v: Long) => Some(-v)
+        case Some(v: Float) => Some(-v)
+        case Some(v: Double) => Some(-v)
+        case Some(v) => Some(v)
+        case _ => None
+      }
+    }
+
+
+    def === (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(v1), Some(v2)) => Some(v1 == v2)
+        case (None, None) => Some(true)
+        case _ => Some(false)
+      }
+    }
+
+    def =!= (other: Option[_]): Option[Boolean] = {
+      (value, other) match {
+        case (Some(v1), Some(v2)) => Some(v1 != v2)
+        case (None, None) => Some(false)
+        case _ => Some(true)
+      }
+    }
+
+    def > (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 > v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def >= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, None) => Some(true)
+          case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def < (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (Some(v1: String), Some(v2: String)) => Some(v1 < v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+    def <= (other: Option[_]): Option[Boolean] = {
+      Try {
+        (value, other) match {
+          case (None, None) => Some(true)
+          case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2)
+          case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble)
+          case _ => None
+        }
+      } match {
+        case Success(opt) => opt
+        case _ => None
+      }
+    }
+
+
+    def in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
+        optOr(res, ===(next))
+      }
+    }
+
+    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
+      other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
+        optAnd(res, =!=(next))
+      }
+    }
+
+    def between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        if (begin.isEmpty && end.isEmpty) Some(value.isEmpty)
+        else optAnd(>=(begin), <=(end))
+      }
+    }
+
+    def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
+      if (other.size < 2) None else {
+        val (begin, end) = (other.head, other.tail.head)
+        if (begin.isEmpty && end.isEmpty) Some(value.nonEmpty)
+        else optOr(<(begin), >(end))
+      }
+    }
+
+    def unary_! (): Option[Boolean] = {
+      optNot(value)
+    }
+
+    def && (other: Option[_]): Option[Boolean] = {
+      optAnd(value, other)
+    }
+
+    def || (other: Option[_]): Option[Boolean] = {
+      optOr(value, other)
+    }
+
+
+    private def optNot(a: Option[_]): Option[Boolean] = {
+      a match {
+        case Some(v: Boolean) => Some(!v)
+        case _ => None
+      }
+    }
+    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (Some(false), _) | (_, Some(false)) => Some(false)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
+        case _ => None
+      }
+    }
+    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
+      (a, b) match {
+        case (Some(true), _) | (_, Some(true)) => Some(true)
+        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
+        case _ => None
+      }
+    }
+  }
+
+}
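
The implicit conversion above lets rule expressions combine optional values with the usual operator syntax. A minimal usage sketch (a standalone snippet with hypothetical values, not part of the patch):

  import org.apache.griffin.measure.batch.rule.CalculationUtil._

  // arithmetic on Option values: the left operand's type drives the coercion
  val sum: Option[_] = Some(1) + Some(2)              // Some(3)
  val prod: Option[_] = Some(2.5) * Some(4)           // Some(10.0)
  // comparisons yield Option[Boolean]; incomparable operands yield None
  val gt: Option[Boolean] = Some(3) > Some(2L)        // Some(true)
  val undecided: Option[Boolean] = Some(true) && None // None: cannot decide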

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
new file mode 100644
index 0000000..ca42c5f
--- /dev/null
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/ExprValueUtil.scala
@@ -0,0 +1,75 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.spark.sql.Row
+
+import scala.util.{Success, Try}
+
+object ExprValueUtil {
+
+  // calculate the value of one expression from the origin data (such as a Row of a DataFrame) and the existing expr value map
+  // for now, one expr yields at most one value; one expr yielding multiple values is not supported
+  // params:
+  // - originData: the origin data, such as a Row of a DataFrame
+  // - expr: the expression to be calculated
+  // - existExprValueMap: the existing expression value map, which may provide already-calculated expression values during calculation
+  // output: the calculated expression value
+  private def calcExprValue(originData: Option[Any], expr: Expr, existExprValueMap: Map[String, Any]): Option[Any] = {
+    Try {
+      expr match {
+        case selection: SelectionExpr => {
+          selection.selectors.foldLeft(originData) { (dt, selector) =>
+            calcExprValue(dt, selector, existExprValueMap)
+          }
+        }
+        case selector: IndexFieldRangeSelectExpr => {
+          originData match {
+            case Some(row: Row) => {
+              if (selector.fields.size == 1) {
+                selector.fields.head match {
+                  case i: IndexDesc => Some(row.getAs[Any](i.index))
+                  case f: FieldDesc => Some(row.getAs[Any](f.field))
+                  case _ => None
+                }
+              } else None
+            }
+            case _ => None
+          }
+        }
+        case _ => expr.calculate(existExprValueMap)
+      }
+    } match {
+      case Success(v) => v
+      case _ => None
+    }
+  }
+
+  // try to calculate an expr from the origin data and initExprValueMap, and generate a new expression value map
+  // depends on both the origin data and the existing expr value map
+  def genExprValueMap(data: Option[Any], expr: Expr, initExprValueMap: Map[String, Any]): Map[String, Any] = {
+    val valueOpt = calcExprValue(data, expr, initExprValueMap)
+    if (valueOpt.nonEmpty) {
+      initExprValueMap + (expr._id -> valueOpt.get)
+    } else initExprValueMap
+  }
+
+  // try to calculate multiple exprs from the origin data and initExprValueMap, and generate a new expression value map
+  // depends on both the origin data and the existing expr value map
+  def genExprValueMap(data: Option[Any], exprs: Iterable[Expr], initExprValueMap: Map[String, Any]): Map[String, Any] = {
+    exprs.foldLeft(initExprValueMap) { (evMap, expr) =>
+      ExprValueUtil.genExprValueMap(None, expr, evMap)
+    }
+  }
+
+  // calculate expressions against exprValueMap and build an updated expression value map
+  // depends only on the existing expr value map; pure calculation, no origin data needed
+  def updateExprValueMap(exprs: Iterable[Expr], exprValueMap: Map[String, Any]): Map[String, Any] = {
+    exprs.foldLeft(Map[String, Any]()) { (evMap, expr) =>
+      val valueOpt = expr.calculate(exprValueMap)
+      if (valueOpt.nonEmpty) {
+        evMap + (expr._id -> valueOpt.get)
+      } else evMap
+    }
+  }
+
+}
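
Taken together with the HiveDataConnector change earlier in this patch, the intended per-row calling pattern looks roughly like the sketch below (non-authoritative; evaluateRow is a hypothetical helper name, RuleExprs is defined in RuleAnalyzer.scala further down):

  import org.apache.spark.sql.Row
  import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}

  // hypothetical helper mirroring the connector's flatMap body:
  // 1) evaluate cache exprs against the row, 2) finalize from cached values only
  def evaluateRow(row: Row, ruleExprs: RuleExprs,
                  constFinalExprValueMap: Map[String, Any]): Map[String, Any] = {
    val cacheExprValueMap = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (evMap, expr) =>
      ExprValueUtil.genExprValueMap(Some(row), expr, evMap)
    }
    ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
  }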

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
index 78665f9..d129ad7 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
@@ -4,25 +4,57 @@ import org.apache.griffin.measure.batch.rule.expr._
 
 case class RuleAnalyzer(rule: StatementExpr) extends Serializable {
 
-  val GlobalData = ""
-  val SourceData = "source"
-  val TargetData = "target"
+  val constData = ""
+  private val SourceData = "source"
+  private val TargetData = "target"
 
-  val globalCacheExprs: Iterable[Expr] = rule.getCacheExprs(GlobalData)
-  val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
-  val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
+  val constCacheExprs: Iterable[Expr] = rule.getCacheExprs(constData)
+  private val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
+  private val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)
 
-  val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
-  val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
+  private val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
+  private val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)
 
-  val globalFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(GlobalData).toSet
-  val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
-  val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
+  val constFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(constData).toSet
+  private val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
+  private val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet
 
-  val groupbyExprPairs: Seq[(MathExpr, MathExpr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
-  val sourceGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(_._1)
-  val targetGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(_._2)
+  private val groupbyExprPairs: Seq[(Expr, Expr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
+  private val sourceGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._1)
+  private val targetGroupbyExprs: Seq[Expr] = groupbyExprPairs.map(_._2)
 
-  val whenClauseExpr: Option[LogicalExpr] = rule.getWhenClauseExpr
+  private val whenClauseExprOpt: Option[LogicalExpr] = rule.getWhenClauseExpr
+
+  val sourceRuleExprs: RuleExprs = RuleExprs(sourceGroupbyExprs, sourceCacheExprs,
+    sourceFinalCacheExprs, sourcePersistExprs, whenClauseExprOpt)
+  val targetRuleExprs: RuleExprs = RuleExprs(targetGroupbyExprs, targetCacheExprs,
+    targetFinalCacheExprs, targetPersistExprs, whenClauseExprOpt)
 
 }
+
+
+// rule expressions for a single data source
+// groupbyExprs: in the accuracy case, these exprs serve as the group-by exprs
+//                  Data keys for the accuracy case, generated from the equality statements, to improve calculation efficiency
+// cacheExprs: exprs whose values can be calculated independently and cached for later use
+//                  Cached for the finalCacheExprs calculation; they contain redundant values, so persisting them all would be wasteful
+// finalCacheExprs: the roots of cacheExprs, cached for later use, together with persistExprs
+//                  Cached for calculation, and can be saved for re-calculation in streaming mode
+// persistExprs: the expr values that should be persisted; only direct selection exprs are persistable
+//                  Persisted as records, to record the missing data, and need to be readable as raw data
+// whenClauseExprOpt: the when clause of the rule, determining whether a row of the data source is filtered out
+//                  Can be pre-calculated to filter out some data in the data connector
+case class RuleExprs(groupbyExprs: Seq[Expr],
+                     cacheExprs: Iterable[Expr],
+                     finalCacheExprs: Iterable[Expr],
+                     persistExprs: Iterable[Expr],
+                     whenClauseExprOpt: Option[LogicalExpr]
+                    ) {
+  // for example: for a rule "$source.name = $target.name AND $source.age < $target.age + (3 * 4)"
+  // in this rule, for the target data source, the targetRuleExprs are as follows
+  // groupbyExprs: $target.name
+  // cacheExprs: $target.name, $target.age, $target.age + (3 * 4)
+  // finalCacheExprs: $target.name, $target.age + (3 * 4), $target.age
+  // persistExprs: $target.name, $target.age
+  // whenClauseExprOpt: None
+}
\ No newline at end of file
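
A minimal usage sketch of the analyzer above (assumed: statementExpr is a StatementExpr already produced by the RuleParser shown below):

  import org.apache.griffin.measure.batch.rule.expr.Expr

  // split one parsed rule into per-data-source expression groups
  val analyzer = RuleAnalyzer(statementExpr)
  val sourceExprs: RuleExprs = analyzer.sourceRuleExprs      // exprs evaluated against the source connector
  val targetExprs: RuleExprs = analyzer.targetRuleExprs      // exprs evaluated against the target connector
  val constExprs: Iterable[Expr] = analyzer.constCacheExprs  // data-independent exprs, calculated once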

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
index 996e808..47bf38e 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
@@ -120,18 +120,19 @@ case class RuleParser() extends JavaTokenParsers with Serializable {
   import SomeNumber._
 
   // -- literal --
-  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean
+  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull
   def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
   def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
   def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
   def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
+  def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r | """(?i)none""".r) ^^ { LiteralNullExpr(_) }
 
   // -- selection --
   // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
   def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
     case head ~ selectors => SelectionExpr(head, selectors)
   }
-  def selector: Parser[SelectExpr] = (fieldSelect | functionOperation | indexFieldRangeSelect | filterSelect)
+  def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)
 
   def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
   // <field-sel> ::= "." <field-string>
@@ -215,84 +216,4 @@ case class RuleParser() extends JavaTokenParsers with Serializable {
     case ls ~ _ => SimpleStatementExpr(ls)
   }
 
-  // for complie only
-//  case class NullStatementExpr(expression: String) extends StatementExpr {
-//    def genValue(values: Map[String, Any]): Option[Any] = None
-//    def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = Nil
-//  }
-//  def statementsExpr = mathExpr ^^ { NullStatementExpr(_) }
-
-
-//
-//  // basic
-//  val anyString: Parser[String] = """[^'{}\[\]()=<>.$@;+\-*/\\\"]*""".r
-//  val variable: Parser[String] = """[a-zA-Z_]\w*""".r
-//  val number: Parser[String] = """[+\-]?\d+""".r
-//  val time: Parser[String] = """\d+(y|M|w|d|h|m|s|ms)""".r
-//
-//  val numPosition: Parser[String] = """\d+""".r
-//  val anyPosition: Parser[String] = "*"
-//
-//  val filterOpr: Parser[String] = "=" | "!=" | ">" | "<" | ">=" | "<="
-//
-//  val opr1: Parser[String] = "*" | "/" | "%"
-//  val opr2: Parser[String] = "+" | "-"
-//
-//  val assignOpr: Parser[String] = "="
-//  val compareOpr: Parser[String] = "==" | "!=" | ">" | "<" | ">=" | "<="
-//  val mappingOpr: Parser[String] = "==="
-//
-//  val exprSep: Parser[String] = ";"
-//
-//  // simple
-//  def variableString: Parser[VariableExpr] = (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { VariableStringExpr(_) }
-//  def constString: Parser[ConstExpr] = (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { ConstStringExpr(_) }
-//  def constValue: Parser[ConstExpr] = time ^^ { ConstTimeExpr(_) } | number ^^ { ConstNumberExpr(_)} | constString
-//  def variableValue: Parser[VariableExpr] = variable ^^ { VariableStringExpr(_) }
-//  def quoteVariableValue: Parser[QuoteVariableExpr] = "${" ~> variable <~ "}" ^^ { QuoteVariableExpr(_) }
-//  def position: Parser[SelectExpr] = anyPosition ^^ { AnyPositionExpr(_) } | """\d+""".r ^^ { NumPositionExpr(_) } | (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { StringPositionExpr(_) }
-//  def argument: Parser[ConstExpr] = constValue
-//  def annotationExpr: Parser[AnnotationExpr] = "@" ~> variable ^^ { AnnotationExpr(_) }
-//
-//  // selector
-//  def filterOpration: Parser[SelectExpr] = (variableString ~ filterOpr ~ constString) ^^ {
-//    case v ~ opr ~ c => FilterOprExpr(opr, v, c)
-//  }
-//  def positionExpr: Parser[SelectExpr] = "[" ~> (filterOpration | position) <~ "]"
-//  def functionExpr: Parser[SelectExpr] = "." ~ variable ~ "(" ~ repsep(argument, ",") ~ ")" ^^ {
-//    case _ ~ v ~ _ ~ args ~ _ => FunctionExpr(v, args)
-//  }
-//  def selectorExpr: Parser[SelectExpr] = positionExpr | functionExpr
-//
-//  // data
-//  def selectorsExpr: Parser[DataExpr] = quoteVariableValue ~ rep(selectorExpr) ^^ {
-//    case q ~ tails => SelectionExpr(q, tails)
-//  }
-//
-//  // calculation
-//  def factor: Parser[ElementExpr] = (constValue | selectorsExpr | "(" ~> expr <~ ")") ^^ { FactorExpr(_) }
-//  def term: Parser[ElementExpr] = factor ~ rep(opr1 ~ factor) ^^ {
-//    case a ~ Nil => a
-//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
-//  }
-//  def expr: Parser[ElementExpr] = term ~ rep(opr2 ~ term) ^^ {
-//    case a ~ Nil => a
-//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
-//  }
-//
-//  // statement
-//  def assignExpr: Parser[StatementExpr] = variableValue ~ assignOpr ~ expr ^^ {
-//    case v ~ opr ~ c => AssignExpr(opr, v, c)
-//  }
-//  def conditionExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ compareOpr ~ expr ^^ {
-//    case anos ~ le ~ opr ~ re => ConditionExpr(opr, le, re, anos)
-//  }
-//  def mappingExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ mappingOpr ~ expr ^^ {
-//    case anos ~ le ~ opr ~ re => MappingExpr(opr, le, re, anos)
-//  }
-//  def statementExpr: Parser[StatementExpr] = assignExpr | conditionExpr | mappingExpr
-//
-//  // statements
-//  def statementsExpr: Parser[StatementExpr] = repsep(statementExpr, exprSep) ^^ { StatementsExpr(_) }
-
 }
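
A minimal sketch exercising the new null literal added in this hunk (assumed REPL-style usage; parseAll comes from JavaTokenParsers):

  val parser = RuleParser()
  // "null", "undefined" and "none" (case-insensitive) now parse as a null literal
  val result = parser.parseAll(parser.literal, "null")
  // result.successful is expected to be true, carrying a LiteralNullExpr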

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2de6a549/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
----------------------------------------------------------------------
diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
index 0d54707..82c494c 100644
--- a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
+++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
@@ -2,6 +2,6 @@ package org.apache.griffin.measure.batch.rule.expr
 
 
 trait AnalyzableExpr extends Serializable {
-  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = Nil
+  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = Nil
   def getWhenClauseExpr(): Option[LogicalExpr] = None
 }
\ No newline at end of file