You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:17 UTC

[02/11] incubator-griffin git commit: Dsl modify

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data.jsonFile b/measure/src/test/resources/test-data.jsonFile
new file mode 100644
index 0000000..73707f4
--- /dev/null
+++ b/measure/src/test/resources/test-data.jsonFile
@@ -0,0 +1,3 @@
+{ "name": "emily", "age": 5, "map": { "a": 1, "b": 2 }, "list": [ { "c": 1, "d": 2 }, { "c": 3, "d": 4 } ], "t": [1, 2, 3] }
+{ "name": "white", "age": 15, "map": { "a": 11, "b": 12 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
+{ "name": "west", "age": 25, "map": { "a": 21, "b": 22 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data0.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data0.json b/measure/src/test/resources/test-data0.json
new file mode 100644
index 0000000..406acb8
--- /dev/null
+++ b/measure/src/test/resources/test-data0.json
@@ -0,0 +1,56 @@
+[
+  {
+    "name": "emily",
+    "age": 5,
+    "map": {
+      "a": 1,
+      "b": 2
+    },
+    "list": [
+      {
+        "c": 1,
+        "d": 2
+      },
+      {
+        "c": 3,
+        "d": 4
+      }
+    ]
+  },
+  {
+    "name": "white",
+    "age": 15,
+    "map": {
+      "a": 11,
+      "b": 12
+    },
+    "list": [
+      {
+        "c": 11,
+        "d": 2
+      },
+      {
+        "c": 23,
+        "d": 4
+      }
+    ]
+  },
+  {
+    "name": "west",
+    "age": 25,
+    "map": {
+      "a": 21,
+      "b": 22
+    },
+    "list": [
+      {
+        "c": 11,
+        "d": 2
+      },
+      {
+        "c": 23,
+        "d": 4
+      }
+    ]
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data1.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data1.jsonFile b/measure/src/test/resources/test-data1.jsonFile
new file mode 100644
index 0000000..1e1f28a
--- /dev/null
+++ b/measure/src/test/resources/test-data1.jsonFile
@@ -0,0 +1,31 @@
+[{
+	"Year": "2013",
+	"First Name": "DAVID",
+	"County": "KINGS",
+	"Sex": "M",
+	"Count": "272"
+}, {
+	"Year": "2013",
+	"First Name": "JAYDEN",
+	"County": "KINGS",
+	"Sex": "M",
+	"Count": "268"
+}, {
+	"Year": "2013",
+	"First Name": "JAYDEN",
+	"County": "QUEENS",
+	"Sex": "M",
+	"Count": "219"
+}, {
+	"Year": "2013",
+	"First Name": "MOSHE",
+	"County": "KINGS",
+	"Sex": "M",
+	"Count": "219"
+}, {
+	"Year": "2013",
+	"First Name": "ETHAN",
+	"County": "QUEENS",
+	"Sex": "M",
+	"Count": "216"
+}]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
deleted file mode 100644
index 6a60326..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env.json"
-//  val confFile = "src/test/resources/config.json"
-////  val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test("algorithm") {
-//    Try {
-//      val envParam = allParam.envParam
-//      val userParam = allParam.userParam
-//
-//      // start time
-//      val startTime = new Date().getTime()
-//
-//      // get spark application id
-//      val applicationId = sc.applicationId
-//
-//      // rules
-//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//      val rule: StatementExpr = ruleFactory.generateRule()
-//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//      ruleAnalyzer.constCacheExprs.foreach(println)
-//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-//      // global cache data
-//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//      val finalConstMap = finalConstExprValueMap.headOption match {
-//        case Some(m) => m
-//        case _ => Map[String, Any]()
-//      }
-//
-//      // data connector
-//      val sourceDataConnector: DirectDataConnector =
-//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-//          ruleAnalyzer.sourceRuleExprs, finalConstMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("source data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//      val targetDataConnector: DirectDataConnector =
-//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.targetParam,
-//          ruleAnalyzer.targetRuleExprs, finalConstMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("target data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//
-//      // get metadata
-////      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-////        case Success(md) => md
-////        case Failure(ex) => throw ex
-////      }
-////      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-////        case Success(md) => md
-////        case Failure(ex) => throw ex
-////      }
-//
-//      // get data
-//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//      val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//
-//      // my algo
-//      val algo = BatchAccuracyAlgo(allParam)
-//
-//      // accuracy algorithm
-//      val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
-//
-//      println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}")
-//
-//      missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println)
-//
-//      // end time
-//      val endTime = new Date().getTime
-//      println(s"using time: ${endTime - startTime} ms")
-//    } match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-4)
-//      }
-//      case _ => {
-//        info("calculation finished")
-//      }
-//    }
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
deleted file mode 100644
index e0f500a..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.algo.batch.BatchProfileAlgo
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env.json"
-//  val confFile = "src/test/resources/config-profile.json"
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test("algorithm") {
-//    Try {
-//      val envParam = allParam.envParam
-//      val userParam = allParam.userParam
-//
-//      // start time
-//      val startTime = new Date().getTime()
-//
-//      // get spark application id
-//      val applicationId = sc.applicationId
-//
-//      // rules
-//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//      val rule: StatementExpr = ruleFactory.generateRule()
-//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//      ruleAnalyzer.constCacheExprs.foreach(println)
-//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-//      // global cache data
-//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//      val finalConstMap = finalConstExprValueMap.headOption match {
-//        case Some(m) => m
-//        case _ => Map[String, Any]()
-//      }
-//
-//      // data connector
-//      val sourceDataConnector: DirectDataConnector =
-//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-//          ruleAnalyzer.sourceRuleExprs, finalConstMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("source data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//
-//      // get data
-//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//
-//      // my algo
-//      val algo = BatchProfileAlgo(allParam)
-//
-//      // profile algorithm
-//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
-//
-//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
-//
-//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
-//
-//      // end time
-//      val endTime = new Date().getTime
-//      println(s"using time: ${endTime - startTime} ms")
-//    } match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-4)
-//      }
-//      case _ => {
-//        info("calculation finished")
-//      }
-//    }
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
deleted file mode 100644
index a76712f..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo.batch
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.DataConnectorFactory
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class DataFrameSaveTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env.json"
-//  val confFile = "src/test/resources/config-profile.json"
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test("algorithm") {
-//    Try {
-//      val envParam = allParam.envParam
-//      val userParam = allParam.userParam
-//
-//      // start time
-//      val startTime = new Date().getTime()
-//
-//      // get spark application id
-//      val applicationId = sc.applicationId
-//
-//      // rules
-//      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//      val rule: StatementExpr = ruleFactory.generateRule()
-//      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//      ruleAnalyzer.constCacheExprs.foreach(println)
-//      ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-//      // global cache data
-//      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//      val finalConstMap = finalConstExprValueMap.headOption match {
-//        case Some(m) => m
-//        case _ => Map[String, Any]()
-//      }
-//
-//      // data connector
-//      val sourceDataConnector: DirectDataConnector =
-//        DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-//          ruleAnalyzer.sourceRuleExprs, finalConstMap
-//        ) match {
-//          case Success(cntr) => {
-//            if (cntr.available) cntr
-//            else throw new Exception("source data not available!")
-//          }
-//          case Failure(ex) => throw ex
-//        }
-//
-//      // get data
-//      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-//        case Success(dt) => dt
-//        case Failure(ex) => throw ex
-//      }
-//
-//      // my algo
-//      val algo = BatchProfileAlgo(allParam)
-//
-//      // profile algorithm
-//      val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
-//
-//      println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
-//
-//      matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
-//
-//      // end time
-//      val endTime = new Date().getTime
-//      println(s"using time: ${endTime - startTime} ms")
-//    } match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-4)
-//      }
-//      case _ => {
-//        info("calculation finished")
-//      }
-//    }
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
deleted file mode 100644
index 2179fba..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.scalatest.PrivateMethodTester
-
-@RunWith(classOf[JUnitRunner])
-class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
-
-  def findExprId(exprs: Iterable[Expr], desc: String): String = {
-    exprs.find(_.desc == desc) match {
-      case Some(expr) => expr._id
-      case _ => ""
-    }
-  }
-
-  test ("match data success") {
-    val rule = "$source.name = $target.name AND $source.age < $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
-    ), Map[String, Any]())
-    val target = (Map[String, Any](
-      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
-      (findExprId(targetPersistExprs, "$target['age']") -> 27)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
-    result._1 should be (true)
-    result._2.size should be (0)
-  }
-
-  test ("match data fail") {
-    val rule = "$source.name = $target.name AND $source.age = $target.age"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-    val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> 26)
-    ), Map[String, Any]())
-    val target = (Map[String, Any](
-      (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
-      (findExprId(targetPersistExprs, "$target['age']") -> 27)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
-    result._1 should be (false)
-    result._2.size shouldNot be (0)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
deleted file mode 100644
index 087e8e5..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.scalatest.PrivateMethodTester
-
-@RunWith(classOf[JUnitRunner])
-class ProfileCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
-
-  def findExprId(exprs: Iterable[Expr], desc: String): String = {
-    exprs.find(_.desc == desc) match {
-      case Some(expr) => expr._id
-      case _ => ""
-    }
-  }
-
-  test ("match data success") {
-    val rule = "$source.name = 'jack' AND $source.age = null"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> null)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
-    result._1 should be (true)
-    result._2.size should be (0)
-  }
-
-  test ("match data fail") {
-    val rule = "$source.name = 'jack' AND $source.age != null"
-    val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
-    val ruleFactory = RuleFactory(evaluateRuleParam)
-    val statement = ruleFactory.generateRule
-    val ruleAnalyzer = RuleAnalyzer(statement)
-
-    val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-
-    val source = (Map[String, Any](
-      (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
-      (findExprId(sourcePersistExprs, "$source['age']") -> null)
-    ), Map[String, Any]())
-
-    val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
-    val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
-    result._1 should be (false)
-    result._2.size shouldNot be (0)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
deleted file mode 100644
index a22f91f..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo.streaming
-//
-//import java.util.Date
-//import java.util.concurrent.TimeUnit
-//
-//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//import org.apache.griffin.measure.cache.result._
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
-//import org.apache.griffin.measure.result._
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.sql.hive.HiveContext
-//import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class StreamingAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env-streaming.json"
-//  val confFile = "src/test/resources/config-streaming3.json"
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-////  val ssc: StreamingContext = _
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//
-//    val metricName = userParam.name
-//    val sparkParam = envParam.sparkParam
-//    val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-//    conf.setAll(sparkParam.config)
-//    sc = new SparkContext(conf)
-//    sc.setLogLevel(envParam.sparkParam.logLevel)
-//    sqlContext = new SQLContext(sc)
-////    sqlContext = new HiveContext(sc)
-//
-////    val a = sqlContext.sql("select * from s1 limit 10")
-////    //    val a = sqlContext.sql("show tables")
-////    a.show(10)
-////
-////    val b = HdfsUtil.existPath("/griffin/streaming")
-////    println(b)
-//  }
-//
-//  test("algorithm") {
-//    val envParam = allParam.envParam
-//    val userParam = allParam.userParam
-//    val metricName = userParam.name
-//    val sparkParam = envParam.sparkParam
-//    val cleanerParam = envParam.cleanerParam
-//
-////    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir,
-////      ( ) => {
-////        try {
-////          val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
-////            case Some(interval) => Milliseconds(interval)
-////            case _ => throw new Exception("invalid batch interval")
-////          }
-////          val ssc = new StreamingContext(sc, batchInterval)
-////          ssc.checkpoint(sparkParam.cpDir)
-////          ssc
-////        } catch {
-////          case runtime: RuntimeException => {
-////            throw runtime
-////          }
-////        }
-////      })
-//
-//    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
-//      case Some(interval) => Milliseconds(interval)
-//      case _ => throw new Exception("invalid batch interval")
-//    }
-//    val ssc = new StreamingContext(sc, batchInterval)
-//    ssc.checkpoint(sparkParam.cpDir)
-//
-//    // start time
-//    val startTime = new Date().getTime()
-//
-//    val persistFactory = PersistFactory(envParam.persistParams, metricName)
-//
-//    // get persists to persist measure result
-//    val appPersist: Persist = persistFactory.getPersists(startTime)
-//
-//    // get spark application id
-//    val applicationId = sc.applicationId
-//
-//    // persist start id
-//    appPersist.start(applicationId)
-//
-//    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
-//    InfoCacheInstance.init
-//
-//    // generate rule from rule param, generate rule analyzer
-//    val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-//    val rule: StatementExpr = ruleFactory.generateRule()
-//    val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-//    // const expr value map
-//    val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-//    val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-//    val finalConstMap = finalConstExprValueMap.headOption match {
-//      case Some(m) => m
-//      case _ => Map[String, Any]()
-//    }
-//
-//    // data connector
-//    val sourceDataConnector: DirectDataConnector =
-//      DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.sourceParam,
-//        ruleAnalyzer.sourceRuleExprs, finalConstMap
-//      ) match {
-//        case Success(cntr) => {
-//          if (cntr.available) cntr
-//          else throw new Exception("source data connection error!")
-//        }
-//        case Failure(ex) => throw ex
-//      }
-//    val targetDataConnector: DirectDataConnector =
-//      DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.targetParam,
-//        ruleAnalyzer.targetRuleExprs, finalConstMap
-//      ) match {
-//        case Success(cntr) => {
-//          if (cntr.available) cntr
-//          else throw new Exception("target data connection error!")
-//        }
-//        case Failure(ex) => throw ex
-//      }
-//
-//    val cacheResultProcesser = CacheResultProcesser()
-//
-//    // init data stream
-//    sourceDataConnector.init()
-//    targetDataConnector.init()
-//
-//    // my algo
-//    val algo = StreamingAccuracyAlgo(allParam)
-//
-//    val streamingAccuracyProcess = StreamingAccuracyProcess(
-//      sourceDataConnector, targetDataConnector,
-//      ruleAnalyzer, cacheResultProcesser, persistFactory, appPersist)
-//
-//    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
-//      case Some(interval) => interval
-//      case _ => throw new Exception("invalid batch interval")
-//    }
-//    val process = TimingProcess(processInterval, streamingAccuracyProcess)
-//
-//    // clean thread
-////    case class Clean() extends Runnable {
-////      val lock = InfoCacheInstance.genLock("clean")
-////      def run(): Unit = {
-////        val locked = lock.lock(5, TimeUnit.SECONDS)
-////        if (locked) {
-////          try {
-////            sourceDataConnector.cleanData
-////            targetDataConnector.cleanData
-////          } finally {
-////            lock.unlock()
-////          }
-////        }
-////      }
-////    }
-////    val cleanInterval = TimeUtil.milliseconds(cleanerParam.cleanInterval) match {
-////      case Some(interval) => interval
-////      case _ => throw new Exception("invalid batch interval")
-////    }
-////    val clean = TimingProcess(cleanInterval, Clean())
-//
-//    process.startup()
-////    clean.startup()
-//
-//    ssc.start()
-//    ssc.awaitTermination()
-//    ssc.stop(stopSparkContext=true, stopGracefully=true)
-//
-//    println("================ end ================")
-//
-//    // context stop
-//    sc.stop
-//
-//    InfoCacheInstance.close
-//
-//    appPersist.finish()
-//
-//    process.shutdown()
-////    clean.shutdown()
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
index b3c94e5..9e5d380 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
 @RunWith(classOf[JUnitRunner])
 class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfter {
 
-  test("read config") {
+  test("read raw config") {
     val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
 
     val reader = ParamRawStringReader(rawString)
@@ -34,5 +34,4 @@ class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfte
     paramTry.isSuccess should be (true)
     paramTry.get should be (PersistParam("hdfs", Map[String, Any](("path" -> "/path/to"), ("time" -> 1234567))))
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
deleted file mode 100644
index 2139ff7..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.algo.streaming.TimingProcess
-import org.apache.griffin.measure.cache.info.InfoCacheInstance
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user.{DataCacheParam, DataConnectorParam, EvaluateRuleParam}
-import org.apache.griffin.measure.config.reader.ParamRawStringReader
-import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr}
-import org.apache.griffin.measure.rule._
-import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
-import org.apache.griffin.measure.rule.{DataTypeCalculationUtil, ExprValueUtil, RuleExprs}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
-
-@RunWith(classOf[JUnitRunner])
-class ConnectorTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  test("read config") {
-
-    val a = "java.lang.String"
-    val at = getClassTag(a)
-    println(at)
-
-    at match {
-      case ClassTag(m) => println(m)
-      case _ => println("no")
-    }
-
-  }
-
-  private def getClassTag(tp: String): ClassTag[_] = {
-    val clazz = Class.forName(tp)
-    ClassTag(clazz)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala
new file mode 100644
index 0000000..ead84f7
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.cache.info.InfoCacheInstance
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user.{DataConnectorParam, EvaluateRuleParam}
+import org.apache.griffin.measure.config.reader.ParamRawStringReader
+import org.apache.griffin.measure.data.connector.batch.TextDirBatchDataConnector
+import org.apache.griffin.measure.process.TimingProcess
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
+import org.apache.griffin.measure.rule._
+import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class ConnectorTest extends FunSuite with Matchers with BeforeAndAfter {
+  // Checks that a ClassTag can be built from a fully-qualified class name.
+  test("read config") {
+
+    val a = "java.lang.String"
+    val at = getClassTag(a)
+    println(at)
+
+    at match {
+      case ClassTag(m) => println(m)
+      case _ => println("no")
+    }
+    at.runtimeClass should be (classOf[String]) // assert the result; printing alone can never fail
+  }
+  // Loads the class named `tp` via Class.forName and wraps it in a ClassTag.
+  private def getClassTag(tp: String): ClassTag[_] = {
+    val clazz = Class.forName(tp)
+    ClassTag(clazz)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
new file mode 100644
index 0000000..a1e4854
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
@@ -0,0 +1,146 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.reader.ParamReaderFactory
+import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.PersistThreadPool
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class BatchProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+  // Drives a DqProcess through init, run and end, using the env/user config files named below.
+  val envFile = "src/test/resources/env-test.json"
+  val confFile = "src/test/resources/config-test-profiling.json"
+//  val confFile = "src/test/resources/config-test-accuracy.json"
+
+  val envFsType = "local"
+  val userFsType = "local"
+
+  val args = Array(envFile, confFile)
+
+  var allParam: AllParam = _ // assigned in the before block
+  // Errors are raised with fail(...) instead of sys.exit(...), which would terminate the JVM running the suite.
+  before {
+    // read param files
+    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        fail(s"read env param file error: ${ex.getMessage}", ex)
+      }
+    }
+    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        fail(s"read user param file error: ${ex.getMessage}", ex)
+      }
+    }
+    allParam = AllParam(envParam, userParam)
+
+    // validate param files
+    validateParams(allParam) match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        fail(s"params validation error: ${ex.getMessage}", ex)
+      }
+      case _ => {
+        info("params validation pass")
+      }
+    }
+  }
+
+  test ("batch process") {
+    val procType = ProcessType(allParam.userParam.procType)
+    val proc: DqProcess = procType match {
+      case BatchProcessType => BatchDqProcess(allParam)
+      case StreamingProcessType => StreamingDqProcess(allParam)
+      case _ => {
+        error(s"${procType} is unsupported process type!")
+        fail(s"${procType} is unsupported process type!")
+      }
+    }
+
+    // process init
+    proc.init match {
+      case Success(_) => {
+        info("process init success")
+      }
+      case Failure(ex) => {
+        error(s"process init error: ${ex.getMessage}")
+        shutdown
+        fail(s"process init error: ${ex.getMessage}", ex)
+      }
+    }
+
+    // process run
+    proc.run match {
+      case Success(_) => {
+        info("process run success")
+      }
+      case Failure(ex) => {
+        error(s"process run error: ${ex.getMessage}")
+
+        if (proc.retriable) {
+          throw ex
+        } else {
+          shutdown
+          fail(s"process run error: ${ex.getMessage}", ex)
+        }
+      }
+    }
+
+    // process end
+    proc.end match {
+      case Success(_) => {
+        info("process end success")
+      }
+      case Failure(ex) => {
+        error(s"process end error: ${ex.getMessage}")
+        shutdown
+        fail(s"process end error: ${ex.getMessage}", ex)
+      }
+    }
+
+    shutdown
+  }
+  // Gets a param reader for `file`/`fsType` from ParamReaderFactory and reads the config as T.
+  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+    paramReader.readConfig[T]
+  }
+  // Runs AllParamValidator over the combined env and user params.
+  private def validateParams(allParam: AllParam): Try[Boolean] = {
+    val allParamValidator = AllParamValidator()
+    allParamValidator.validate(allParam)
+  }
+  // Shuts down PersistThreadPool.
+  private def shutdown(): Unit = {
+    PersistThreadPool.shutdown
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
new file mode 100644
index 0000000..b119d76
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
@@ -0,0 +1,531 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.config.reader.ParamReaderFactory
+import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.PersistThreadPool
+import org.apache.griffin.measure.process.engine.DataFrameOprs
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
+import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.types._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.collection.mutable.WrappedArray
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class JsonParseTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+  // Both contexts are (re)created by `before` and released by `after` around every test.
+  var sparkContext: SparkContext = _
+  var sqlContext: SQLContext = _
+
+  // Builds a fresh local SparkContext and a plain SQLContext on top of it.
+  before {
+    val conf = new SparkConf().setAppName("test json").setMaster("local[*]")
+    sparkContext = new SparkContext(conf)
+    sparkContext.setLogLevel("WARN")
+    // a HiveContext could be used here instead: new HiveContext(sparkContext)
+    sqlContext = new SQLContext(sparkContext)
+  }
+
+  // Spark allows only one active SparkContext per JVM, and `before` creates a new one
+  // for each test, so the previous context has to be stopped once a test is done.
+  after {
+    if (sparkContext != null) {
+      sparkContext.stop()
+      sparkContext = null
+      sqlContext = null
+    }
+  }
+
+  test ("json test") {
+    // 0. prepare data: every line of the message file becomes a one-column ("value") row
+    val lineRows = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
+    val valueSchema = StructType(Array(StructField("value", StringType)))
+    val source = sqlContext.createDataFrame(lineRows, valueSchema)
+    source.registerTempTable("src")
+
+    // 1. read from json string to extracted json row, through DataFrameOprs on table "src"
+    val extracted = DataFrameOprs.fromJson(sqlContext, Map[String, Any]("df.name" -> "src"))
+    extracted.registerTempTable("df1")
+
+    // 2. extract json array into lines: one row per element of "seeds"
+    val seedLines = sqlContext.sql("select explode(seeds) as value from df1")
+    seedLines.printSchema()
+    seedLines.show(10)
+    seedLines.registerTempTable("df2")
+    println(seedLines.count)
+
+    // 3. read every seed line as json again
+    val seedStrings = sqlContext.sql("SELECT value FROM df2").map(_.getAs[String]("value"))
+    import org.apache.spark.sql.functions._
+    val seeds = sqlContext.read.json(seedStrings)
+    seeds.registerTempTable("df23")
+
+    // 4. select the url plus the crawl request create timestamp held in the metadata json string
+    val urlAndTs = sqlContext.sql("SELECT url, cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df23")
+    urlAndTs.printSchema()
+    urlAndTs.show(10)
+    urlAndTs.registerTempTable("df24")
+    println(urlAndTs.count)
+  }
+
+  test ("json test 2") {
+    // every line of the output message file becomes a one-column ("value") row of table "tgt"
+    val lineRows = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
+    val valueSchema = StructType(Array(StructField("value", StringType)))
+    val target = sqlContext.createDataFrame(lineRows, valueSchema)
+    target.registerTempTable("tgt")
+
+    // 1. read from json string to extracted json row
+    val jsonStrings = sqlContext.sql("SELECT value FROM tgt").map(_.getAs[String]("value"))
+    val extracted = sqlContext.read.json(jsonStrings)
+    extracted.printSchema()
+    extracted.show(10)
+    extracted.registerTempTable("df1")
+
+    // 2. keep only the attribute list of the first group
+    val attrs = sqlContext.sql("select groups[0].attrsList as attrs from df1")
+    attrs.printSchema()
+    attrs.show(10)
+    attrs.registerTempTable("df2")
+    println(attrs.count)
+
+    // udf giving the position of a value inside a string array
+    val indexOf = (values: Seq[String], wanted: String) => values.indexOf(wanted)
+    sqlContext.udf.register("index_of", indexOf)
+
+    // 3. look the URL and CRAWLMETADATA attributes up by name and take their first value
+    val urlAndTs = sqlContext.sql("select attrs.values[index_of(attrs.name, 'URL')][0] as url, cast(get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') as bigint) as ts from df2")
+    urlAndTs.printSchema()
+    urlAndTs.show(10)
+    urlAndTs.registerTempTable("df3")
+  }
+
+  test ("testing") {
+    // an inline json document, registered as the single "value" column of table "df"
+    val jsonDoc =
+      """
+        |{"name": "age", "age": 12, "items": [1, 2, 3],
+        |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}],
+        |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}", "b": true
+        |}""".stripMargin
+    val docRows = sparkContext.parallelize(Seq(jsonDoc)).map(Row(_))
+    val valueSchema = StructType(Array(StructField("value", StringType)))
+    val raw = sqlContext.createDataFrame(docRows, valueSchema)
+    raw.registerTempTable("df")
+
+    // let the json reader infer the structure of the stored strings
+    val jsonStrings = sqlContext.sql("select * from df").map(_.getAs[String]("value"))
+    val parsed = sqlContext.read.json(jsonStrings)
+    parsed.printSchema()
+    parsed.show(10)
+    parsed.registerTempTable("df1")
+
+    // udf converting a string to an int, used inside the where clause below
+    val toInt = (s: String) => s.toInt
+    sqlContext.udf.register("to_int", toInt)
+
+    val selected = sqlContext.sql("select (b) as aa, inner.a from df1 where age = to_int(\"12\")")
+    selected.printSchema()
+    selected.show(10)
+  }
+
+  test ("test input only sql") {
+    // every line of the input message file becomes a one-column ("value") row of table "src"
+    val lineRows = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
+    val valueSchema = StructType(Array(StructField("value", StringType)))
+    val source = sqlContext.createDataFrame(lineRows, valueSchema)
+    source.registerTempTable("src")
+    source.show(10)
+
+    // 1. pull the "seeds" json array out of each message, still as a string
+    val seedsJson = sqlContext.sql("SELECT get_json_object(value, '$.seeds') as seeds FROM src")
+    seedsJson.printSchema()
+    seedsJson.show(10)
+    seedsJson.registerTempTable("df1")
+
+    // udf reading a json array string into a sequence of strings
+    val json2StringArray: (String) => Seq[String] =
+      (s: String) => JsonUtil.fromJson[Seq[String]](s)
+    sqlContext.udf.register("json_to_string_array", json2StringArray)
+
+    // 2. one row per seed
+    val seedRows = sqlContext.sql("SELECT explode(json_to_string_array(seeds)) as seed FROM df1")
+    seedRows.printSchema()
+    seedRows.show(10)
+    seedRows.registerTempTable("df2")
+
+    // 3. url and crawl request create timestamp of every seed
+    val urlAndTs = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df2")
+    urlAndTs.printSchema()
+    urlAndTs.show(10)
+  }
+
+  test ("test output only sql") {
+    // every line of the output message file becomes a one-column ("value") row of table "tgt"
+    val lineRows = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
+    val valueSchema = StructType(Array(StructField("value", StringType)))
+    val target = sqlContext.createDataFrame(lineRows, valueSchema)
+    target.registerTempTable("tgt")
+    target.printSchema()
+    target.show(10)
+
+    // udf: json array string -> sequence of strings
+    val json2StringArray: (String) => Seq[String] =
+      (s: String) => JsonUtil.fromJson[Seq[String]](s)
+    sqlContext.udf.register("json_to_string_array", json2StringArray)
+
+    // udf: json array string -> sequence holding the json text of every element
+    val json2StringJsonArray: (String) => Seq[String] =
+      (s: String) => JsonUtil.fromJson[Seq[Any]](s).map(element => JsonUtil.toJson(element))
+    sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray)
+
+    // udf: position of a value inside a string array
+    val indexOf = (values: Seq[String], wanted: String) => values.indexOf(wanted)
+    sqlContext.udf.register("index_of", indexOf)
+
+    // udf: position of `wanted` among the values found under `key` in the json objects;
+    // objects without `key` contribute nothing to the searched sequence
+    val indexOfField = (items: Seq[String], key: String, wanted: String) => {
+      val fieldValues = items.flatMap(item => JsonUtil.fromJson[Map[String, Any]](item).get(key))
+      fieldValues.indexOf(wanted)
+    }
+    sqlContext.udf.register("index_of_field", indexOfField)
+
+    // 1. attribute list of the first group, as a json string
+    val attrsJson = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tgt")
+    attrsJson.printSchema()
+    attrsJson.show(10)
+    attrsJson.registerTempTable("df1")
+
+    // 2. split the list into an array of json strings
+    val attrsArray = sqlContext.sql("SELECT json_to_string_json_array(attrs) as attrs FROM df1")
+    attrsArray.printSchema()
+    attrsArray.show(10)
+    attrsArray.registerTempTable("df2")
+
+    // 3. pick the URL and CRAWLMETADATA attributes by their "name" field
+    val pickedAttrs = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM df2")
+    pickedAttrs.printSchema()
+    pickedAttrs.show(10)
+    pickedAttrs.registerTempTable("df3")
+
+    // 4. first value of each attribute; the timestamp is read from the metadata json
+    val urlAndTs = sqlContext.sql("SELECT json_to_string_array(get_json_object(attr1, '$.values'))[0], cast(get_json_object(json_to_string_array(get_json_object(attr2, '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) FROM df3")
+    urlAndTs.printSchema()
+    urlAndTs.show(10)
+  }
+
+  test ("test from json") {
+    // udf: json object string -> map; string values are kept, any other value is
+    // turned back into its json text
+    val fromJson2Map = (str: String) => {
+      JsonUtil.fromJson[Map[String, Any]](str).mapValues {
+        case text: String => text
+        case other => JsonUtil.toJson(other)
+      }
+    }
+    sqlContext.udf.register("from_json_to_map", fromJson2Map)
+
+    // udf: json array string -> sequence, with the same treatment of the elements
+    val fromJson2Array = (str: String) => {
+      JsonUtil.fromJson[Seq[Any]](str).map {
+        case text: String => text
+        case other => JsonUtil.toJson(other)
+      }
+    }
+    sqlContext.udf.register("from_json_to_array", fromJson2Array)
+
+    // ======================== source side
+
+    val sourceRows = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
+    val sourceSchema = StructType(Array(StructField("value", StringType)))
+    val source = sqlContext.createDataFrame(sourceRows, sourceSchema)
+    source.registerTempTable("sdf0")
+    source.show(10)
+
+    // 1. one row per seed of the "seeds" json array
+    val seeds = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as seed FROM sdf0")
+    seeds.printSchema()
+    seeds.show(10)
+    seeds.registerTempTable("sdf1")
+
+    // 2. url and crawl request create timestamp of every seed
+    val seedUrlAndTs = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM sdf1")
+    seedUrlAndTs.printSchema()
+    seedUrlAndTs.show(10)
+
+    // ------------------------ target side
+
+    val targetRows = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
+    val targetSchema = StructType(Array(StructField("value", StringType)))
+    val target = sqlContext.createDataFrame(targetRows, targetSchema)
+    target.registerTempTable("tdf0")
+    target.printSchema()
+    target.show(10)
+
+    // udf: position of `wanted` among the values found under `key` in the json objects;
+    // objects without `key` contribute nothing to the searched sequence
+    val indexOfField = (items: Seq[String], key: String, wanted: String) => {
+      val fieldValues = items.flatMap(item => JsonUtil.fromJson[Map[String, Any]](item).get(key))
+      fieldValues.indexOf(wanted)
+    }
+    sqlContext.udf.register("index_of_field", indexOfField)
+
+    // 1. attribute list of the first group as an array of json strings
+    val attrs = sqlContext.sql("SELECT from_json_to_array(get_json_object(value, '$.groups[0].attrsList')) as attrs FROM tdf0")
+    attrs.printSchema()
+    attrs.show(10)
+    attrs.registerTempTable("tdf1")
+
+    // 2. first value of the URL attribute and the timestamp inside the CRAWLMETADATA attribute
+    val targetUrlAndTs = sqlContext.sql("SELECT from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'URL')], '$.values'))[0] as url, cast(get_json_object(from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')], '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM tdf1")
+    targetUrlAndTs.printSchema()
+    targetUrlAndTs.show(10)
+  }
+
+  test ("sql functions") {
+    // list the functions known to the sql context
+    val registered = sqlContext.sql("show functions")
+    registered.printSchema()
+    registered.show(10)
+
+    // print the first column of every row, one entry per line
+    registered.map(row => row.getString(0)).collect.foreach(println)
+  }
+
+  test ("test text file read") {
+    // read the json files found under one source and one target dump path
+    val sourceDump = "hdfs://localhost/griffin/streaming/dump/source/418010/25080625/1504837518000"
+    val targetDump = "hdfs://localhost/griffin/streaming/dump/target/418010/25080625/1504837518000"
+    val dumped = sqlContext.read.json(Seq[String](sourceDump, targetDump): _*)
+    dumped.printSchema()
+    dumped.show(10)
+  }
+
+  test ("list paths") {
+    // one (lower, upper) bound pair per directory level below the root path
+    val root = "hdfs://localhost/griffin/streaming/dump/source"
+    val levelBounds = List[(Long, Long)]((0, 0), (-2, 0))
+    println(listPathsBetweenRanges(List(root), levelBounds))
+  }
+
+  // Descends one directory level per entry of `partitionRanges`. At each level the
+  // sub directories of every current path are listed (and printed), and only those
+  // whose name parses as a long inside the inclusive (lower, upper) range are kept.
+  // With no ranges left, the current paths are the result.
+  private def listPathsBetweenRanges(paths: List[String],
+                                     partitionRanges: List[(Long, Long)]
+                                    ): List[String] = {
+    partitionRanges.foldLeft(paths) { (levelPaths, range) =>
+      val (lower, upper) = range
+      levelPaths.flatMap { parent =>
+        val children = HdfsUtil.listSubPathsByType(parent, "dir").toList
+        println(children)
+        children
+          .filter(child => str2Long(child).exists(t => (t >= lower) && (t <= upper)))
+          .map(HdfsUtil.getHdfsFilePath(parent, _))
+      }
+    }
+  }
+
+  /**
+    * Parses `str` as a long.
+    *
+    * @param str text expected to hold a long value
+    * @return Some(value) when `str` is a valid long, None otherwise
+    */
+  private def str2Long(str: String): Option[Long] = {
+    // Try captures non-fatal errors only, so fatal ones (e.g. OutOfMemoryError)
+    // are no longer silently turned into None
+    Try(str.toLong).toOption
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
new file mode 100644
index 0000000..394917c
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
@@ -0,0 +1,85 @@
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.execution.datasources.json.JSONOptions
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+
+/**
+  * Unary expression over a string child that parses the string as a json object.
+  *
+  * Blank input and input that cannot be parsed both evaluate to null.
+  *
+  * NOTE(review): `dataType` declares MapType(StringType, StringType), while the value
+  * produced by `nullSafeEval` is a plain Scala Map[String, Any] -- confirm that the
+  * consumers of this expression accept that representation.
+  *
+  * @param child expression producing the json string
+  */
+case class JsonToStructs(child: Expression)
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+
+  // null is a regular outcome here, see nullSafeEval
+  override def nullable: Boolean = true
+
+  override def dataType: DataType = MapType(StringType, StringType)
+
+  override def inputTypes: Seq[DataType] = StringType :: Nil
+
+  /**
+    * @param json the non-null child value
+    * @return the parsed map, or null for blank or unparsable input
+    */
+  override def nullSafeEval(json: Any): Any = {
+    val text = json.toString
+    if (text.trim.isEmpty) {
+      null
+    } else {
+      try {
+        JsonUtil.fromJson[Map[String, Any]](text)
+      } catch {
+        // only non-fatal failures mean "unparsable"; fatal errors must propagate
+        case scala.util.control.NonFatal(_) => null
+      }
+    }
+  }
+}
+//
+//object JsonExprUtils {
+//
+//  def validateSchemaLiteral(exp: Expression): StructType = exp match {
+//    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+//    case e => throw new AnalysisException(s"Expected a string literal instead of $e")
+//  }
+//
+//  def convertToMapData(exp: Expression): Map[String, String] = exp match {
+//    case m: CreateMap
+//      if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
+//      val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
+//      ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
+//        key.toString -> value.toString
+//      }
+//    case m: CreateMap =>
+//      throw new AnalysisException(
+//        s"A type of keys and values in map() must be string, but got ${m.dataType}")
+//    case _ =>
+//      throw new AnalysisException("Must use a map() function for options")
+//  }
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
new file mode 100644
index 0000000..07b7c5e
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.config.reader.ParamReaderFactory
+import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.PersistThreadPool
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class StreamingProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+  // parameter files of the run; other configs available in the test resources:
+  //   src/test/resources/config-test-accuracy-streaming-multids.json
+  //   src/test/resources/config-test-profiling-streaming.json
+  val envFile = "src/test/resources/env-streaming.json"
+  val confFile = "src/test/resources/config-test-accuracy-streaming.json"
+
+  val envFsType = "local"
+  val userFsType = "local"
+
+  val args = Array(envFile, confFile)
+
+  var allParam: AllParam = _
+
+  // Reads and validates both param files before each test. Problems are logged and
+  // reported through ScalaTest's `fail`; exiting the JVM here would take the whole
+  // test run down instead of failing this test.
+  before {
+    // read param files
+    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        fail(s"fail to read env param file ${envFile}", ex)
+      }
+    }
+    val userParam = readParamFile[UserParam](confFile, userFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        fail(s"fail to read user param file ${confFile}", ex)
+      }
+    }
+    allParam = AllParam(envParam, userParam)
+
+    // validate param files
+    validateParams(allParam) match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        fail("params validation error", ex)
+      }
+      case _ => {
+        info("params validation pass")
+      }
+    }
+  }
+
+  // Drives a data quality process through init, run and end for the params loaded in
+  // `before`. A failing phase is logged, the persist pool is shut down and the test
+  // is failed through ScalaTest's `fail`; exiting the JVM here would take the whole
+  // test run down instead of failing this test.
+  test ("streaming process") {
+    val procType = ProcessType(allParam.userParam.procType)
+    val proc: DqProcess = procType match {
+      case BatchProcessType => BatchDqProcess(allParam)
+      case StreamingProcessType => StreamingDqProcess(allParam)
+      case _ => {
+        error(s"${procType} is unsupported process type!")
+        fail(s"${procType} is unsupported process type!")
+      }
+    }
+
+    // process init
+    proc.init match {
+      case Success(_) => {
+        info("process init success")
+      }
+      case Failure(ex) => {
+        error(s"process init error: ${ex.getMessage}")
+        shutdown()
+        fail(s"process init error: ${ex.getMessage}", ex)
+      }
+    }
+
+    // process run
+    proc.run match {
+      case Success(_) => {
+        info("process run success")
+      }
+      case Failure(ex) => {
+        error(s"process run error: ${ex.getMessage}")
+
+        // a retriable process hands the original error on, without shutting down
+        if (proc.retriable) {
+          throw ex
+        } else {
+          shutdown()
+          fail(s"process run error: ${ex.getMessage}", ex)
+        }
+      }
+    }
+
+    // process end
+    proc.end match {
+      case Success(_) => {
+        info("process end success")
+      }
+      case Failure(ex) => {
+        error(s"process end error: ${ex.getMessage}")
+        shutdown()
+        fail(s"process end error: ${ex.getMessage}", ex)
+      }
+    }
+
+    shutdown()
+  }
+
+  // Obtains the param reader for `file` on the `fsType` file system and asks it
+  // to read the config as a `T`; the reader's Try result is returned unchanged.
+  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] =
+    ParamReaderFactory.getParamReader(file, fsType).readConfig[T]
+
+  // Runs an AllParamValidator over `allParam` and returns its result.
+  private def validateParams(allParam: AllParam): Try[Boolean] =
+    AllParamValidator().validate(allParam)
+
+  // Shuts down PersistThreadPool.
+  private def shutdown(): Unit = PersistThreadPool.shutdown
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala
deleted file mode 100644
index dd8d4a0..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class ExprValueUtilTest extends FunSuite with BeforeAndAfter with Matchers {
-
-  test ("rule calculation") {
-    //    val rules = "$source.json().name = 's2' and $source.json().age[*] = 32"
-    //    val rules = "$source.json().items[*] = 202 AND $source.json().age[*] = 32 AND $source.json().df[*].a = 1"
-    val rules = "$source.json().items[*] = 202 AND $source.json().age[*] = 32 AND $source.json().df['a' = 1].b = 4"
-    //    val rules = "$source.json().df[0].a = 1"
-    val ep = EvaluateRuleParam(1, rules)
-
-    val ruleFactory = RuleFactory(ep)
-    val rule: StatementExpr = ruleFactory.generateRule()
-    val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-    val ruleExprs = ruleAnalyzer.sourceRuleExprs
-    val constFinalExprValueMap = Map[String, Any]()
-
-    val data = List[String](
-      ("""{"name": "s1", "age": [22, 23], "items": [102, 104, 106], "df": [{"a": 1, "b": 3}, {"a": 2, "b": 4}]}"""),
-      ("""{"name": "s2", "age": [32, 33], "items": [202, 204, 206], "df": [{"a": 1, "b": 4}, {"a": 2, "b": 4}]}"""),
-      ("""{"name": "s3", "age": [42, 43], "items": [302, 304, 306], "df": [{"a": 1, "b": 5}, {"a": 2, "b": 4}]}""")
-    )
-
-    def str(expr: Expr) = {
-      s"${expr._id}: ${expr.desc} [${expr.getClass.getSimpleName}]"
-    }
-    println("====")
-    ruleExprs.finalCacheExprs.foreach { expr =>
-      println(str(expr))
-    }
-    println("====")
-    ruleExprs.cacheExprs.foreach { expr =>
-      println(str(expr))
-    }
-
-    val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-    val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-    val finalConstMap = finalConstExprValueMap.headOption match {
-      case Some(m) => m
-      case _ => Map[String, Any]()
-    }
-    println("====")
-    println(ruleAnalyzer.constCacheExprs)
-    println(ruleAnalyzer.constFinalCacheExprs)
-    println(finalConstMap)
-
-    println("====")
-    val valueMaps = data.flatMap { msg =>
-      val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, finalConstMap)
-      val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
-      finalExprValueMaps
-    }
-
-    valueMaps.foreach(println)
-    println(valueMaps.size)
-
-  }
-
-}