Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/09 09:43:37 UTC

[10/11] incubator-griffin git commit: upgrade new version

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/dsl-guide.md b/griffin-doc/measure/dsl-guide.md
new file mode 100644
index 0000000..e7f8569
--- /dev/null
+++ b/griffin-doc/measure/dsl-guide.md
@@ -0,0 +1,181 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Griffin DSL Guide
+Griffin DSL is a SQL-like language designed for DQ measurement, describing data quality requests in the DQ domain.
+
+## Griffin DSL Syntax Description
+Griffin DSL is SQL-like, case insensitive, and easy to learn.
+
+### Supported operations
+- logical operations: not, and, or, in, between, like, is null, is nan, =, !=, <=, >=, <, >
+- mathematical operations: +, -, *, /, %
+- sql syntax: as, where, group by, having, order by, limit
+
+
+### Keywords
+- `null, nan, true, false`
+- `not, and, or`
+- `in, between, like, is`
+- `select, from, as, where, group, by, having, order, desc, asc, limit`
+
+### Operators
+- `!, &&, ||, =, !=, <, >, <=, >=, <>`
+- `+, -, *, /, %`
+- `(, )`
+- `., [, ]`
+
+### Literals
+- **string**: any string surrounded by a pair of " or ' characters, with the escape character \ where needed.  
+	e.g. `"test"`, `'string 1'`, `"hello \" world \" "`
+- **number**: double or integer number.  
+	e.g. `123`, `33.5`
+- **time**: an integer followed by a time unit, translated to an integer number of milliseconds.  
+	e.g. `3d`, `5h`, `4ms`
+- **boolean**: boolean value directly.  
+	e.g. `true`, `false`
+
+### Selections
+- **selection head**: data source name.  
+	e.g. `source`, `target`, `` `my table name` ``
+- **all field selection**: `*`, optionally prefixed with a data source name.  
+	e.g. `*`, `source.*`, `target.*`
+- **field selection**: a field name, optionally prefixed with a data source name.  
+	e.g. `source.age`, `target.name`, `user_id`
+- **index selection**: an integer between square brackets "[]", following a field name.  
+	e.g. `source.attributes[3]`
+- **function selection**: a function name with parentheses "()", optionally preceded by a field name.  
+	e.g. `count(*)`, `*.count()`, `source.user_id.count()`, `max(source.age)`
+- **alias**: declare an alias after a selection.  
+	e.g. `source.user_id as id`, `target.user_name as name`
+
+### Math expressions
+- **math factor**: a literal, a function, a selection, or a math expression in brackets.  
+	e.g. `123`, `max(1, 2, 3, 4)`, `source.age`, `(source.age + 13)`
+- **unary math expression**: unary math operator with factor.  
+	e.g. `-(100 - source.score)`
+- **binary math expression**: math factors with binary math operators.  
+	e.g. `source.age + 13`, `score * 2 + ratio`
+
+### Logical expression
+- **in**: in clause like sql.  
+	e.g. `source.country in ("USA", "CHN", "RSA")`
+- **between**: between clause like sql.  
+	e.g. `source.age between 3 and 30`, `source.age between (3, 30)`
+- **like**: like clause like sql.  
+	e.g. `source.name like "%abc%"`
+- **is null**: is null operator like sql.  
+	e.g. `source.desc is not null`
+- **is nan**: checks whether the value is not a number; the syntax follows `is null`.  
+	e.g. `source.age is not nan`
+- **logical factor**: a math expression, one of the logical expressions above, or a logical expression in brackets.  
+	e.g. `(source.user_id = target.user_id AND source.age > target.age)`
+- **unary logical expression**: unary logical operator with factor.  
+	e.g. `NOT source.has_data`, `!(source.age = target.age)`
+- **binary logical expression**: logical factors with binary logical operators, including `and`, `or` and comparison operators.  
+	e.g. `source.age = target.age OR source.ticket = target.tck`
+
+
+### Expression
+- **expression**: a logical expression or a math expression.
+
+### Function
+- **argument**: expression.
+- **function**: function name with arguments between brackets.  
+	e.g. `max(source.age, target.age)`, `count(*)`
+
+### Clause
+- **select clause**: the result columns, like the sql select clause; the keyword "select" can be omitted in Griffin DSL.  
+	e.g. `select user_id.count(), age.max() as max`, `source.user_id.count() as cnt, source.age.min()`
+- **from clause**: the table name, like the sql from clause; the name must be one of the configured data source names or the output table name of a former rule step. This clause can be omitted by configuring the data source name.  
+	e.g. `from source`, ``from `target` ``
+- **where clause**: the filter condition, like the sql where clause; optional.  
+	e.g. `where source.id = target.id and source.age = target.age`
+- **group-by clause**: like the group-by clause in sql; optional. An optional having clause may follow.  
+	e.g. `group by cntry`, `group by gender having count(*) > 50`
+- **order-by clause**: like the order-by clause in sql; optional.  
+	e.g. `order by name`, `order by first_name desc, age asc`
+- **limit clause**: like the limit clause in sql; optional.  
+	e.g. `limit 5`
+
+### Accuracy Rule
+An accuracy rule expression in Griffin DSL is a logical expression, describing the mapping relation between data sources.  
+	e.g. `source.id = target.id and source.name = target.name and source.age between (target.age, target.age + 5)`
+
+### Profiling Rule
+A profiling rule expression in Griffin DSL is a sql-like expression, with a select clause first, optionally followed by a from clause, where clause, group-by clause, order-by clause and limit clause, in that order.  
+	e.g. `source.gender, source.id.count() where source.age > 20 group by source.gender`, `select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5`
+
+## Griffin DSL translation to SQL
+Griffin DSL is defined for DQ measurement, to describe DQ domain problems.  
+In Griffin, the Griffin DSL rules are translated into spark-sql rules, which are then calculated in the spark-sql engine.  
+Since the DQ domain has multiple dimensions, the rules are translated in different ways.
+
+### Accuracy
+For accuracy, we need to get the match count between source and target; the rule describes the mapping relation between the data sources. Griffin translates the dsl rule into multiple sql rules.  
+For example, if the dsl rule is `source.id = target.id and source.name = target.name`, which represents the match condition of accuracy, then after translation the sql rules are as below:  
+- **get miss items from source**: `SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') and coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL)`, save as table `miss_items`.
+- **get miss count**: `SELECT COUNT(*) AS miss FROM miss_items`, save as table `miss_count`.
+- **get total count from source**: `SELECT COUNT(*) AS total FROM source`, save as table `total_count`.
+- **get accuracy metric**: `SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count`, save as table `accuracy`.  
+
+After the translation, the metrics will be persisted in table `accuracy`.
+
+### Profiling
+For profiling, the request is usually an aggregation over the data. The rule is mostly the same as sql, but only the `select`, `from`, `where`, `group-by`, `having`, `order-by` and `limit` clauses are supported, which covers most profiling requests. For more complicated requests, you can use a spark-sql rule directly.  
+For example, if the dsl rule is `source.cntry, source.id.count(), source.age.max() group by source.cntry`, which represents the profiling request, then after translation the sql rule is as below:  
+- **profiling sql rule**: `SELECT source.cntry, count(source.id), max(source.age) FROM source GROUP BY source.cntry`, save as table `profiling`.  
+
+After the translation, the metrics will be persisted in table `profiling`.  
+
+## Alternative Rules
+You can simply use Griffin DSL rules to describe your problems in the DQ domain; for more complicated requirements, you can also use the alternative rule types supported by Griffin.  
+
+### Spark sql
+Griffin supports spark-sql directly; you can write a rule in sql like this:  
+```
+{
+	"dsl.type": "spark-sql",
+	"name": "source",
+	"rule": "SELECT count(id) AS cnt, max(timestamp) AS fresh_time FROM source"
+}
+```
+Griffin will calculate it in the spark-sql engine directly.  
+
+### Data frame operation
+Griffin supports some other operations on data frames in spark, such as converting a data frame of json strings into a data frame with the extracted object schema. For example:  
+```
+{
+	"dsl.type": "df-opr",
+	"name": "ext_source",
+	"rule": "from_json",
+	"details": {
+		"df.name": "json_source"
+	}
+}
+```
+Griffin will perform the operation to extract the json strings.  
+You can also extend the df-opr engine and df-opr adaptor in Griffin to support more types of data frame operations.  
+
+## Tips
+The Griffin engine runs on spark and works in two phases: the pre-proc phase and the run phase.  
+- **Pre-proc phase**: Griffin processes the data sources directly, to get the appropriate data format as preparation for the DQ calculation. In this phase, you can use df-opr and spark-sql rules.  
+After preparation, to support streaming DQ calculation, a timestamp column is added to each row of data, so the data frame in the run phase contains an extra column named "__tmst".  
+- **Run phase**: Griffin calculates on the prepared data to get the DQ metrics. In this phase, you can use griffin-dsl rules, spark-sql rules, and some of the df-opr rules.  
+For a griffin-dsl rule, Griffin translates it into a spark-sql rule with a group-by condition on the "__tmst" column, which is especially useful for streaming DQ calculation. For a spark-sql rule, Griffin uses it directly, so you need to add the "__tmst" column in your spark-sql rule explicitly, otherwise you will not get correct metric results after the calculation. An illustrative example follows.
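+
+As a minimal sketch (the rule name and aggregation here are only illustrative, not taken from this guide), a spark-sql rule used in the run phase could include the "__tmst" column explicitly like this:  
+```
+{
+	"dsl.type": "spark-sql",
+	"name": "prof_tmst",
+	"rule": "SELECT `__tmst`, count(*) AS cnt FROM source GROUP BY `__tmst`"
+}
+```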
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measure-batch-sample.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-batch-sample.md b/griffin-doc/measure/measure-batch-sample.md
new file mode 100644
index 0000000..3783f94
--- /dev/null
+++ b/griffin-doc/measure/measure-batch-sample.md
@@ -0,0 +1,140 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Measure Batch Sample
+Measures consist of batch measures and streaming measures. This document shows batch measure samples.
+
+## Batch Accuracy Sample
+```
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "src",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "users_info_src.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "tgt",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "details": {
+          "source": "src",
+          "target": "tgt",
+          "miss.records": {
+            "name": "miss.records",
+            "persist.type": "record"
+          },
+          "accuracy": {
+            "name": "accu",
+            "persist.type": "metric"
+          },
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configuration file of a batch accuracy job.  
+
+### Data source
+In this sample, we use avro files as the source and target.  
+
+### Evaluate rule
+In this accuracy sample, the rule describes the match condition: `src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name`.  
+The accuracy metrics will be persisted as a metric, with the miss column named "miss_count", the total column named "total_count", and the matched column named "matched_count".  
+The missed records of the source will be persisted as records.  
+
+## Batch Profiling Sample
+```
+{
+  "name": "prof_batch_test",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+          	"database": "griffin",
+          	"table.name": "demo_src"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "rule": "country, country.count() as cnt group by country order by cnt desc limit 3",
+        "details": {
+          "source": "source",
+          "profiling": {
+            "name": "cntry-group",
+            "persist.type": "metric"
+          }
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configuration file of a batch profiling job.  
+
+### Data source
+In this sample, we use a hive table as the source.  
+
+### Evaluate rule
+In this profiling sample, the rule describes the profiling request: `country, country.count() as cnt group by country order by cnt desc limit 3`.  
+The profiling metrics will be persisted as a metric, listing the top 3 countries by record count.  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measure-configuration-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
new file mode 100644
index 0000000..0632927
--- /dev/null
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -0,0 +1,211 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Griffin Measure Configuration Guide
+The Griffin measure module needs two configuration files to define the execution parameters: one for the environment, the other for the DQ job.
+
+## Environment Parameters
+```
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
+    "batch.interval": "5s",
+    "process.interval": "30s",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafkaMaxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    }, {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "<zookeeper host ip>:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ]
+}
+```
+Above lists the environment parameters.  
+
+- **spark**: This field configures spark and spark streaming parameters.  
+	+ log.level: Level of spark log.
+	+ checkpoint.dir: Checkpoint directory of spark streaming, for streaming mode.
+	+ batch.interval: Interval of dumping streaming data, for streaming mode.
+	+ process.interval: Interval of processing dumped streaming data, for streaming mode.
+	+ config: Configuration of spark parameters.
+- **persist**: This field configures the list of metrics persist parameters; multiple persist types are supported. Details of the persist configuration are [here](#persist).
+- **info.cache**: This field configures the list of information cache parameters; multiple cache types are supported. It is only used in the streaming dq case. Details of the info cache configuration are [here](#info-cache).
+
+### <a name="persist"></a>Persist
+- **type**: Metrics persist type: "log", "hdfs" or "http".
+- **config**: Configuration parameters of each persist type.
+	+ log persist
+		* max.log.lines: the maximum number of lines to log.
+	+ hdfs persist
+		* path: hdfs path to persist metrics.
+		* max.persist.lines: the maximum number of lines of total persisted data.
+		* max.lines.per.file: the maximum number of lines in each persisted file.
+	+ http persist (see the hedged sketch after this list)
+		* api: the api to which persisted metrics are submitted.
+		* method: http method, "post" by default.
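+
+The environment sample above only shows the "log" and "hdfs" persist types. As a hedged sketch, an "http" persist entry might look like the following; the api url here is a placeholder, not an endpoint defined by this guide:
+```
+{
+  "type": "http",
+  "config": {
+    "api": "http://<metric-collector-host>:<port>/<api-path>",
+    "method": "post"
+  }
+}
+```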
+
+### <a name="info-cache"></a>Info Cache
+- **type**: Information cache type, "zk" for zookeeper cache.
+- **config**: Configuration parameters of the info cache type.
+	+ zookeeper cache
+		* hosts: zookeeper hosts list as a string, separated by commas.
+		* namespace: namespace of the cache info, "" by default.
+		* lock.path: path of the lock info, "lock" by default.
+		* mode: create mode of the zookeeper node, "persist" by default.
+		* init.clear: clear the cache info at initialization, true by default.
+		* close.clear: clear the cache info when the connection is closed, false by default.
+
+## DQ Job Parameters
+```
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "src",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+          	"file.path": "<path>/<to>",
+            "file.name": "<source-file>.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "tgt",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+          	"file.path": "<path>/<to>",
+            "file.name": "<target-file>.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "details": {
+          "source": "src",
+          "target": "tgt",
+          "miss.records": {
+            "name": "miss.records",
+            "persist.type": "record"
+          },
+          "accuracy": {
+            "name": "accu",
+            "persist.type": "metric"
+          },
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        }
+      }
+    ]
+  }
+}
+```
+Above lists the DQ job configuration parameters.  
+
+- **name**: Name of DQ job.
+- **process.type**: Process type of DQ job, "batch" or "streaming".
+- **data.sources**: List of data sources in this DQ job.
+	+ name: Name of this data source; it must be different from the other data source names.
+	+ connectors: List of data connectors combined as the same data source. Details of the data connector configuration are [here](#data-connector).
+- **evaluateRule**: Evaluate rule parameters of this DQ job.
+	+ dsl.type: Default dsl type of all the rules.
+	+ rules: List of rules defining each rule step. Details of the rule configuration are [here](#rule).
+
+### <a name="data-connector"></a>Data Connector
+- **type**: Data connector type: "avro", "hive" and "text-dir" for batch mode, "kafka" for streaming mode.
+- **version**: Version string of the data connector type.
+- **config**: Configuration parameters of each data connector type.
+	+ avro data connector
+		* file.path: avro file path, optional, "" as default.
+		* file.name: avro file name.
+	+ hive data connector (see the hedged sketch after this list)
+		* database: database name, optional, "default" as default.
+		* table.name: table name.
+		* partitions: partition conditions string, split by ";" and ",", optional.
+			e.g. `dt=20170410, hour=15; dt=20170411, hour=15; dt=20170412, hour=15`
+	+ text dir data connector
+		* dir.path: parent directory path.
+		* data.dir.depth: integer, depth of the data directories, 0 as default.
+		* success.file: success file name.
+		* done.file: done file name.
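+
+For reference, a hedged sketch of a hive data connector entry with partition conditions follows; the database, table and partition values are only illustrative, reusing names from the samples above:
+```
+{
+  "type": "hive",
+  "version": "1.2",
+  "config": {
+    "database": "griffin",
+    "table.name": "demo_src",
+    "partitions": "dt=20170410, hour=15; dt=20170411, hour=15"
+  }
+}
+```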
+
+### <a name="rule"></a>Rule
+- **dsl.type**: Rule dsl type, "spark-sql", "df-opr" and "griffin-dsl".
+- **name** (step information): Result table name of this rule, optional for "griffin-dsl" type.
+- **persist.type** (step information): Persist type of the result table, optional for "griffin-dsl" type. Supported types are "metric", "record" and "none": "metric" means the result will be persisted as metrics, "record" means the result will be persisted as records only, and "none" means the result will not be persisted. The default is "none". See the hedged sketch after this list.
+- **update.data.source** (step information): If the result table needs to update a data source, this parameter is that data source's name; used in the streaming accuracy case, optional.
+- **dq.type**: DQ type of this rule, only for "griffin-dsl" type, supporting "accuracy" and "profiling".
+- **details**: Details of this rule, optional.
+	+ accuracy dq type detail configuration
+		* source: the data source name used as the source in accuracy; defaults to the name of the first data source in "data.sources" if not configured.
+		* target: the data source name used as the target in accuracy; defaults to the name of the second data source in "data.sources" if not configured.
+		* miss.records: step information of the missed-records result table step in accuracy.
+		* accuracy: step information of the accuracy result table step in accuracy.
+		* miss: alias of miss column in result table.
+		* total: alias of total column in result table.
+		* matched: alias of matched column in result table.
+	+ profiling dq type detail configuration
+		* source: the data source name used as the source in profiling; defaults to the name of the first data source in "data.sources" if not configured. If the griffin-dsl rule contains a from clause, this parameter is ignored.
+		* profiling: step information of profiling result table step in profiling.
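+
+As a hedged illustration of the step-information parameters above (the rule text and names are examples, not taken from this guide), a spark-sql rule step with an explicit result table name and persist type might be configured like this:
+```
+{
+  "dsl.type": "spark-sql",
+  "name": "total_count",
+  "persist.type": "metric",
+  "rule": "SELECT COUNT(*) AS total FROM source"
+}
+```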
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measure-streaming-sample-old.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-streaming-sample-old.md b/griffin-doc/measure/measure-streaming-sample-old.md
new file mode 100644
index 0000000..004ed3b
--- /dev/null
+++ b/griffin-doc/measure/measure-streaming-sample-old.md
@@ -0,0 +1,204 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Measure streaming sample
+Measures consist of batch measures and streaming measures. This document shows a streaming measure sample.
+
+### Data source
+At present, we support kafka as the streaming data source.  
+In this sample, we need a kafka cluster as the data source.
+
+### Measure type
+At present, we support the accuracy measure in streaming mode.
+
+### Kafka decoder
+In kafka, data always needs to be encoded and decoded. We currently support String type kafka data; you can also implement and use your own decoder for the kafka case.
+
+### Environment
+For the current griffin streaming case, we need some environment dependencies: zookeeper and hdfs.  
+We use zookeeper to cache some checkpoint information; it's optional, but we recommend it.  
+We use hdfs to save temporary data; it's also a recommended choice.
+
+### Streaming accuracy result
+The streaming data will be separated into mini-batches, and for each mini-batch of data there should be an accuracy result. Therefore, the streaming accuracy result is a series of batch accuracy results with timestamps.  
+Considering the latency of streaming data, which means the source data and the matching target data will not arrive at exactly the same time, we have to accept some data delay in streaming mode, by holding unmatched data in memory or on disk and trying to match it later, until the data is out of time.
+
+## How to run streaming sample
+### Environment Preparation
+First, we need some environment preparation.  
+- Zookeeper: Zookeeper 3.4.10
+- Hadoop: Hadoop 2.6
+- Spark: Spark 1.6
+- Kafka: Kafka 0.8
+
+### Data Preparation
+Create two topics in kafka for source and target data. For example, topic "source" for source data, and topic "target" for target data.  
+Streaming data should also be prepared; the format could be json strings, for example:  
+Source data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "jhon", "age": 28}
+{"name": "steve", "age": 31}
+```
+Target data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "steve", "age": 20}
+```
+You need to feed the source data and target data into these two topics; the console producer might be a good choice for experimental purposes.
+
+### Configuration Preparation
+Two configuration files are required.
+Environment configuration file: env.json
+```
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
+    "batch.interval": "5s",
+    "process.interval": "30s",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafkaMaxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    }, {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "<zookeeper host ip>:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ]
+}
+```
+In env.json, "spark" field configures the spark and spark streaming parameters, "persist" field configures the persist ways, we support "log", "hdfs" and "http" ways at current, "info.cache" field configures the information cache parameters, we support zookeeper only at current.  
+
+Process configuration file: config.json
+```
+{
+  "name": "streaming-accu-sample",
+  "type": "accuracy",
+  "process.type": "streaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "<kafka host ip>:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "source",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs:///griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-5m", "0"]
+    },
+    "match.once": true
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "<kafka host ip>:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "target",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs:///griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-5m", "0"]
+    },
+    "match.once": false
+  },
+
+  "evaluateRule": {
+    "rules": "$source.json().name = $target.json().name AND $source.json().age = $target.json().age"
+  }
+}
+```
+In config.json, "source" and "target" fields configure the data source parameters.  
+The "cache" field in data source configuration represents the temporary data cache way, at current we support "text" and "hive" ways. We recommend "text" way, it only depends on hdfs. "time.range" means that the data older than the lower bound should be considered as out-time, and the out-time data will not be calculated any more.   
+"match.once" represents the data from this data source could be matched only once or more times.  
+"evaluateRule.rule" configures the match rule between each source and target data.
+
+### Run
+Build the measure package.
+```
+mvn clean install
+```
+Get the measure package ```measure-<version>-incubating-SNAPSHOT.jar``` and rename it to ```griffin-measure.jar```.  
+Put the measure package together with env.json and config.json.  
+Run the following command:
+```
+spark-submit --class org.apache.griffin.measure.Application \
+--master yarn-client --queue default \
+griffin-measure.jar \
+env.json config.json local,local
+```
+The first two parameters are the paths of env.json and config.json; the third parameter represents the file system type of the two configuration files, and both "local" and "hdfs" are supported.  
+
+The spark streaming application will be long-running, and you can get the results of each mini-batch of data. During run time, you can also feed more data into the source and target topics to check the results of the later mini-batches.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measures.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measures.md b/griffin-doc/measure/measures.md
new file mode 100644
index 0000000..2f6680e
--- /dev/null
+++ b/griffin-doc/measure/measures.md
@@ -0,0 +1,173 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Measures
+Measures to calculate data quality metrics.
+
+### Accuracy measure
+The accuracy measure compares source and target content, given the corresponding mapping relationship.
+
+#### Introduction
+How do we measure the accuracy dimension of a target dataset T, given the source of truth as a golden dataset S?
+To measure the accuracy quality of target dataset T,
+the basic approach is to calculate the discrepancy between the target and source datasets by going through their contents,
+examining whether all fields match exactly, as below:
+```
+                Count(source.field1 == target.field1 && source.field2 == target.field2 && ...source.fieldN == target.fieldN)
+Accuracy  =     ---------------------------------------------------------------------------------------------------------------
+                Count(source)
+
+```
+
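+For example, if the source contains 1000 records and 950 of them find a target record with every field matching exactly, the accuracy is 950 / 1000 = 95% (these numbers are only illustrative).
+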
+Since the two datasets are too big to fit in one box, our approach is to leverage the map-reduce programming model with distributed computing.
+
+The real challenge is how to make this comparison algorithm generic enough to free data analysts and data scientists from coding burdens, while at the same time keeping enough flexibility to cover most accuracy requirements.
+
+The traditional way is to use a SQL-based join to calculate this, like scripts in hive.
+
+But this SQL-based solution can be improved, since it does not consider the particular natures of the source and target datasets in this context.
+
+Our approach is to provide a generic accuracy measure that takes the special natures of the source and target datasets into consideration.
+
+Our implementation is in scala, leveraging scala's declarative capability to cater for various requirements, and runs on a spark cluster.
+
+To make it concrete, the schema of the source is as below:
+
+```
+|-- uid: string (nullable = true)
+|-- site_id: string (nullable = true)
+|-- page_id: string (nullable = true)
+|-- curprice: string (nullable = true)
+|-- itm: string (nullable = true)
+|-- itmcond: string (nullable = true)
+|-- itmtitle: string (nullable = true)
+|-- l1: string (nullable = true)
+|-- l2: string (nullable = true)
+|-- leaf: string (nullable = true)
+|-- meta: string (nullable = true)
+|-- st: string (nullable = true)
+|-- dc: string (nullable = true)
+|-- tr: string (nullable = true)
+|-- eventtimestamp: string (nullable = true)
+|-- cln: string (nullable = true)
+|-- siid: string (nullable = true)
+|-- ciid: string (nullable = true)
+|-- sellerid: string (nullable = true)
+|-- pri: string (nullable = true)
+|-- pt: string (nullable = true)
+|-- dt: string (nullable = true)
+|-- hour: string (nullable = true)
+```
+
+and the schema of the target is as below:
+
+```
+|-- uid: string (nullable = true)
+|-- page_id: string (nullable = true)
+|-- site_id: string (nullable = true)
+|-- js_ev_mak: string (nullable = true)
+|-- js_ev_orgn: string (nullable = true)
+|-- curprice: string (nullable = true)
+|-- itm: string (nullable = true)
+|-- itmcond: string (nullable = true)
+|-- itmtitle: string (nullable = true)
+|-- l1: string (nullable = true)
+|-- l2: string (nullable = true)
+|-- leaf: string (nullable = true)
+|-- meta: string (nullable = true)
+|-- st: string (nullable = true)
+|-- dc: string (nullable = true)
+|-- tr: string (nullable = true)
+|-- eventtimestamp: string (nullable = true)
+|-- cln: string (nullable = true)
+|-- siid: string (nullable = true)
+|-- ciid: string (nullable = true)
+|-- sellerid: string (nullable = true)
+|-- product_ref_id: string (nullable = true)
+|-- product_type: string (nullable = true)
+|-- is_bu: string (nullable = true)
+|-- is_udid: string (nullable = true)
+|-- is_userid: string (nullable = true)
+|-- is_cguid: string (nullable = true)
+|-- dt: string (nullable = true)
+|-- hour: string (nullable = true)
+```
+
+
+#### Accuracy Measure In Depth
+
+##### Pre-Process phase (transform raw data)
+For efficiency, we convert each raw record into a key-value pair; after that, we just need to compare values which have the same key.
+Since the two datasets might have different names for the same field, and fields might come in a different order, we keep the original information in an associative map for later processing.
+
+The records will look like:
+```
+((uid,eventtimestamp)->(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))
+```
+To track where the data comes from, we add a labeling tag:
+for the source dataset we add the label tag "\_\_source\_\_", and for the target dataset we add the label tag "\_\_target\_\_".
+```
+((uid,eventtimestamp)->("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
+((uid,eventtimestamp)->("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
+```
+Ideally, applying these composite keys, we should get a unique record for every composite key in a dataset.
+But in reality, for various reasons, a dataset might have duplicate records for one composite key.
+To cover this problem, and to track all records from the source side, we append all duplicate records to a list during this step.
+After pre-processing, a record will look like:
+```
+((uid,eventtimestamp)->List(("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
+```
+To keep all records from the target side, we insert all records into a set during this step.
+After pre-processing, a record will look like:
+```
+((uid,eventtimestamp)->Set(("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
+```
+##### Aggregation and Comparison phase
+We union the source and target together and execute one aggregation over all keys; then we can apply the rules defined by users to check whether the records in source and target match or not.
+
+```
+aggregate { (List(sources), Set(targets)) =>
+  if (every element of List(sources) is in Set(targets)) emit true
+  else emit false
+}
+```
+We can also execute one aggregation to count the mismatched records in the source:
+```
+aggregate (missedCount = 0) { (List(sources), Set(targets)) =>
+ foreach (element in List(sources)) {
+  if (element in Set(targets)) continue
+  else missedCount += 1
+ }
+}
+```
+#### Benefits
+ + It is two times faster than the traditional SQL JOIN based solution, since it uses an algorithm customized for this specific accuracy problem.
+ + It is easy to iterate on new accuracy metrics, as the measure is packaged as a common library and basic service; previously it took us one week to develop and deploy a new metric from scratch, but with this approach it only takes several hours.
+
+
+
+
+#### Further discussion
+ + How to select keys?
+	How many keys should we use? If we use too many keys, calculation performance drops; if we use too few, there might be too many duplicate records, which makes the comparison logic complex.
+ + How to define content equality?
+	For some data it is straightforward, but other data might require transformation by UDFs; how can we make the system extensible to support different raw data?
+ + How to handle data latency?
+	To compare, the data has to be available, but how do we handle the data latency issues that often happen in real enterprise environments?
+ + How to restore lost data?
+	Detecting data loss is good, but the further question is how we can restore the lost data.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measures.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measures.md b/griffin-doc/measures.md
deleted file mode 100644
index 2f6680e..0000000
--- a/griffin-doc/measures.md
+++ /dev/null
@@ -1,173 +0,0 @@