Posted to commits@griffin.apache.org by gu...@apache.org on 2017/10/30 08:08:20 UTC

[1/2] incubator-griffin git commit: remove some unused files

Repository: incubator-griffin
Updated Branches:
  refs/heads/master 0dd1d3599 -> 1f984da1a


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
deleted file mode 100644
index caecc9c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.process
-//
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader.ParamReaderFactory
-//import org.apache.griffin.measure.config.validator.AllParamValidator
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.persist.PersistThreadPool
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class StreamingProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env-streaming.json"
-////  val confFile = "src/test/resources/config-test-accuracy-streaming-multids.json"
-//  val confFile = "src/test/resources/config-test-accuracy-streaming.json"
-////  val confFile = "src/test/resources/config-test-profiling-streaming.json"
-//
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//  }
-//
-//  test ("streaming process") {
-//    val procType = ProcessType(allParam.userParam.procType)
-//    val proc: DqProcess = procType match {
-//      case BatchProcessType => BatchDqProcess(allParam)
-//      case StreamingProcessType => StreamingDqProcess(allParam)
-//      case _ => {
-//        error(s"${procType} is unsupported process type!")
-//        sys.exit(-4)
-//      }
-//    }
-//
-//    // process init
-//    proc.init match {
-//      case Success(_) => {
-//        info("process init success")
-//      }
-//      case Failure(ex) => {
-//        error(s"process init error: ${ex.getMessage}")
-//        shutdown
-//        sys.exit(-5)
-//      }
-//    }
-//
-//    // process run
-//    proc.run match {
-//      case Success(_) => {
-//        info("process run success")
-//      }
-//      case Failure(ex) => {
-//        error(s"process run error: ${ex.getMessage}")
-//
-//        if (proc.retriable) {
-//          throw ex
-//        } else {
-//          shutdown
-//          sys.exit(-5)
-//        }
-//      }
-//    }
-//
-//    // process end
-//    proc.end match {
-//      case Success(_) => {
-//        info("process end success")
-//      }
-//      case Failure(ex) => {
-//        error(s"process end error: ${ex.getMessage}")
-//        shutdown
-//        sys.exit(-5)
-//      }
-//    }
-//
-//    shutdown
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//  private def shutdown(): Unit = {
-//    PersistThreadPool.shutdown
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/sql/SqlTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sql/SqlTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sql/SqlTest.scala
deleted file mode 100644
index 7b23062..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/sql/SqlTest.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-//package org.apache.griffin.measure.sql
-//
-//import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-//import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr}
-//import org.apache.spark.sql.{DataFrame, SQLContext}
-//import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//@RunWith(classOf[JUnitRunner])
-//class SqlTest extends FunSuite with BeforeAndAfter with Matchers {
-//
-//  var sc: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  before {
-//    val conf = new SparkConf().setMaster("local[*]").setAppName("test")
-//    sc = new SparkContext(conf)
-//    sqlContext = new SQLContext(sc)
-//  }
-//
-//  test ("spark sql") {
-//
-//    val squared = (s: Int) => {
-//      s * s
-//    }
-//    sqlContext.udf.register("square", squared)
-//
-//    val a = sqlContext.range(1, 20)
-//    a.show
-//
-//    a.registerTempTable("test")
-//
-//    val table = sqlContext.sql("select * from test")
-//    table.show()
-//
-//    val result = sqlContext.sql("select id, square(id) as id_squared from test")
-//    result.show()
-//
-//  }
-//
-//  test ("json") {
-//    def jsonToDataFrame(json: String, schema: Option[StructType] = None): DataFrame = {
-//      val reader = sqlContext.read
-//      val rd = schema match {
-//        case Some(scm) => reader.schema(scm)
-//        case _ => reader
-//      }
-//      rd.json(sc.parallelize(json :: Nil))
-//    }
-//
-//    val json =
-//      """
-//        |{
-//        |  "a": [
-//        |     1, 2, 3
-//        |  ]
-//        |}
-//      """.stripMargin
-//
-////    val bt = StructField("b", IntegerType)
-////    val at = StructField("a", StructType(bt :: Nil))
-////    val schema = StructType(at :: Nil)
-//
-//    val at = StructField("a", ArrayType(IntegerType))
-//    val schema = StructType(at :: Nil)
-//
-//    val df = jsonToDataFrame(json, Some(schema))
-//
-//    df.registerTempTable("json")
-//
-//    val result = sqlContext.sql("select a[1] from json")
-//    result.show
-//
-//  }
-//
-//  test ("json file") {
-//
-//    // read json file directly
-////    val filePath = "src/test/resources/test-data.jsonFile"
-////    val reader = sqlContext.read
-////    val df = reader.json(filePath)
-////    df.show
-////
-////    df.registerTempTable("ttt")
-////    val result = sqlContext.sql("select * from ttt where list[0].c = 11")
-////    result.show
-//
-//    // whole json file
-////    val filePath = "src/test/resources/test-data0.json"
-//////    val filePath = "hdfs://localhost/test/file/t1.json"
-////    val jsonRDD = sc.wholeTextFiles(s"${filePath},${filePath}").map(x => x._2)
-////    val namesJson = sqlContext.read.json(jsonRDD)
-////    namesJson.printSchema
-////    namesJson.show
-//
-//    // read text file then convert to json
-//    val filePath = "src/test/resources/test-data.jsonFile"
-//    val rdd = sc.textFile(filePath)
-//    val reader = sqlContext.read
-//    val df = reader.json(rdd)
-//    df.show
-//    df.printSchema
-//
-//    df.registerTempTable("ttt")
-//    val result = sqlContext.sql("select * from ttt where list[0].c = 11")
-//    result.show
-//
-//    // udf
-//    val slice = (arr: Seq[Long], f: Int, e: Int) => arr.slice(f, e)
-////    val slice = (arr: Seq[Long]) => arr.slice(0, 1)
-//    sqlContext.udf.register("slice", slice)
-//
-//    val result1 = sqlContext.sql("select slice(t, 0, 2) from ttt")
-//    result1.show
-//
-//  }
-//
-//  test ("accu sql") {
-////    val file1 =
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/utils/JsonUtilTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/JsonUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/JsonUtilTest.scala
deleted file mode 100644
index 233d78c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/utils/JsonUtilTest.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.utils
-//
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class JsonUtilTest extends FunSuite with Matchers with BeforeAndAfter {
-//
-//  val map = Map[String, Any](("name" -> "test"), ("age" -> 15))
-//  val json = """{"name":"test","age":15}"""
-//
-//  val person = JsonUtilTest.Person("test", 15)
-//
-//  test ("toJson 1") {
-//    val symbolMap = map.map(p => (Symbol(p._1), p._2))
-//    JsonUtil.toJson(symbolMap) should equal (json)
-//  }
-//
-//  test ("toJson 2") {
-//    JsonUtil.toJson(map) should equal (json)
-//  }
-//
-//  test ("toMap") {
-//    JsonUtil.toMap(json) should equal (map)
-//  }
-//
-//  test ("fromJson 1") {
-//    JsonUtil.fromJson[JsonUtilTest.Person](json) should equal (person)
-//  }
-//
-//  test ("fromJson 2") {
-//    val is = new java.io.ByteArrayInputStream(json.getBytes("utf-8"));
-//    JsonUtil.fromJson[JsonUtilTest.Person](is) should equal (person)
-//  }
-//
-//}
-//
-//object JsonUtilTest {
-//  case class Person(name: String, age: Int){}
-//}


[2/2] incubator-griffin git commit: remove some unused files

Posted by gu...@apache.org.
remove some unused files

Author: Lionel Liu <bh...@163.com>

Closes #152 from bhlx3lyx7/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/1f984da1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/1f984da1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/1f984da1

Branch: refs/heads/master
Commit: 1f984da1aea86e8be507db37f426b5e28d0d81e8
Parents: 0dd1d35
Author: Lionel Liu <bh...@163.com>
Authored: Mon Oct 30 16:08:14 2017 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Oct 30 16:08:14 2017 +0800

----------------------------------------------------------------------
 .../batch/KafkaCacheDirectDataConnector.scala   | 125 -----
 .../StreamingCacheDirectDataConnector.scala     |  60 ---
 .../connector/cache/CacheDataConnector.scala    |  33 --
 .../data/connector/cache/DataCacheable.scala    |  86 ---
 .../data/connector/cache/DataUpdatable.scala    |  30 --
 .../cache/HiveCacheDataConnector.scala          | 351 ------------
 .../cache/TextCacheDataConnector.scala          | 311 -----------
 .../measure/persist/OldHttpPersist.scala        |  87 ---
 .../apache/griffin/measure/process/Algo.scala   |  34 --
 measure/src/test/resources/test-data.jsonFile   |   3 -
 measure/src/test/resources/test-data1.jsonFile  |  31 --
 .../measure/cache/InfoCacheInstanceTest.scala   |  78 ---
 .../griffin/measure/cache/ZKCacheLockTest.scala |  84 ---
 .../griffin/measure/cache/ZKInfoCacheTest.scala |  90 ----
 .../measure/process/BatchProcessTest.scala      | 146 -----
 .../griffin/measure/process/JsonParseTest.scala | 531 -------------------
 .../griffin/measure/process/JsonToStructs.scala |  85 ---
 .../measure/process/StreamingProcessTest.scala  | 147 -----
 .../apache/griffin/measure/sql/SqlTest.scala    | 125 -----
 .../griffin/measure/utils/JsonUtilTest.scala    |  60 ---
 20 files changed, 2497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
deleted file mode 100644
index 70ddcde..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.direct
-//
-//import org.apache.griffin.measure.config.params.user.DataConnectorParam
-//import org.apache.griffin.measure.data.connector.DataConnectorFactory
-//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector
-//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
-//import org.apache.griffin.measure.result._
-//import org.apache.griffin.measure.rule._
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.streaming.StreamingContext
-//
-//import scala.util.{Failure, Success, Try}
-//
-//case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector],
-//                                         cacheDataConnectorTry: Try[CacheDataConnector],
-//                                         dataConnectorParam: DataConnectorParam,
-//                                         ruleExprs: RuleExprs,
-//                                         constFinalExprValueMap: Map[String, Any]
-//                                        ) extends StreamingCacheDirectDataConnector {
-//
-//  val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match {
-//    case Success(cntr) => cntr
-//    case Failure(ex) => throw ex
-//  }
-//  @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match {
-//    case Success(cntr) => cntr
-//    case Failure(ex) => throw ex
-//  }
-//
-//  protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
-//                          ms: Long
-//                         ): RDD[Map[String, Any]] = {
-//    val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms)
-//
-//    rdd.flatMap { kv =>
-//      val msg = kv._2
-//
-//      val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap)
-//      val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-//
-//      finalExprValueMaps.map { vm =>
-//        vm ++ dataInfoMap
-//      }
-//    }
-//  }
-//
-//  def metaData(): Try[Iterable[(String, String)]] = Try {
-//    Map.empty[String, String]
-//  }
-//
-//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try {
-//    cacheDataConnector.readData match {
-//      case Success(rdd) => {
-//        rdd.flatMap { row =>
-//          val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr =>
-//            row.get(expr._id).flatMap { d =>
-//              Some((expr._id, d))
-//            }
-//          }.toMap
-//
-//          val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
-//            row.get(info.key) match {
-//              case Some(d) => (info.key -> d)
-//              case _ => info.defWrap
-//            }
-//          }.toMap
-//
-//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-//            expr.calculate(finalExprValueMap) match {
-//              case Some(v) => Some(v.asInstanceOf[AnyRef])
-//              case _ => None
-//            }
-//          }
-//          val key = toTuple(groupbyData)
-//
-//          Some((key, (finalExprValueMap, dataInfoMap)))
-//        }
-//      }
-//      case Failure(ex) => throw ex
-//    }
-//  }
-//
-//  override def cleanOldData(): Unit = {
-//    cacheDataConnector.cleanOldData
-//  }
-//
-//  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-//    if (dataConnectorParam.getMatchOnce) {
-//      cacheDataConnector.updateOldData(t, oldData)
-//    }
-//  }
-//
-//  override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {
-//    if (dataConnectorParam.getMatchOnce) {
-//      cacheDataConnector.updateAllOldData(oldRdd)
-//    }
-//  }
-//
-//  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-//    if (as.size > 0) {
-//      val tupleClass = Class.forName("scala.Tuple" + as.size)
-//      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-//    } else None
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
deleted file mode 100644
index dddf430..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.direct
-//
-//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector
-//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
-//import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-//import org.apache.griffin.measure.rule.ExprValueUtil
-//import org.apache.spark.rdd.RDD
-//
-//import scala.util.{Failure, Success}
-//
-//trait StreamingCacheDirectDataConnector extends DirectDataConnector {
-//
-//  val cacheDataConnector: CacheDataConnector
-//  @transient val streamingDataConnector: StreamingDataConnector
-//
-//  def available(): Boolean = {
-//    cacheDataConnector.available && streamingDataConnector.available
-//  }
-//
-//  def init(): Unit = {
-//    cacheDataConnector.init
-//
-//    val ds = streamingDataConnector.stream match {
-//      case Success(dstream) => dstream
-//      case Failure(ex) => throw ex
-//    }
-//
-//    ds.foreachRDD((rdd, time) => {
-//      val ms = time.milliseconds
-//
-//      val valueMapRdd = transform(rdd, ms)
-//
-//      // save data frame
-//      cacheDataConnector.saveData(valueMapRdd, ms)
-//    })
-//  }
-//
-//  protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
-//                          ms: Long
-//                         ): RDD[Map[String, Any]]
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
deleted file mode 100644
index 67dcc06..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import org.apache.griffin.measure.data.connector.DataConnector
-//import org.apache.spark.rdd.RDD
-//
-//import scala.util.Try
-//
-//trait CacheDataConnector extends DataConnector with DataCacheable with DataUpdatable {
-//
-//  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit
-//
-//  def readData(): Try[RDD[Map[String, Any]]]
-//
-//}
-//

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
deleted file mode 100644
index 79162be..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import java.util.concurrent.atomic.AtomicLong
-//
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//
-//trait DataCacheable {
-//
-//  protected val defCacheInfoPath = PathCounter.genPath
-//
-//  val cacheInfoPath: String
-//  val readyTimeInterval: Long
-//  val readyTimeDelay: Long
-//
-//  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-//
-//  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
-//  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
-//  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
-//  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
-//
-//  protected def submitCacheTime(ms: Long): Unit = {
-//    val map = Map[String, String]((selfCacheTime -> ms.toString))
-//    InfoCacheInstance.cacheInfo(map)
-//  }
-//
-//  protected def submitReadyTime(ms: Long): Unit = {
-//    val curReadyTime = ms - readyTimeDelay
-//    if (curReadyTime % readyTimeInterval == 0) {
-//      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
-//      InfoCacheInstance.cacheInfo(map)
-//    }
-//  }
-//
-//  protected def submitLastProcTime(ms: Long): Unit = {
-//    val map = Map[String, String]((selfLastProcTime -> ms.toString))
-//    InfoCacheInstance.cacheInfo(map)
-//  }
-//
-//  protected def submitCleanTime(ms: Long): Unit = {
-//    val cleanTime = genCleanTime(ms)
-//    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
-//    InfoCacheInstance.cacheInfo(map)
-//  }
-//
-//  protected def genCleanTime(ms: Long): Long = ms
-//
-//  protected def readCleanTime(): Option[Long] = {
-//    val key = selfCleanTime
-//    val keys = key :: Nil
-//    InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
-//      try {
-//        Some(v.toLong)
-//      } catch {
-//        case _ => None
-//      }
-//    }
-//  }
-//
-//}
-//
-//object PathCounter {
-//  private val counter: AtomicLong = new AtomicLong(0L)
-//  def genPath(): String = s"path_${increment}"
-//  private def increment(): Long = {
-//    counter.incrementAndGet()
-//  }
-//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
deleted file mode 100644
index 61e8413..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import org.apache.spark.rdd.RDD
-//
-//trait DataUpdatable {
-//
-//  def cleanOldData(): Unit = {}
-//
-//  def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {}
-//  def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {}
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
deleted file mode 100644
index 4c7b45b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import java.util.concurrent.TimeUnit
-//
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//import org.apache.griffin.measure.config.params.user.DataCacheParam
-//import org.apache.griffin.measure.result.TimeStampInfo
-//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.sql.hive.HiveContext
-//
-//import scala.util.{Success, Try}
-//
-//case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
-//                                 ) extends CacheDataConnector {
-//
-//  if (!sqlContext.isInstanceOf[HiveContext]) {
-//    throw new Exception("hive context not prepared!")
-//  }
-//
-//  val config = dataCacheParam.config
-//  val InfoPath = "info.path"
-//  val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-//
-//  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-//  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-//
-//  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
-//  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
-//    case s :: e :: _ => {
-//      val ns = TimeUtil.milliseconds(s) match {
-//        case Some(n) if (n < 0) => n
-//        case _ => 0
-//      }
-//      val ne = TimeUtil.milliseconds(e) match {
-//        case Some(n) if (n < 0) => n
-//        case _ => 0
-//      }
-//      (ns, ne)
-//    }
-//    case _ => (0, 0)
-//  }
-//
-//  val Database = "database"
-//  val database: String = config.getOrElse(Database, "").toString
-//  val TableName = "table.name"
-//  val tableName: String = config.get(TableName) match {
-//    case Some(s: String) if (s.nonEmpty) => s
-//    case _ => throw new Exception("invalid table.name!")
-//  }
-//  val ParentPath = "parent.path"
-//  val parentPath: String = config.get(ParentPath) match {
-//    case Some(s: String) => s
-//    case _ => throw new Exception("invalid parent.path!")
-//  }
-//  val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName)
-//
-//  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
-//
-//  val ReadyTimeInterval = "ready.time.interval"
-//  val ReadyTimeDelay = "ready.time.delay"
-//  val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
-//  val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-//
-//  val TimeStampColumn: String = TimeStampInfo.key
-//  val PayloadColumn: String = "payload"
-//
-////  type Schema = (Long, String)
-//  val schema: List[(String, String)] = List(
-//    (TimeStampColumn, "bigint"),
-//    (PayloadColumn, "string")
-//  )
-//  val schemaName = schema.map(_._1)
-//
-////  type Partition = (Long, Long)
-//  val partition: List[(String, String, String)] = List(
-//    ("hr", "bigint", "hour"),
-//    ("min", "bigint", "min")
-//  )
-//  val partitionName = partition.map(_._1)
-//
-//  private val fieldSep = """|"""
-//  private val rowSep = """\n"""
-//  private val rowSepLiteral = "\n"
-//
-//  private def dbPrefix(): Boolean = {
-//    database.nonEmpty && !database.equals("default")
-//  }
-//
-//  private def tableExists(): Boolean = {
-//    Try {
-//      if (dbPrefix) {
-//        sqlContext.tables(database).filter(tableExistsSql).collect.size
-//      } else {
-//        sqlContext.tables().filter(tableExistsSql).collect.size
-//      }
-//    } match {
-//      case Success(s) => s > 0
-//      case _ => false
-//    }
-//  }
-//
-//  override def init(): Unit = {
-//    try {
-//      if (tableExists) {
-//        // drop exist table
-//        val dropSql = s"""DROP TABLE ${concreteTableName}"""
-//        sqlContext.sql(dropSql)
-//      }
-//
-//      val colsSql = schema.map { field =>
-//        s"`${field._1}` ${field._2}"
-//      }.mkString(", ")
-//      val partitionsSql = partition.map { partition =>
-//        s"`${partition._1}` ${partition._2}"
-//      }.mkString(", ")
-//      val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName}
-//                    |(${colsSql}) PARTITIONED BY (${partitionsSql})
-//                    |ROW FORMAT DELIMITED
-//                    |FIELDS TERMINATED BY '${fieldSep}'
-//                    |LINES TERMINATED BY '${rowSep}'
-//                    |STORED AS TEXTFILE
-//                    |LOCATION '${tablePath}'""".stripMargin
-//      sqlContext.sql(sql)
-//    } catch {
-//      case e: Throwable => throw e
-//    }
-//  }
-//
-//  def available(): Boolean = {
-//    true
-//  }
-//
-//  private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = {
-//    try {
-//      Some(schema.map { field =>
-//        val (name, _) = field
-//        name match {
-//          case TimeStampColumn => ms
-//          case PayloadColumn => JsonUtil.toJson(data)
-//          case _ => null
-//        }
-//      })
-//    } catch {
-//      case _ => None
-//    }
-//  }
-//
-//  private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = {
-//    val dataMap = schemaName.zip(data).toMap
-//    dataMap.get(PayloadColumn) match {
-//      case Some(v: String) => {
-//        try {
-//          val map = JsonUtil.toAnyMap(v)
-//          val resMap = if (updateTimeStamp) {
-//            dataMap.get(TimeStampColumn) match {
-//              case Some(t) => map + (TimeStampColumn -> t)
-//              case _ => map
-//            }
-//          } else map
-//          Some(resMap)
-//        } catch {
-//          case _ => None
-//        }
-//      }
-//      case _ => None
-//    }
-//  }
-//
-//  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
-//    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-//    if (newCacheLocked) {
-//      try {
-//        val ptns = getPartition(ms)
-//        val ptnsPath = genPartitionHdfsPath(ptns)
-//        val dirPath = s"${tablePath}/${ptnsPath}"
-//        val fileName = s"${ms}"
-//        val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-//
-//        // encode data
-//        val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms))
-//
-//        // save data
-//        val recordRdd: RDD[String] = dataRdd.map { dt =>
-//          dt.map(_.toString).mkString(fieldSep)
-//        }
-//
-//        val dumped = if (!recordRdd.isEmpty) {
-//          HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral)
-//        } else false
-//
-//        // add partition
-//        if (dumped) {
-//          val sql = addPartitionSql(concreteTableName, ptns)
-//          sqlContext.sql(sql)
-//        }
-//
-//        // submit ms
-//        submitCacheTime(ms)
-//        submitReadyTime(ms)
-//      } catch {
-//        case e: Throwable => error(s"save data error: ${e.getMessage}")
-//      } finally {
-//        newCacheLock.unlock()
-//      }
-//    }
-//  }
-//
-//  def readData(): Try[RDD[Map[String, Any]]] = Try {
-//    val timeRange = TimeInfoCache.getTimeRange
-//    submitLastProcTime(timeRange._2)
-//
-//    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-//    submitCleanTime(reviseTimeRange._1)
-//
-//    // read directly through partition info
-//    val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
-//    val sql = selectSql(concreteTableName, partitionRange)
-//    val df = sqlContext.sql(sql)
-//
-//    // decode data
-//    df.flatMap { row =>
-//      val dt = schemaName.map { sn =>
-//        row.getAs[Any](sn)
-//      }
-//      decode(dt, true)
-//    }
-//  }
-//
-//  override def cleanOldData(): Unit = {
-//    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-//    if (oldCacheLocked) {
-//      try {
-//        val cleanTime = readCleanTime()
-//        cleanTime match {
-//          case Some(ct) => {
-//            // drop partition
-//            val bound = getPartition(ct)
-//            val sql = dropPartitionSql(concreteTableName, bound)
-//            sqlContext.sql(sql)
-//          }
-//          case _ => {
-//            // do nothing
-//          }
-//        }
-//      } catch {
-//        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-//      } finally {
-//        oldCacheLock.unlock()
-//      }
-//    }
-//  }
-//
-//  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-//    // parallel process different time groups, lock is unnecessary
-//    val ptns = getPartition(t)
-//    val ptnsPath = genPartitionHdfsPath(ptns)
-//    val dirPath = s"${tablePath}/${ptnsPath}"
-//    val fileName = s"${t}"
-//    val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-//
-//    try {
-//      // remove out time old data
-//      HdfsFileDumpUtil.remove(dirPath, fileName, true)
-//
-//      // save updated old data
-//      if (oldData.size > 0) {
-//        val recordDatas = oldData.flatMap { dt =>
-//          encode(dt, t)
-//        }
-//        val records: Iterable[String] = recordDatas.map { dt =>
-//          dt.map(_.toString).mkString(fieldSep)
-//        }
-//        val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral)
-//      }
-//    } catch {
-//      case e: Throwable => error(s"update old data error: ${e.getMessage}")
-//    }
-//  }
-//
-//  override protected def genCleanTime(ms: Long): Long = {
-//    val minPartition = partition.last
-//    val t1 = TimeUtil.timeToUnit(ms, minPartition._3)
-//    val t2 = TimeUtil.timeFromUnit(t1, minPartition._3)
-//    t2
-//  }
-//
-//  private def getPartition(ms: Long): List[(String, Any)] = {
-//    partition.map { p =>
-//      val (name, _, unit) = p
-//      val t = TimeUtil.timeToUnit(ms, unit)
-//      (name, t)
-//    }
-//  }
-//  private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = {
-//    partition.map { p =>
-//      val (name, _, unit) = p
-//      val t1 = TimeUtil.timeToUnit(ms1, unit)
-//      val t2 = TimeUtil.timeToUnit(ms2, unit)
-//      (name, (t1, t2))
-//    }
-//  }
-//
-//  private def genPartitionHdfsPath(partition: List[(String, Any)]): String = {
-//    partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/")
-//  }
-//  private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
-//    val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ")
-//    val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})"""
-//    sql
-//  }
-//  private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = {
-//    val clause = partitionRange.map { pr =>
-//      val (name, (r1, r2)) = pr
-//      s"""`${name}` BETWEEN '${r1}' and '${r2}'"""
-//    }.mkString(" AND ")
-//    val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else ""
-//    val sql = s"""SELECT * FROM ${tbn} ${whereClause}"""
-//    sql
-//  }
-//  private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
-//    val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ")
-//    val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}"""
-//    println(sql)
-//    sql
-//  }
-//
-//  private def tableExistsSql(): String = {
-//    s"tableName LIKE '${tableName}'"
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
deleted file mode 100644
index 0daf2d9..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import java.util.concurrent.TimeUnit
-//
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//import org.apache.griffin.measure.config.params.user.DataCacheParam
-//import org.apache.griffin.measure.result.TimeStampInfo
-//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//
-//import scala.util.Try
-//
-//case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
-//                                 ) extends CacheDataConnector {
-//
-//  val config = dataCacheParam.config
-//  val InfoPath = "info.path"
-//  val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-//
-//  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-//  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-//
-//  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
-//  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
-//    case s :: e :: _ => {
-//      val ns = TimeUtil.milliseconds(s) match {
-//        case Some(n) if (n < 0) => n
-//        case _ => 0
-//      }
-//      val ne = TimeUtil.milliseconds(e) match {
-//        case Some(n) if (n < 0) => n
-//        case _ => 0
-//      }
-//      (ns, ne)
-//    }
-//    case _ => (0, 0)
-//  }
-//
-//  val FilePath = "file.path"
-//  val filePath: String = config.get(FilePath) match {
-//    case Some(s: String) => s
-//    case _ => throw new Exception("invalid file.path!")
-//  }
-//
-//  val ReadyTimeInterval = "ready.time.interval"
-//  val ReadyTimeDelay = "ready.time.delay"
-//  val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
-//  val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-//
-////  val TimeStampColumn: String = TimeStampInfo.key
-////  val PayloadColumn: String = "payload"
-//
-//  // cache schema: Long, String
-////  val fields = List[StructField](
-////    StructField(TimeStampColumn, LongType),
-////    StructField(PayloadColumn, StringType)
-////  )
-////  val schema = StructType(fields)
-//
-//  //  case class CacheData(time: Long, payload: String) {
-//  //    def getTime(): Long = time
-//  //    def getPayload(): String = payload
-//  //  }
-//
-//  private val rowSepLiteral = "\n"
-//
-//  val partitionUnits: List[String] = List("hour", "min")
-//
-//  override def init(): Unit = {
-//    // do nothing
-//  }
-//
-//  def available(): Boolean = {
-//    true
-//  }
-//
-//  private def encode(data: Map[String, Any], ms: Long): Option[String] = {
-//    try {
-//      val map = data + (TimeStampInfo.key -> ms)
-//      Some(JsonUtil.toJson(map))
-//    } catch {
-//      case _: Throwable => None
-//    }
-//  }
-//
-//  private def decode(data: String): Option[Map[String, Any]] = {
-//    try {
-//      Some(JsonUtil.toAnyMap(data))
-//    } catch {
-//      case _: Throwable => None
-//    }
-//  }
-//
-//  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
-//    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-//    if (newCacheLocked) {
-//      try {
-//        val ptns = getPartition(ms)
-//        val ptnsPath = genPartitionHdfsPath(ptns)
-//        val dirPath = s"${filePath}/${ptnsPath}"
-//        val dataFileName = s"${ms}"
-//        val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-//
-//        // encode data
-//        val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms))
-//
-//        // save data
-//        val dumped = if (!dataRdd.isEmpty) {
-//          HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
-//        } else false
-//
-//        // submit ms
-//        submitCacheTime(ms)
-//        submitReadyTime(ms)
-//      } catch {
-//        case e: Throwable => error(s"save data error: ${e.getMessage}")
-//      } finally {
-//        newCacheLock.unlock()
-//      }
-//    }
-//  }
-//
-//  def readData(): Try[RDD[Map[String, Any]]] = Try {
-//    val timeRange = TimeInfoCache.getTimeRange
-//    submitLastProcTime(timeRange._2)
-//
-//    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-//    submitCleanTime(reviseTimeRange._1)
-//
-//    // read directly through partition info
-//    val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
-//    println(s"read time ranges: ${reviseTimeRange}")
-//    println(s"read partition ranges: ${partitionRanges}")
-//
-//    // list partition paths
-//    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
-//
-//    if (partitionPaths.isEmpty) {
-//      sqlContext.sparkContext.emptyRDD[Map[String, Any]]
-//    } else {
-//      val filePaths = partitionPaths.mkString(",")
-//      val rdd = sqlContext.sparkContext.textFile(filePaths)
-//
-//      // decode data
-//      rdd.flatMap { row =>
-//        decode(row)
-//      }
-//    }
-//  }
-//
-//  override def cleanOldData(): Unit = {
-//    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-//    if (oldCacheLocked) {
-//      try {
-//        val cleanTime = readCleanTime()
-//        cleanTime match {
-//          case Some(ct) => {
-//            // drop partitions
-//            val bounds = getPartition(ct)
-//
-//            // list partition paths
-//            val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
-//
-//            // delete out time data path
-//            earlierPaths.foreach { path =>
-//              println(s"delete hdfs path: ${path}")
-//              HdfsUtil.deleteHdfsPath(path)
-//            }
-//          }
-//          case _ => {
-//            // do nothing
-//          }
-//        }
-//      } catch {
-//        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-//      } finally {
-//        oldCacheLock.unlock()
-//      }
-//    }
-//  }
-//
-//  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-//    // parallel process different time groups, lock is unnecessary
-//    val ptns = getPartition(t)
-//    val ptnsPath = genPartitionHdfsPath(ptns)
-//    val dirPath = s"${filePath}/${ptnsPath}"
-//    val dataFileName = s"${t}"
-//    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-//
-//    try {
-//      // remove out time old data
-//      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-//
-//      // save updated old data
-//      if (oldData.size > 0) {
-//        val recordDatas = oldData.flatMap { dt =>
-//          encode(dt, t)
-//        }
-//        val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral)
-//      }
-//    } catch {
-//      case e: Throwable => error(s"update old data error: ${e.getMessage}")
-//    }
-//  }
-//
-//  override protected def genCleanTime(ms: Long): Long = {
-//    val minPartitionUnit = partitionUnits.last
-//    val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
-//    val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
-//    t2
-//  }
-//
-//  private def getPartition(ms: Long): List[Long] = {
-//    partitionUnits.map { unit =>
-//      TimeUtil.timeToUnit(ms, unit)
-//    }
-//  }
-//  private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
-//    partitionUnits.map { unit =>
-//      val t1 = TimeUtil.timeToUnit(ms1, unit)
-//      val t2 = TimeUtil.timeToUnit(ms2, unit)
-//      (t1, t2)
-//    }
-//  }
-//
-//  private def genPartitionHdfsPath(partition: List[Long]): String = {
-//    partition.map(prtn => s"${prtn}").mkString("/")
-//  }
-//
-//  private def str2Long(str: String): Option[Long] = {
-//    try {
-//      Some(str.toLong)
-//    } catch {
-//      case e: Throwable => None
-//    }
-//  }
-//
-//  // here the range means [min, max], but the best range should be (min, max]
-//  private def listPathsBetweenRanges(paths: List[String],
-//                                     partitionRanges: List[(Long, Long)]
-//                                    ): List[String] = {
-//    partitionRanges match {
-//      case Nil => paths
-//      case head :: tail => {
-//        val (lb, ub) = head
-//        val curPaths = paths.flatMap { path =>
-//          val names = HdfsUtil.listSubPaths(path, "dir").toList
-//          names.filter { name =>
-//            str2Long(name) match {
-//              case Some(t) => (t >= lb) && (t <= ub)
-//              case _ => false
-//            }
-//          }.map(HdfsUtil.getHdfsFilePath(path, _))
-//        }
-//        listPathsBetweenRanges(curPaths, tail)
-//      }
-//    }
-//  }
-//
-//  private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long]
-//                                        ): List[String] = {
-//    bounds match {
-//      case Nil => paths
-//      case head :: tail => {
-//        val earlierPaths = paths.flatMap { path =>
-//          val names = HdfsUtil.listSubPaths(path, "dir").toList
-//          names.filter { name =>
-//            str2Long(name) match {
-//              case Some(t) => (t < head)
-//              case _ => false
-//            }
-//          }.map(HdfsUtil.getHdfsFilePath(path, _))
-//        }
-//        val equalPaths = paths.flatMap { path =>
-//          val names = HdfsUtil.listSubPaths(path, "dir").toList
-//          names.filter { name =>
-//            str2Long(name) match {
-//              case Some(t) => (t == head)
-//              case _ => false
-//            }
-//          }.map(HdfsUtil.getHdfsFilePath(path, _))
-//        }
-//
-//        tail match {
-//          case Nil => earlierPaths
-//          case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail)
-//        }
-//      }
-//    }
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
deleted file mode 100644
index 84316b3..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.persist
-//
-//import org.apache.griffin.measure.result._
-//import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
-//import org.apache.spark.rdd.RDD
-//
-//// persist result by old http way -- temporary way
-//case class OldHttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
-//
-//  val Api = "api"
-//  val Method = "method"
-//
-//  val api = config.getOrElse(Api, "").toString
-//  val method = config.getOrElse(Method, "post").toString
-//
-//  def available(): Boolean = {
-//    api.nonEmpty
-//  }
-//
-//  def start(msg: String): Unit = {}
-//  def finish(): Unit = {}
-//
-//  def result(rt: Long, result: Result): Unit = {
-//    result match {
-//      case ar: AccuracyResult => {
-//        val matchPercentage: Double = if (ar.getTotal <= 0) 0 else (ar.getMatch * 1.0 / ar.getTotal) * 100
-//        val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> ar.getTotal))
-//        httpResult(dataMap)
-//      }
-//      case pr: ProfileResult => {
-//        val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal))
-//        httpResult(dataMap)
-//      }
-//      case _ => {
-//        info(s"result: ${result}")
-//      }
-//    }
-//  }
-//
-//  private def httpResult(dataMap: Map[String, Any]) = {
-//    try {
-//      val data = JsonUtil.toJson(dataMap)
-//      // post
-//      val params = Map[String, Object]()
-//      val header = Map[String, Object](("content-type" -> "application/json"))
-//
-//      def func(): Boolean = {
-//        HttpUtil.httpRequest(api, method, params, header, data)
-//      }
-//
-//      PersistThreadPool.addTask(func _, 10)
-//
-////      val status = HttpUtil.httpRequest(api, method, params, header, data)
-////      info(s"${method} to ${api} response status: ${status}")
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//
-//  }
-//
-//  def records(recs: RDD[String], tp: String): Unit = {}
-//  def records(recs: Iterable[String], tp: String): Unit = {}
-//
-////  def missRecords(records: RDD[String]): Unit = {}
-////  def matchRecords(records: RDD[String]): Unit = {}
-//
-//  def log(rt: Long, msg: String): Unit = {}
-//
-//}
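
The class removed above turned an accuracy result into a flat metric map before handing the HTTP call to the persist thread pool. A minimal sketch of just that conversion, with a simplified case class standing in for the deleted AccuracyResult:

// SimpleAccuracy is a hypothetical stand-in for AccuracyResult; the real
// class exposed the same counts through getMatch and getTotal.
case class SimpleAccuracy(matched: Long, total: Long) {
  def matchPercentage: Double =
    if (total <= 0) 0 else matched * 100.0 / total
}

object MetricPayloadSketch {
  // Same payload shape the deleted result() method posted.
  def payload(metricName: String, timeStamp: Long, r: SimpleAccuracy): Map[String, Any] =
    Map(
      "metricName" -> metricName,
      "timestamp"  -> timeStamp,
      "value"      -> r.matchPercentage,
      "count"      -> r.total
    )
}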

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala b/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
deleted file mode 100644
index 7f1b153..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo
-//
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.log.Loggable
-//
-//import scala.util.Try
-//
-//trait Algo extends Loggable with Serializable {
-//
-//  val envParam: EnvParam
-//  val userParam: UserParam
-//
-//  def run(): Try[_]
-//
-//}
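
The trait removed above only fixed a contract: an algorithm carries its parameters and reports the outcome of run() through Try. A minimal sketch of a conforming implementation (the trimmed-down trait and NoopAlgo are illustrative only, with the param fields omitted for brevity):

import scala.util.Try

// Trimmed-down version of the deleted trait.
trait Algo extends Serializable {
  def run(): Try[_]
}

// Hypothetical implementation showing the expected success/failure reporting.
case class NoopAlgo(name: String) extends Algo {
  def run(): Try[_] = Try {
    println(s"running $name")
  }
}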

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/resources/test-data.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data.jsonFile b/measure/src/test/resources/test-data.jsonFile
deleted file mode 100644
index 73707f4..0000000
--- a/measure/src/test/resources/test-data.jsonFile
+++ /dev/null
@@ -1,3 +0,0 @@
-{ "name": "emily", "age": 5, "map": { "a": 1, "b": 2 }, "list": [ { "c": 1, "d": 2 }, { "c": 3, "d": 4 } ], "t": [1, 2, 3] }
-{ "name": "white", "age": 15, "map": { "a": 11, "b": 12 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
-{ "name": "west", "age": 25, "map": { "a": 21, "b": 22 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/resources/test-data1.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data1.jsonFile b/measure/src/test/resources/test-data1.jsonFile
deleted file mode 100644
index 1e1f28a..0000000
--- a/measure/src/test/resources/test-data1.jsonFile
+++ /dev/null
@@ -1,31 +0,0 @@
-[{
-	"Year": "2013",
-	"First Name": "DAVID",
-	"County": "KINGS",
-	"Sex": "M",
-	"Count": "272"
-}, {
-	"Year": "2013",
-	"First Name": "JAYDEN",
-	"County": "KINGS",
-	"Sex": "M",
-	"Count": "268"
-}, {
-	"Year": "2013",
-	"First Name": "JAYDEN",
-	"County": "QUEENS",
-	"Sex": "M",
-	"Count": "219"
-}, {
-	"Year": "2013",
-	"First Name": "MOSHE",
-	"County": "KINGS",
-	"Sex": "M",
-	"Count": "219"
-}, {
-	"Year": "2013",
-	"First Name": "ETHAN",
-	"County": "QUEENS",
-	"Sex": "M",
-	"Count": "216"
-}]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
deleted file mode 100644
index fc42d43..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.cache
-//
-//import java.util.Date
-//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-//
-//import org.apache.curator.framework.recipes.locks.InterProcessMutex
-//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-//import org.apache.curator.retry.ExponentialBackoffRetry
-//import org.apache.griffin.measure.cache.info.InfoCacheInstance
-//import org.apache.griffin.measure.config.params.env.InfoCacheParam
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class InfoCacheInstanceTest extends FunSuite with Matchers with BeforeAndAfter {
-//
-//  val map = Map[String, Any](
-//    ("hosts" -> "localhost:2181"),
-//    ("namespace" -> "griffin/infocache"),
-//    ("lock.path" -> "lock"),
-//    ("mode" -> "persist"),
-//    ("init.clear" -> true),
-//    ("close.clear" -> false)
-//  )
-//  val name = "ttt"
-//
-//  val icp = InfoCacheParam("zk", map)
-//  val icps = icp :: Nil
-//
-//  before {
-//    InfoCacheInstance.initInstance(icps, name)
-//    InfoCacheInstance.init
-//  }
-//
-//  test ("others") {
-//    InfoCacheInstance.available should be (true)
-//
-//    val keys = List[String](
-//      "key1", "key2"
-//    )
-//    val info = Map[String, String](
-//      ("key1" -> "value1"),
-//      ("key2" -> "value2")
-//    )
-//
-//    InfoCacheInstance.cacheInfo(info) should be (true)
-//    InfoCacheInstance.readInfo(keys) should be (info)
-//    InfoCacheInstance.deleteInfo(keys)
-////    InfoCacheInstance.readInfo(keys) should be (Map[String, String]())
-//
-//  }
-//
-//  after {
-//    InfoCacheInstance.close()
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
deleted file mode 100644
index 271529c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.cache
-//
-//import java.util.Date
-//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-//
-//import org.apache.curator.framework.recipes.locks.InterProcessMutex
-//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-//import org.apache.curator.retry.ExponentialBackoffRetry
-//import org.apache.griffin.measure.cache.info.ZKInfoCache
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class ZKCacheLockTest extends FunSuite with Matchers with BeforeAndAfter {
-//
-//  val map = Map[String, Any](
-//    ("hosts" -> "localhost:2181"),
-//    ("namespace" -> "griffin/infocache"),
-//    ("lock.path" -> "lock"),
-//    ("mode" -> "persist"),
-//    ("init.clear" -> true),
-//    ("close.clear" -> false)
-//  )
-//  val name = "ttt"
-//
-//  val ic = ZKInfoCache(map, name)
-//
-//  before {
-//    ic.init
-//  }
-//
-//  test ("lock") {
-//
-//    case class Proc(n: Int) extends Runnable {
-//      override def run(): Unit = {
-//        val cl = ic.genLock("proc")
-//        val b = cl.lock(2, TimeUnit.SECONDS)
-//        try {
-//          println(s"${n}: ${b}")
-//          if (b) Thread.sleep(3000)
-//        } finally {
-//          cl.unlock()
-//        }
-//      }
-//    }
-//
-//    val pool = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-//    val t = 0 until 10
-//    t.foreach(a => pool.submit(Proc(a)))
-//
-//    pool.shutdown()
-//    val t1 = new Date()
-//    println(s"${t1}: pool shut down")
-//    pool.awaitTermination(20, TimeUnit.SECONDS)
-//    val t2 = new Date()
-//    println(s"${t2}: pool shut down done [${t2.getTime - t1.getTime}]")
-//  }
-//
-//  after {
-//    ic.close()
-//  }
-//
-//}
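
The test removed above exercised an acquire-with-timeout / release-in-finally pattern around the ZKInfoCache lock. A standalone sketch of the same pattern directly on Curator's InterProcessMutex (assumes a ZooKeeper at localhost:2181, as the test did; the lock path is illustrative):

import java.util.concurrent.TimeUnit
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry

object ZkLockSketch extends App {
  val client = CuratorFrameworkFactory.newClient(
    "localhost:2181", new ExponentialBackoffRetry(1000, 3))
  client.start()

  val mutex = new InterProcessMutex(client, "/griffin/infocache/lock/proc")
  val acquired = mutex.acquire(2, TimeUnit.SECONDS)  // time-boxed, like cl.lock(2, SECONDS)
  try {
    if (acquired) println("lock acquired, doing work")
    else println("lock not acquired within timeout")
  } finally {
    if (acquired) mutex.release()  // only release a lock this thread holds
    client.close()
  }
}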

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
deleted file mode 100644
index 086170a..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.cache
-//
-//import java.util.Date
-//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-//
-//import org.apache.curator.framework.recipes.locks.InterProcessMutex
-//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-//import org.apache.curator.retry.ExponentialBackoffRetry
-//import org.apache.griffin.measure.cache.info.ZKInfoCache
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class ZKInfoCacheTest extends FunSuite with Matchers with BeforeAndAfter {
-//
-//  val map = Map[String, Any](
-//    ("hosts" -> "localhost:2181"),
-//    ("namespace" -> "griffin/infocache"),
-//    ("lock.path" -> "lock"),
-//    ("mode" -> "persist"),
-//    ("init.clear" -> true),
-//    ("close.clear" -> false)
-//  )
-//  val name = "ttt"
-//
-//  test ("available") {
-//    val ic = ZKInfoCache(map, name)
-//    ic.init
-//
-//    ic.available should be (true)
-//
-//    ic.close
-//  }
-//
-//  test ("cacheInfo and readInfo") {
-//    val ic = ZKInfoCache(map, name)
-//    ic.init
-//
-//    val keys = List[String](
-//      "key1", "key2"
-//    )
-//    val info = Map[String, String](
-//      ("key1" -> "value1"),
-//      ("key2" -> "value2")
-//    )
-//
-//    ic.cacheInfo(info) should be (true)
-//    ic.readInfo(keys) should be (info)
-//    ic.deleteInfo(keys)
-//    ic.readInfo(keys) should be (Map[String, String]())
-//
-//    ic.close
-//  }
-//
-//  test ("genLock") {
-//    val ic = ZKInfoCache(map, name)
-//    ic.init
-//
-//    val lock1 = ic.genLock("ttt")
-//    val lock2 = ic.genLock("ttt")
-//    lock1.lock(5, TimeUnit.SECONDS)
-//    lock2.lock(5, TimeUnit.SECONDS)
-//    lock1.unlock
-//    lock2.unlock
-//
-//    ic.close
-//  }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
deleted file mode 100644
index 845a051..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.process
-//
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.reader.ParamReaderFactory
-//import org.apache.griffin.measure.config.validator.AllParamValidator
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.persist.PersistThreadPool
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  val envFile = "src/test/resources/env-test.json"
-//  val confFile = "src/test/resources/config-test-profiling.json"
-////  val confFile = "src/test/resources/config-test-accuracy.json"
-//
-//  val envFsType = "local"
-//  val userFsType = "local"
-//
-//  val args = Array(envFile, confFile)
-//
-//  var allParam: AllParam = _
-//
-//  before {
-//    // read param files
-//    val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    val userParam = readParamFile[UserParam](confFile, userFsType) match {
-//      case Success(p) => p
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-2)
-//      }
-//    }
-//    allParam = AllParam(envParam, userParam)
-//
-//    // validate param files
-//    validateParams(allParam) match {
-//      case Failure(ex) => {
-//        error(ex.getMessage)
-//        sys.exit(-3)
-//      }
-//      case _ => {
-//        info("params validation pass")
-//      }
-//    }
-//  }
-//
-//  test ("batch process") {
-//    val procType = ProcessType(allParam.userParam.procType)
-//    val proc: DqProcess = procType match {
-//      case BatchProcessType => BatchDqProcess(allParam)
-//      case StreamingProcessType => StreamingDqProcess(allParam)
-//      case _ => {
-//        error(s"${procType} is an unsupported process type!")
-//        sys.exit(-4)
-//      }
-//    }
-//
-//    // process init
-//    proc.init match {
-//      case Success(_) => {
-//        info("process init success")
-//      }
-//      case Failure(ex) => {
-//        error(s"process init error: ${ex.getMessage}")
-//        shutdown
-//        sys.exit(-5)
-//      }
-//    }
-//
-//    // process run
-//    proc.run match {
-//      case Success(_) => {
-//        info("process run success")
-//      }
-//      case Failure(ex) => {
-//        error(s"process run error: ${ex.getMessage}")
-//
-//        if (proc.retriable) {
-//          throw ex
-//        } else {
-//          shutdown
-//          sys.exit(-5)
-//        }
-//      }
-//    }
-//
-//    // process end
-//    proc.end match {
-//      case Success(_) => {
-//        info("process end success")
-//      }
-//      case Failure(ex) => {
-//        error(s"process end error: ${ex.getMessage}")
-//        shutdown
-//        sys.exit(-5)
-//      }
-//    }
-//
-//    shutdown
-//  }
-//
-//  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-//    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-//    paramReader.readConfig[T]
-//  }
-//
-//  private def validateParams(allParam: AllParam): Try[Boolean] = {
-//    val allParamValidator = AllParamValidator()
-//    allParamValidator.validate(allParam)
-//  }
-//
-//  private def shutdown(): Unit = {
-//    PersistThreadPool.shutdown
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
deleted file mode 100644
index 1273bcf..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
+++ /dev/null
@@ -1,531 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements.  See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership.  The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License.  You may obtain a copy of the License at
-//
-//  http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied.  See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.process
-//
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader.ParamReaderFactory
-//import org.apache.griffin.measure.config.validator.AllParamValidator
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.persist.PersistThreadPool
-//import org.apache.griffin.measure.process.engine.DataFrameOprs
-//import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
-//import org.apache.hadoop.hive.ql.exec.UDF
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.apache.spark.sql._
-//import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-//import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
-//import org.apache.spark.sql.hive.HiveContext
-//import org.apache.spark.sql.types._
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.collection.mutable.WrappedArray
-//import scala.util.{Failure, Success, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class JsonParseTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-//  var sparkContext: SparkContext = _
-//  var sqlContext: SQLContext = _
-//
-//  before {
-//    val conf = new SparkConf().setAppName("test json").setMaster("local[*]")
-//    sparkContext = new SparkContext(conf)
-//    sparkContext.setLogLevel("WARN")
-////    sqlContext = new HiveContext(sparkContext)
-//    sqlContext = new SQLContext(sparkContext)
-//  }
-//
-//  test ("json test") {
-//    // 0. prepare data
-////    val dt =
-////      """
-////        |{"name": "s1", "age": 12, "items": [1, 2, 3],
-////        |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}],
-////        |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}"
-////        |}""".stripMargin
-////    val rdd0 = sparkContext.parallelize(Seq(dt)).map(Row(_))
-//    val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
-//
-//    val vtp = StructField("value", StringType)
-//    val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-//    df0.registerTempTable("src")
-//
-////    val fromJson2Array = (s: String) => {
-////      JsonUtil.fromJson[Seq[String]](s)
-////    }
-////    sqlContext.udf.register("from_json_to_array", fromJson2Array)
-////
-////    val df2 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as value FROM src")
-////    df2.printSchema
-////    df2.show(10)
-////    df2.registerTempTable("df2")
-//
-//
-//
-//    // 1. read from json string to extracted json row
-////    val readSql = "SELECT value FROM src"
-////    val df = sqlContext.sql(readSql)
-////    val df = sqlContext.table("src")
-////    val rdd = df.map { row =>
-////      row.getAs[String]("value")
-////    }
-////    val df1 = sqlContext.read.json(rdd)
-////    df1.printSchema
-////    df1.show(10)
-////    df1.registerTempTable("df1")
-//    val details = Map[String, Any](("df.name" -> "src"))
-//    val df1 = DataFrameOprs.fromJson(sqlContext, details)
-//    df1.registerTempTable("df1")
-//
-//    // 2. extract json array into lines
-////    val rdd2 = df1.flatMap { row =>
-////      row.getAs[WrappedArray[String]]("seeds")
-////    }
-////    val df2 = sqlContext.read.json(rdd2)
-//    val df2 = sqlContext.sql("select explode(seeds) as value from df1")
-////    val tdf = sqlContext.sql("select name, age, explode(items) as item from df1")
-////    tdf.registerTempTable("tdf")
-////    val df2 = sqlContext.sql("select struct(name, age, item) as ttt from tdf")
-//    df2.printSchema
-//    df2.show(10)
-//    df2.registerTempTable("df2")
-//    println(df2.count)
-//
-//    val sql1 = "SELECT value FROM df2"
-//    val df22 = sqlContext.sql(sql1)
-//    val rdd22 = df22.map { row =>
-//      row.getAs[String]("value")
-//    }
-//    import org.apache.spark.sql.functions._
-//    val df23 = sqlContext.read.json(rdd22)
-//    df23.registerTempTable("df23")
-////    df23.withColumn("par", monotonicallyIncreasingId)
-//
-//    val df24 = sqlContext.sql("SELECT url, cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df23")
-//    df24.printSchema
-//    df24.show(10)
-//    df24.registerTempTable("df24")
-//    println(df24.count)
-//
-////    val df25 = sqlContext.sql("select ")
-//
-////
-////    // 3. extract json string into row
-//////    val df3 = sqlContext.sql("select cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint), url from df2")
-////    val df3 = sqlContext.sql("select cast(get_json_object(get_json_object(value, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint), get_json_object(value, '$.url') from df2")
-////    df3.printSchema()
-////    df3.show(10)
-////    println(df3.count)
-//
-//
-//
-////    val df5 = sqlContext.sql("select get_json_object(value, '$.subs') as subs from src")
-////    df5.printSchema()
-////    df5.show(10)
-////    df5.registerTempTable("df5")
-////    val rdd5 = df5.map { row =>
-////      row.getAs[String]("subs")
-////    }
-////    val df6 = sqlContext.read.json(rdd5)
-////    df6.printSchema
-////    df6.show(10)
-//
-//    // 2. extract json string to row
-////    val df2 = sqlContext.sql("select jstr from df1")
-////    val rdd2 = df2.map { row =>
-////      row.getAs[String]("jstr")
-////    }
-////    val df22 = sqlContext.read.json(rdd2)
-////    df22.printSchema
-////    df22.show(100)
-////    df22.registerTempTable("df2")
-////
-////    val df23 = sqlContext.sql("select json_tuple(jstr, 's1', 's2') from df1")
-////    df23.printSchema()
-////    df23.show(100)
-//
-//    // 3. extract json array into lines ??
-//
-//    // 3. flatmap from json row to json row
-////    val df3 = sqlContext.sql("select explode(subs) as sub, items from df1")
-////    df3.printSchema()
-////    df3.show(10)
-////    df3.registerTempTable("df3")
-////
-////    val df4 = sqlContext.sql("select explode(items) as item, sub from df3")
-////    df4.printSchema()
-////    df4.show(10)
-//
-////    sqlContext.udf.register("length", (s: WrappedArray[_]) => s.length)
-//    //
-//    //    val df2 = sqlContext.sql("SELECT inner from df1")
-//    //    df2.registerTempTable("df2")
-//    //    df2.printSchema
-//    //    df2.show(100)
-//
-////    def children(colname: String, df: DataFrame): Array[DataFrame] = {
-////      val parent = df.schema.fields.filter(_.name == colname).head
-////      println(parent)
-////      val fields: Array[StructField] = parent.dataType match {
-////        case x: StructType => x.fields
-////        case _ => Array.empty[StructField]
-////      }
-////      fields.map(x => col(s"$colname.${x.name}"))
-//////      fields.foreach(println)
-////    }
-//////
-////    children("inner", df2)
-////
-////    df2.select(children("bar", df): _*).printSchema
-//
-////    val df3 = sqlContext.sql("select inline(subs) from df1")
-////    df3.printSchema()
-////    df3.show(100)
-//
-////    val rdd2 = df2.flatMap { row =>
-////      row.getAs[GenericRowWithSchema]("inner") :: Nil
-////    }
-////
-////    rdd2.
-//
-////    val funcs = sqlContext.sql("show functions")
-////    funcs.printSchema()
-////    funcs.show(1000)
-////
-////    val desc = sqlContext.sql("describe function inline")
-////    desc.printSchema()
-////    desc.show(100)
-//
-//    //
-//
-//  }
-//
-//  test ("json test 2") {
-//    val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
-//
-//    val vtp = StructField("value", StringType)
-//    val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-//    df0.registerTempTable("tgt")
-//
-////    val fromJson2StringArray = (s: String) => {
-////      val seq = JsonUtil.fromJson[Seq[Any]](s)
-////      seq.map(i => JsonUtil.toJson(i))
-////    }
-////    sqlContext.udf.register("from_json_to_string_array", fromJson2StringArray)
-////
-////    val df2 = sqlContext.sql("SELECT from_json_to_string_array(get_json_object(value, '$.groups[0].attrsList')) as value FROM tgt")
-////    df2.printSchema()
-////    df2.show(10)
-////    df2.registerTempTable("df2")
-////
-////    val indexOfStringArray = (sa: String, )
-//
-//
-//    // 1. read from json string to extracted json row
-//    val readSql = "SELECT value FROM tgt"
-//    val df = sqlContext.sql(readSql)
-//    val rdd = df.map { row =>
-//      row.getAs[String]("value")
-//    }
-//    val df1 = sqlContext.read.json(rdd)
-//    df1.printSchema
-//    df1.show(10)
-//    df1.registerTempTable("df1")
-//
-//
-//    val df2 = sqlContext.sql("select groups[0].attrsList as attrs from df1")
-//    df2.printSchema
-//    df2.show(10)
-//    df2.registerTempTable("df2")
-//    println(df2.count)
-//
-//    val indexOf = (arr: Seq[String], v: String) => {
-//      arr.indexOf(v)
-//    }
-//    sqlContext.udf.register("index_of", indexOf)
-//
-//    val df3 = sqlContext.sql("select attrs.values[index_of(attrs.name, 'URL')][0] as url, cast(get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') as bigint) as ts from df2")
-//    df3.printSchema()
-//    df3.show(10)
-//    df3.registerTempTable("df3")
-//  }
-//
-//  test ("testing") {
-//    val dt =
-//      """
-//        |{"name": "age", "age": 12, "items": [1, 2, 3],
-//        |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}],
-//        |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}", "b": true
-//        |}""".stripMargin
-//    val rdd = sparkContext.parallelize(Seq(dt)).map(Row(_))
-//    val vtp = StructField("value", StringType)
-//    val df = sqlContext.createDataFrame(rdd, StructType(Array(vtp)))
-//    df.registerTempTable("df")
-//
-//    val df1 = sqlContext.read.json(sqlContext.sql("select * from df").map(r => r.getAs[String]("value")))
-//    df1.printSchema()
-//    df1.show(10)
-//    df1.registerTempTable("df1")
-//
-//    val test = (s: String) => {
-//      s.toInt
-//    }
-//    sqlContext.udf.register("to_int", test)
-//
-//    val df2 = sqlContext.sql("select (b) as aa, inner.a from df1 where age = to_int(\"12\")")
-//    df2.printSchema()
-//    df2.show(10)
-//  }
-//
-//  test ("test input only sql") {
-//    val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
-//
-//    val vtp = StructField("value", StringType)
-//    val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-//    df0.registerTempTable("src")
-//    df0.show(10)
-//
-//    // 1. read from json string to extracted json row
-//    val df1 = sqlContext.sql("SELECT get_json_object(value, '$.seeds') as seeds FROM src")
-//    df1.printSchema
-//    df1.show(10)
-//    df1.registerTempTable("df1")
-//
-//    val json2StringArray: (String) => Seq[String] = (s: String) => {
-//      val seq = JsonUtil.fromJson[Seq[String]](s)
-////      seq.map(i => JsonUtil.toJson(i))
-//      seq
-//    }
-//    sqlContext.udf.register("json_to_string_array", json2StringArray)
-//
-//    val df2 = sqlContext.sql("SELECT explode(json_to_string_array(seeds)) as seed FROM df1")
-//    df2.printSchema
-//    df2.show(10)
-//    df2.registerTempTable("df2")
-//
-//
-//    val df3 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df2")
-//    df3.printSchema
-//    df3.show(10)
-//  }
-//
-//  test ("test output only sql") {
-//    val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
-//
-//    val vtp = StructField("value", StringType)
-//    val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-//    df0.registerTempTable("tgt")
-//    df0.printSchema()
-//    df0.show(10)
-//
-//    val json2StringArray: (String) => Seq[String] = (s: String) => {
-//      JsonUtil.fromJson[Seq[String]](s)
-//    }
-//    sqlContext.udf.register("json_to_string_array", json2StringArray)
-//
-//    val json2StringJsonArray: (String) => Seq[String] = (s: String) => {
-//      val seq = JsonUtil.fromJson[Seq[Any]](s)
-//      seq.map(i => JsonUtil.toJson(i))
-//    }
-//    sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray)
-//
-//    val indexOf = (arr: Seq[String], v: String) => {
-//      arr.indexOf(v)
-//    }
-//    sqlContext.udf.register("index_of", indexOf)
-//
-//    val indexOfField = (arr: Seq[String], k: String, v: String) => {
-//      val seq = arr.flatMap { item =>
-//        JsonUtil.fromJson[Map[String, Any]](item).get(k)
-//      }
-//      seq.indexOf(v)
-//    }
-//    sqlContext.udf.register("index_of_field", indexOfField)
-//
-//    // 1. read from json string to extracted json row
-//    val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tgt")
-//    df1.printSchema
-//    df1.show(10)
-//    df1.registerTempTable("df1")
-//
-//    val df2 = sqlContext.sql("SELECT json_to_string_json_array(attrs) as attrs FROM df1")
-//    df2.printSchema()
-//    df2.show(10)
-//    df2.registerTempTable("df2")
-//
-//    val df3 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM df2")
-//    df3.printSchema()
-//    df3.show(10)
-//    df3.registerTempTable("df3")
-//
-//    val df4 = sqlContext.sql("SELECT json_to_string_array(get_json_object(attr1, '$.values'))[0], cast(get_json_object(json_to_string_array(get_json_object(attr2, '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) FROM df3")
-//    df4.printSchema()
-//    df4.show(10)
-//  }
-//
-//  test ("test from json") {
-//    val fromJson2Map = (str: String) => {
-//      val a = JsonUtil.fromJson[Map[String, Any]](str)
-//      a.mapValues { v =>
-//        v match {
-//          case t: String => t
-//          case _ => JsonUtil.toJson(v)
-//        }
-//      }
-//    }
-//    sqlContext.udf.register("from_json_to_map", fromJson2Map)
-//
-//    val fromJson2Array = (str: String) => {
-//      val a = JsonUtil.fromJson[Seq[Any]](str)
-//      a.map { v =>
-//        v match {
-//          case t: String => t
-//          case _ => JsonUtil.toJson(v)
-//        }
-//      }
-//    }
-//    sqlContext.udf.register("from_json_to_array", fromJson2Array)
-//
-//    // ========================
-//
-//    val srdd = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
-//    val svtp = StructField("value", StringType)
-//    val sdf0 = sqlContext.createDataFrame(srdd, StructType(Array(svtp)))
-//    sdf0.registerTempTable("sdf0")
-//    sdf0.show(10)
-//
-//    // 1. read from json string to extracted json row
-//    val sdf1 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as seed FROM sdf0")
-//    sdf1.printSchema
-//    sdf1.show(10)
-//    sdf1.registerTempTable("sdf1")
-//
-//    val sdf2 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM sdf1")
-//    sdf2.printSchema
-//    sdf2.show(10)
-//
-//    // ---------------------------------------
-//
-//    val trdd = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
-//    val tvtp = StructField("value", StringType)
-//    val tdf0 = sqlContext.createDataFrame(trdd, StructType(Array(tvtp)))
-//    tdf0.registerTempTable("tdf0")
-//    tdf0.printSchema()
-//    tdf0.show(10)
-//
-////    val json2StringArray: (String) => Seq[String] = (s: String) => {
-////      JsonUtil.fromJson[Seq[String]](s)
-////    }
-////    sqlContext.udf.register("json_to_string_array", json2StringArray)
-////
-////    val json2StringJsonArray: (String) => Seq[String] = (s: String) => {
-////      val seq = JsonUtil.fromJson[Seq[Any]](s)
-////      seq.map(i => JsonUtil.toJson(i))
-////    }
-////    sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray)
-////
-////    val indexOf = (arr: Seq[String], v: String) => {
-////      arr.indexOf(v)
-////    }
-////    sqlContext.udf.register("index_of", indexOf)
-////
-//    val indexOfField = (arr: Seq[String], k: String, v: String) => {
-//      val seq = arr.flatMap { item =>
-//        JsonUtil.fromJson[Map[String, Any]](item).get(k)
-//      }
-//      seq.indexOf(v)
-//    }
-//    sqlContext.udf.register("index_of_field", indexOfField)
-//
-//    // 1. read from json string to extracted json row
-////    val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tdf0")
-//    val tdf1 = sqlContext.sql("SELECT from_json_to_array(get_json_object(value, '$.groups[0].attrsList')) as attrs FROM tdf0")
-//    tdf1.printSchema
-//    tdf1.show(10)
-//    tdf1.registerTempTable("tdf1")
-//
-////    val tdf2 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM tdf1")
-////    tdf2.printSchema()
-////    tdf2.show(10)
-////    tdf2.registerTempTable("tdf2")
-//
-//    val tdf3 = sqlContext.sql("SELECT from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'URL')], '$.values'))[0] as url, cast(get_json_object(from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')], '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM tdf1")
-//    tdf3.printSchema()
-//    tdf3.show(10)
-//  }
-//
-//  test ("sql functions") {
-//    val functions = sqlContext.sql("show functions")
-//    functions.printSchema()
-//    functions.show(10)
-//
-//    val functionNames = functions.map(_.getString(0)).collect
-//    functionNames.foreach(println)
-//  }
-//
-//  test ("test text file read") {
-//    val partitionPaths = Seq[String](
-//      "hdfs://localhost/griffin/streaming/dump/source/418010/25080625/1504837518000",
-//      "hdfs://localhost/griffin/streaming/dump/target/418010/25080625/1504837518000")
-//    val df = sqlContext.read.json(partitionPaths: _*)
-//    df.printSchema()
-//    df.show(10)
-//  }
-//
-//  test ("list paths") {
-//    val filePath = "hdfs://localhost/griffin/streaming/dump/source"
-//    val partitionRanges = List[(Long, Long)]((0, 0), (-2, 0))
-//    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
-//    println(partitionPaths)
-//  }
-//
-//  private def listPathsBetweenRanges(paths: List[String],
-//                                     partitionRanges: List[(Long, Long)]
-//                                    ): List[String] = {
-//    partitionRanges match {
-//      case Nil => paths
-//      case head :: tail => {
-//        val (lb, ub) = head
-//        val curPaths = paths.flatMap { path =>
-//          val names = HdfsUtil.listSubPathsByType(path, "dir").toList
-//          println(names)
-//          names.filter { name =>
-//            str2Long(name) match {
-//              case Some(t) => (t >= lb) && (t <= ub)
-//              case _ => false
-//            }
-//          }.map(HdfsUtil.getHdfsFilePath(path, _))
-//        }
-//        listPathsBetweenRanges(curPaths, tail)
-//      }
-//    }
-//  }
-//
-//  private def str2Long(str: String): Option[Long] = {
-//    try {
-//      Some(str.toLong)
-//    } catch {
-//      case e: Throwable => None
-//    }
-//  }
-//}
\ No newline at end of file
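
Much of the test removed above revolves around locating an attribute inside a JSON array by one of its fields (the index_of_field UDF). The lookup itself reduces to an indexWhere over parsed objects; a minimal sketch with the JSON already parsed to maps (indexOfField and the sample data are illustrative and skip the JsonUtil parsing step):

object IndexOfFieldSketch {
  // Given JSON objects already parsed to maps, return the index of the first
  // element whose field `key` equals `value`, or -1 if none matches.
  def indexOfField(items: Seq[Map[String, Any]], key: String, value: Any): Int =
    items.indexWhere(_.get(key).contains(value))

  def main(args: Array[String]): Unit = {
    val attrs = Seq(
      Map("name" -> "URL", "values" -> Seq("http://example.com")),
      Map("name" -> "CRAWLMETADATA", "values" -> Seq("{...}"))
    )
    println(indexOfField(attrs, "name", "URL"))  // prints 0
  }
}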

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
deleted file mode 100644
index 394917c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.griffin.measure.process
-
-import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
-import org.apache.spark.sql.execution.datasources.json.JSONOptions
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-
-case class JsonToStructs(
-//                          schema: DataType,
-//                          options: Map[String, String],
-                          child: Expression)
-  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
-  override def nullable: Boolean = true
-
-//  def this(schema: DataType, options: Map[String, String], child: Expression) =
-//    this(schema, options, child, None)
-
-  // Used in `FunctionRegistry`
-//  def this(child: Expression, schema: Expression) =
-//  this(
-//    schema = JsonExprUtils.validateSchemaLiteral(schema),
-//    options = Map.empty[String, String],
-//    child = child,
-//    timeZoneId = None)
-//
-//  def this(child: Expression, schema: Expression, options: Expression) =
-//    this(
-//      schema = JsonExprUtils.validateSchemaLiteral(schema),
-//      options = JsonExprUtils.convertToMapData(options),
-//      child = child,
-//      timeZoneId = None)
-//
-//  override def checkInputDataTypes(): TypeCheckResult = schema match {
-//    case _: StructType | ArrayType(_: StructType, _) =>
-//      super.checkInputDataTypes()
-//    case _ => TypeCheckResult.TypeCheckFailure(
-//      s"Input schema ${schema.simpleString} must be a struct or an array of structs.")
-//  }
-
-  override def dataType: DataType = MapType(StringType, StringType)
-
-//  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
-//    copy(timeZoneId = Option(timeZoneId))
-
-  override def nullSafeEval(json: Any): Any = {
-    if (json.toString.trim.isEmpty) return null
-
-    try {
-      JsonUtil.fromJson[Map[String, Any]](json.toString)
-    } catch {
-      case _: Throwable => null
-    }
-  }
-
-  override def inputTypes: Seq[DataType] = StringType :: Nil
-}
-//
-//object JsonExprUtils {
-//
-//  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-//    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
-//    case e => throw new AnalysisException(s"Expected a string literal instead of $e")
-//  }
-//
-//  def convertToMapData(exp: Expression): Map[String, String] = exp match {
-//    case m: CreateMap
-//      if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
-//      val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
-//      ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
-//        key.toString -> value.toString
-//      }
-//    case m: CreateMap =>
-//      throw new AnalysisException(
-//        s"A type of keys and values in map() must be string, but got ${m.dataType}")
-//    case _ =>
-//      throw new AnalysisException("Must use a map() function for options")
-//  }
-//}
\ No newline at end of file