Posted to commits@griffin.apache.org by gu...@apache.org on 2017/10/30 08:08:21 UTC
[2/2] incubator-griffin git commit: remove some unused files
remove some unused files
Author: Lionel Liu <bh...@163.com>
Closes #152 from bhlx3lyx7/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/1f984da1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/1f984da1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/1f984da1
Branch: refs/heads/master
Commit: 1f984da1aea86e8be507db37f426b5e28d0d81e8
Parents: 0dd1d35
Author: Lionel Liu <bh...@163.com>
Authored: Mon Oct 30 16:08:14 2017 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Oct 30 16:08:14 2017 +0800
----------------------------------------------------------------------
.../batch/KafkaCacheDirectDataConnector.scala | 125 -----
.../StreamingCacheDirectDataConnector.scala | 60 ---
.../connector/cache/CacheDataConnector.scala | 33 --
.../data/connector/cache/DataCacheable.scala | 86 ---
.../data/connector/cache/DataUpdatable.scala | 30 --
.../cache/HiveCacheDataConnector.scala | 351 ------------
.../cache/TextCacheDataConnector.scala | 311 -----------
.../measure/persist/OldHttpPersist.scala | 87 ---
.../apache/griffin/measure/process/Algo.scala | 34 --
measure/src/test/resources/test-data.jsonFile | 3 -
measure/src/test/resources/test-data1.jsonFile | 31 --
.../measure/cache/InfoCacheInstanceTest.scala | 78 ---
.../griffin/measure/cache/ZKCacheLockTest.scala | 84 ---
.../griffin/measure/cache/ZKInfoCacheTest.scala | 90 ----
.../measure/process/BatchProcessTest.scala | 146 -----
.../griffin/measure/process/JsonParseTest.scala | 531 -------------------
.../griffin/measure/process/JsonToStructs.scala | 85 ---
.../measure/process/StreamingProcessTest.scala | 147 -----
.../apache/griffin/measure/sql/SqlTest.scala | 125 -----
.../griffin/measure/utils/JsonUtilTest.scala | 60 ---
20 files changed, 2497 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
deleted file mode 100644
index 70ddcde..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.direct
-//
-//import org.apache.griffin.measure.config.params.user.DataConnectorParam
-//import org.apache.griffin.measure.data.connector.DataConnectorFactory
-//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector
-//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
-//import org.apache.griffin.measure.result._
-//import org.apache.griffin.measure.rule._
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.streaming.StreamingContext
-//
-//import scala.util.{Failure, Success, Try}
-//
-//case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector],
-// cacheDataConnectorTry: Try[CacheDataConnector],
-// dataConnectorParam: DataConnectorParam,
-// ruleExprs: RuleExprs,
-// constFinalExprValueMap: Map[String, Any]
-// ) extends StreamingCacheDirectDataConnector {
-//
-// val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match {
-// case Success(cntr) => cntr
-// case Failure(ex) => throw ex
-// }
-// @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match {
-// case Success(cntr) => cntr
-// case Failure(ex) => throw ex
-// }
-//
-// protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
-// ms: Long
-// ): RDD[Map[String, Any]] = {
-// val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms)
-//
-// rdd.flatMap { kv =>
-// val msg = kv._2
-//
-// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap)
-// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-//
-// finalExprValueMaps.map { vm =>
-// vm ++ dataInfoMap
-// }
-// }
-// }
-//
-// def metaData(): Try[Iterable[(String, String)]] = Try {
-// Map.empty[String, String]
-// }
-//
-// def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try {
-// cacheDataConnector.readData match {
-// case Success(rdd) => {
-// rdd.flatMap { row =>
-// val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr =>
-// row.get(expr._id).flatMap { d =>
-// Some((expr._id, d))
-// }
-// }.toMap
-//
-// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
-// row.get(info.key) match {
-// case Some(d) => (info.key -> d)
-// case _ => info.defWrap
-// }
-// }.toMap
-//
-// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-// expr.calculate(finalExprValueMap) match {
-// case Some(v) => Some(v.asInstanceOf[AnyRef])
-// case _ => None
-// }
-// }
-// val key = toTuple(groupbyData)
-//
-// Some((key, (finalExprValueMap, dataInfoMap)))
-// }
-// }
-// case Failure(ex) => throw ex
-// }
-// }
-//
-// override def cleanOldData(): Unit = {
-// cacheDataConnector.cleanOldData
-// }
-//
-// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-// if (dataConnectorParam.getMatchOnce) {
-// cacheDataConnector.updateOldData(t, oldData)
-// }
-// }
-//
-// override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {
-// if (dataConnectorParam.getMatchOnce) {
-// cacheDataConnector.updateAllOldData(oldRdd)
-// }
-// }
-//
-// private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-// if (as.size > 0) {
-// val tupleClass = Class.forName("scala.Tuple" + as.size)
-// tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-// } else None
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
deleted file mode 100644
index dddf430..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.direct
-//
-//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector
-//import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
-//import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-//import org.apache.griffin.measure.rule.ExprValueUtil
-//import org.apache.spark.rdd.RDD
-//
-//import scala.util.{Failure, Success}
-//
-//trait StreamingCacheDirectDataConnector extends DirectDataConnector {
-//
-// val cacheDataConnector: CacheDataConnector
-// @transient val streamingDataConnector: StreamingDataConnector
-//
-// def available(): Boolean = {
-// cacheDataConnector.available && streamingDataConnector.available
-// }
-//
-// def init(): Unit = {
-// cacheDataConnector.init
-//
-// val ds = streamingDataConnector.stream match {
-// case Success(dstream) => dstream
-// case Failure(ex) => throw ex
-// }
-//
-// ds.foreachRDD((rdd, time) => {
-// val ms = time.milliseconds
-//
-// val valueMapRdd = transform(rdd, ms)
-//
-// // save data frame
-// cacheDataConnector.saveData(valueMapRdd, ms)
-// })
-// }
-//
-// protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
-// ms: Long
-// ): RDD[Map[String, Any]]
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
deleted file mode 100644
index 67dcc06..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import org.apache.griffin.measure.data.connector.DataConnector
-//import org.apache.spark.rdd.RDD
-//
-//import scala.util.Try
-//
-//trait CacheDataConnector extends DataConnector with DataCacheable with DataUpdatable {
-//
-// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit
-//
-// def readData(): Try[RDD[Map[String, Any]]]
-//
-//}
-//
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
deleted file mode 100644
index 79162be..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import java.util.concurrent.atomic.AtomicLong
-//
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//
-//trait DataCacheable {
-//
-// protected val defCacheInfoPath = PathCounter.genPath
-//
-// val cacheInfoPath: String
-// val readyTimeInterval: Long
-// val readyTimeDelay: Long
-//
-// def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-//
-// def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
-// def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
-// def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
-// def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
-//
-// protected def submitCacheTime(ms: Long): Unit = {
-// val map = Map[String, String]((selfCacheTime -> ms.toString))
-// InfoCacheInstance.cacheInfo(map)
-// }
-//
-// protected def submitReadyTime(ms: Long): Unit = {
-// val curReadyTime = ms - readyTimeDelay
-// if (curReadyTime % readyTimeInterval == 0) {
-// val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
-// InfoCacheInstance.cacheInfo(map)
-// }
-// }
-//
-// protected def submitLastProcTime(ms: Long): Unit = {
-// val map = Map[String, String]((selfLastProcTime -> ms.toString))
-// InfoCacheInstance.cacheInfo(map)
-// }
-//
-// protected def submitCleanTime(ms: Long): Unit = {
-// val cleanTime = genCleanTime(ms)
-// val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
-// InfoCacheInstance.cacheInfo(map)
-// }
-//
-// protected def genCleanTime(ms: Long): Long = ms
-//
-// protected def readCleanTime(): Option[Long] = {
-// val key = selfCleanTime
-// val keys = key :: Nil
-// InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
-// try {
-// Some(v.toLong)
-// } catch {
-// case _ => None
-// }
-// }
-// }
-//
-//}
-//
-//object PathCounter {
-// private val counter: AtomicLong = new AtomicLong(0L)
-// def genPath(): String = s"path_${increment}"
-// private def increment(): Long = {
-// counter.incrementAndGet()
-// }
-//}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
deleted file mode 100644
index 61e8413..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import org.apache.spark.rdd.RDD
-//
-//trait DataUpdatable {
-//
-// def cleanOldData(): Unit = {}
-//
-// def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {}
-// def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {}
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
deleted file mode 100644
index 4c7b45b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import java.util.concurrent.TimeUnit
-//
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//import org.apache.griffin.measure.config.params.user.DataCacheParam
-//import org.apache.griffin.measure.result.TimeStampInfo
-//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.sql.hive.HiveContext
-//
-//import scala.util.{Success, Try}
-//
-//case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
-// ) extends CacheDataConnector {
-//
-// if (!sqlContext.isInstanceOf[HiveContext]) {
-// throw new Exception("hive context not prepared!")
-// }
-//
-// val config = dataCacheParam.config
-// val InfoPath = "info.path"
-// val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-//
-// val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-// val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-//
-// val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
-// val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
-// case s :: e :: _ => {
-// val ns = TimeUtil.milliseconds(s) match {
-// case Some(n) if (n < 0) => n
-// case _ => 0
-// }
-// val ne = TimeUtil.milliseconds(e) match {
-// case Some(n) if (n < 0) => n
-// case _ => 0
-// }
-// (ns, ne)
-// }
-// case _ => (0, 0)
-// }
-//
-// val Database = "database"
-// val database: String = config.getOrElse(Database, "").toString
-// val TableName = "table.name"
-// val tableName: String = config.get(TableName) match {
-// case Some(s: String) if (s.nonEmpty) => s
-// case _ => throw new Exception("invalid table.name!")
-// }
-// val ParentPath = "parent.path"
-// val parentPath: String = config.get(ParentPath) match {
-// case Some(s: String) => s
-// case _ => throw new Exception("invalid parent.path!")
-// }
-// val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName)
-//
-// val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
-//
-// val ReadyTimeInterval = "ready.time.interval"
-// val ReadyTimeDelay = "ready.time.delay"
-// val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
-// val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-//
-// val TimeStampColumn: String = TimeStampInfo.key
-// val PayloadColumn: String = "payload"
-//
-//// type Schema = (Long, String)
-// val schema: List[(String, String)] = List(
-// (TimeStampColumn, "bigint"),
-// (PayloadColumn, "string")
-// )
-// val schemaName = schema.map(_._1)
-//
-//// type Partition = (Long, Long)
-// val partition: List[(String, String, String)] = List(
-// ("hr", "bigint", "hour"),
-// ("min", "bigint", "min")
-// )
-// val partitionName = partition.map(_._1)
-//
-// private val fieldSep = """|"""
-// private val rowSep = """\n"""
-// private val rowSepLiteral = "\n"
-//
-// private def dbPrefix(): Boolean = {
-// database.nonEmpty && !database.equals("default")
-// }
-//
-// private def tableExists(): Boolean = {
-// Try {
-// if (dbPrefix) {
-// sqlContext.tables(database).filter(tableExistsSql).collect.size
-// } else {
-// sqlContext.tables().filter(tableExistsSql).collect.size
-// }
-// } match {
-// case Success(s) => s > 0
-// case _ => false
-// }
-// }
-//
-// override def init(): Unit = {
-// try {
-// if (tableExists) {
-// // drop exist table
-// val dropSql = s"""DROP TABLE ${concreteTableName}"""
-// sqlContext.sql(dropSql)
-// }
-//
-// val colsSql = schema.map { field =>
-// s"`${field._1}` ${field._2}"
-// }.mkString(", ")
-// val partitionsSql = partition.map { partition =>
-// s"`${partition._1}` ${partition._2}"
-// }.mkString(", ")
-// val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName}
-// |(${colsSql}) PARTITIONED BY (${partitionsSql})
-// |ROW FORMAT DELIMITED
-// |FIELDS TERMINATED BY '${fieldSep}'
-// |LINES TERMINATED BY '${rowSep}'
-// |STORED AS TEXTFILE
-// |LOCATION '${tablePath}'""".stripMargin
-// sqlContext.sql(sql)
-// } catch {
-// case e: Throwable => throw e
-// }
-// }
-//
-// def available(): Boolean = {
-// true
-// }
-//
-// private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = {
-// try {
-// Some(schema.map { field =>
-// val (name, _) = field
-// name match {
-// case TimeStampColumn => ms
-// case PayloadColumn => JsonUtil.toJson(data)
-// case _ => null
-// }
-// })
-// } catch {
-// case _ => None
-// }
-// }
-//
-// private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = {
-// val dataMap = schemaName.zip(data).toMap
-// dataMap.get(PayloadColumn) match {
-// case Some(v: String) => {
-// try {
-// val map = JsonUtil.toAnyMap(v)
-// val resMap = if (updateTimeStamp) {
-// dataMap.get(TimeStampColumn) match {
-// case Some(t) => map + (TimeStampColumn -> t)
-// case _ => map
-// }
-// } else map
-// Some(resMap)
-// } catch {
-// case _ => None
-// }
-// }
-// case _ => None
-// }
-// }
-//
-// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
-// val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-// if (newCacheLocked) {
-// try {
-// val ptns = getPartition(ms)
-// val ptnsPath = genPartitionHdfsPath(ptns)
-// val dirPath = s"${tablePath}/${ptnsPath}"
-// val fileName = s"${ms}"
-// val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-//
-// // encode data
-// val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms))
-//
-// // save data
-// val recordRdd: RDD[String] = dataRdd.map { dt =>
-// dt.map(_.toString).mkString(fieldSep)
-// }
-//
-// val dumped = if (!recordRdd.isEmpty) {
-// HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral)
-// } else false
-//
-// // add partition
-// if (dumped) {
-// val sql = addPartitionSql(concreteTableName, ptns)
-// sqlContext.sql(sql)
-// }
-//
-// // submit ms
-// submitCacheTime(ms)
-// submitReadyTime(ms)
-// } catch {
-// case e: Throwable => error(s"save data error: ${e.getMessage}")
-// } finally {
-// newCacheLock.unlock()
-// }
-// }
-// }
-//
-// def readData(): Try[RDD[Map[String, Any]]] = Try {
-// val timeRange = TimeInfoCache.getTimeRange
-// submitLastProcTime(timeRange._2)
-//
-// val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-// submitCleanTime(reviseTimeRange._1)
-//
-// // read directly through partition info
-// val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
-// val sql = selectSql(concreteTableName, partitionRange)
-// val df = sqlContext.sql(sql)
-//
-// // decode data
-// df.flatMap { row =>
-// val dt = schemaName.map { sn =>
-// row.getAs[Any](sn)
-// }
-// decode(dt, true)
-// }
-// }
-//
-// override def cleanOldData(): Unit = {
-// val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-// if (oldCacheLocked) {
-// try {
-// val cleanTime = readCleanTime()
-// cleanTime match {
-// case Some(ct) => {
-// // drop partition
-// val bound = getPartition(ct)
-// val sql = dropPartitionSql(concreteTableName, bound)
-// sqlContext.sql(sql)
-// }
-// case _ => {
-// // do nothing
-// }
-// }
-// } catch {
-// case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-// } finally {
-// oldCacheLock.unlock()
-// }
-// }
-// }
-//
-// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-// // parallel process different time groups, lock is unnecessary
-// val ptns = getPartition(t)
-// val ptnsPath = genPartitionHdfsPath(ptns)
-// val dirPath = s"${tablePath}/${ptnsPath}"
-// val fileName = s"${t}"
-// val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-//
-// try {
-// // remove out time old data
-// HdfsFileDumpUtil.remove(dirPath, fileName, true)
-//
-// // save updated old data
-// if (oldData.size > 0) {
-// val recordDatas = oldData.flatMap { dt =>
-// encode(dt, t)
-// }
-// val records: Iterable[String] = recordDatas.map { dt =>
-// dt.map(_.toString).mkString(fieldSep)
-// }
-// val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral)
-// }
-// } catch {
-// case e: Throwable => error(s"update old data error: ${e.getMessage}")
-// }
-// }
-//
-// override protected def genCleanTime(ms: Long): Long = {
-// val minPartition = partition.last
-// val t1 = TimeUtil.timeToUnit(ms, minPartition._3)
-// val t2 = TimeUtil.timeFromUnit(t1, minPartition._3)
-// t2
-// }
-//
-// private def getPartition(ms: Long): List[(String, Any)] = {
-// partition.map { p =>
-// val (name, _, unit) = p
-// val t = TimeUtil.timeToUnit(ms, unit)
-// (name, t)
-// }
-// }
-// private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = {
-// partition.map { p =>
-// val (name, _, unit) = p
-// val t1 = TimeUtil.timeToUnit(ms1, unit)
-// val t2 = TimeUtil.timeToUnit(ms2, unit)
-// (name, (t1, t2))
-// }
-// }
-//
-// private def genPartitionHdfsPath(partition: List[(String, Any)]): String = {
-// partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/")
-// }
-// private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
-// val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ")
-// val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})"""
-// sql
-// }
-// private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = {
-// val clause = partitionRange.map { pr =>
-// val (name, (r1, r2)) = pr
-// s"""`${name}` BETWEEN '${r1}' and '${r2}'"""
-// }.mkString(" AND ")
-// val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else ""
-// val sql = s"""SELECT * FROM ${tbn} ${whereClause}"""
-// sql
-// }
-// private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
-// val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ")
-// val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}"""
-// println(sql)
-// sql
-// }
-//
-// private def tableExistsSql(): String = {
-// s"tableName LIKE '${tableName}'"
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
deleted file mode 100644
index 0daf2d9..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.data.connector.cache
-//
-//import java.util.concurrent.TimeUnit
-//
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//import org.apache.griffin.measure.config.params.user.DataCacheParam
-//import org.apache.griffin.measure.result.TimeStampInfo
-//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//
-//import scala.util.Try
-//
-//case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
-// ) extends CacheDataConnector {
-//
-// val config = dataCacheParam.config
-// val InfoPath = "info.path"
-// val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-//
-// val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-// val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-//
-// val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
-// val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
-// case s :: e :: _ => {
-// val ns = TimeUtil.milliseconds(s) match {
-// case Some(n) if (n < 0) => n
-// case _ => 0
-// }
-// val ne = TimeUtil.milliseconds(e) match {
-// case Some(n) if (n < 0) => n
-// case _ => 0
-// }
-// (ns, ne)
-// }
-// case _ => (0, 0)
-// }
-//
-// val FilePath = "file.path"
-// val filePath: String = config.get(FilePath) match {
-// case Some(s: String) => s
-// case _ => throw new Exception("invalid file.path!")
-// }
-//
-// val ReadyTimeInterval = "ready.time.interval"
-// val ReadyTimeDelay = "ready.time.delay"
-// val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
-// val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-//
-//// val TimeStampColumn: String = TimeStampInfo.key
-//// val PayloadColumn: String = "payload"
-//
-// // cache schema: Long, String
-//// val fields = List[StructField](
-//// StructField(TimeStampColumn, LongType),
-//// StructField(PayloadColumn, StringType)
-//// )
-//// val schema = StructType(fields)
-//
-// // case class CacheData(time: Long, payload: String) {
-// // def getTime(): Long = time
-// // def getPayload(): String = payload
-// // }
-//
-// private val rowSepLiteral = "\n"
-//
-// val partitionUnits: List[String] = List("hour", "min")
-//
-// override def init(): Unit = {
-// // do nothing
-// }
-//
-// def available(): Boolean = {
-// true
-// }
-//
-// private def encode(data: Map[String, Any], ms: Long): Option[String] = {
-// try {
-// val map = data + (TimeStampInfo.key -> ms)
-// Some(JsonUtil.toJson(map))
-// } catch {
-// case _: Throwable => None
-// }
-// }
-//
-// private def decode(data: String): Option[Map[String, Any]] = {
-// try {
-// Some(JsonUtil.toAnyMap(data))
-// } catch {
-// case _: Throwable => None
-// }
-// }
-//
-// def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
-// val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-// if (newCacheLocked) {
-// try {
-// val ptns = getPartition(ms)
-// val ptnsPath = genPartitionHdfsPath(ptns)
-// val dirPath = s"${filePath}/${ptnsPath}"
-// val dataFileName = s"${ms}"
-// val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-//
-// // encode data
-// val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms))
-//
-// // save data
-// val dumped = if (!dataRdd.isEmpty) {
-// HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
-// } else false
-//
-// // submit ms
-// submitCacheTime(ms)
-// submitReadyTime(ms)
-// } catch {
-// case e: Throwable => error(s"save data error: ${e.getMessage}")
-// } finally {
-// newCacheLock.unlock()
-// }
-// }
-// }
-//
-// def readData(): Try[RDD[Map[String, Any]]] = Try {
-// val timeRange = TimeInfoCache.getTimeRange
-// submitLastProcTime(timeRange._2)
-//
-// val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-// submitCleanTime(reviseTimeRange._1)
-//
-// // read directly through partition info
-// val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
-// println(s"read time ranges: ${reviseTimeRange}")
-// println(s"read partition ranges: ${partitionRanges}")
-//
-// // list partition paths
-// val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
-//
-// if (partitionPaths.isEmpty) {
-// sqlContext.sparkContext.emptyRDD[Map[String, Any]]
-// } else {
-// val filePaths = partitionPaths.mkString(",")
-// val rdd = sqlContext.sparkContext.textFile(filePaths)
-//
-// // decode data
-// rdd.flatMap { row =>
-// decode(row)
-// }
-// }
-// }
-//
-// override def cleanOldData(): Unit = {
-// val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-// if (oldCacheLocked) {
-// try {
-// val cleanTime = readCleanTime()
-// cleanTime match {
-// case Some(ct) => {
-// // drop partitions
-// val bounds = getPartition(ct)
-//
-// // list partition paths
-// val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
-//
-// // delete out time data path
-// earlierPaths.foreach { path =>
-// println(s"delete hdfs path: ${path}")
-// HdfsUtil.deleteHdfsPath(path)
-// }
-// }
-// case _ => {
-// // do nothing
-// }
-// }
-// } catch {
-// case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-// } finally {
-// oldCacheLock.unlock()
-// }
-// }
-// }
-//
-// override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-// // parallel process different time groups, lock is unnecessary
-// val ptns = getPartition(t)
-// val ptnsPath = genPartitionHdfsPath(ptns)
-// val dirPath = s"${filePath}/${ptnsPath}"
-// val dataFileName = s"${t}"
-// val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-//
-// try {
-// // remove out time old data
-// HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-//
-// // save updated old data
-// if (oldData.size > 0) {
-// val recordDatas = oldData.flatMap { dt =>
-// encode(dt, t)
-// }
-// val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral)
-// }
-// } catch {
-// case e: Throwable => error(s"update old data error: ${e.getMessage}")
-// }
-// }
-//
-// override protected def genCleanTime(ms: Long): Long = {
-// val minPartitionUnit = partitionUnits.last
-// val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
-// val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
-// t2
-// }
-//
-// private def getPartition(ms: Long): List[Long] = {
-// partitionUnits.map { unit =>
-// TimeUtil.timeToUnit(ms, unit)
-// }
-// }
-// private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
-// partitionUnits.map { unit =>
-// val t1 = TimeUtil.timeToUnit(ms1, unit)
-// val t2 = TimeUtil.timeToUnit(ms2, unit)
-// (t1, t2)
-// }
-// }
-//
-// private def genPartitionHdfsPath(partition: List[Long]): String = {
-// partition.map(prtn => s"${prtn}").mkString("/")
-// }
-//
-// private def str2Long(str: String): Option[Long] = {
-// try {
-// Some(str.toLong)
-// } catch {
-// case e: Throwable => None
-// }
-// }
-//
-// // here the range means [min, max], but the best range should be (min, max]
-// private def listPathsBetweenRanges(paths: List[String],
-// partitionRanges: List[(Long, Long)]
-// ): List[String] = {
-// partitionRanges match {
-// case Nil => paths
-// case head :: tail => {
-// val (lb, ub) = head
-// val curPaths = paths.flatMap { path =>
-// val names = HdfsUtil.listSubPaths(path, "dir").toList
-// names.filter { name =>
-// str2Long(name) match {
-// case Some(t) => (t >= lb) && (t <= ub)
-// case _ => false
-// }
-// }.map(HdfsUtil.getHdfsFilePath(path, _))
-// }
-// listPathsBetweenRanges(curPaths, tail)
-// }
-// }
-// }
-//
-// private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long]
-// ): List[String] = {
-// bounds match {
-// case Nil => paths
-// case head :: tail => {
-// val earlierPaths = paths.flatMap { path =>
-// val names = HdfsUtil.listSubPaths(path, "dir").toList
-// names.filter { name =>
-// str2Long(name) match {
-// case Some(t) => (t < head)
-// case _ => false
-// }
-// }.map(HdfsUtil.getHdfsFilePath(path, _))
-// }
-// val equalPaths = paths.flatMap { path =>
-// val names = HdfsUtil.listSubPaths(path, "dir").toList
-// names.filter { name =>
-// str2Long(name) match {
-// case Some(t) => (t == head)
-// case _ => false
-// }
-// }.map(HdfsUtil.getHdfsFilePath(path, _))
-// }
-//
-// tail match {
-// case Nil => earlierPaths
-// case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail)
-// }
-// }
-// }
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
deleted file mode 100644
index 84316b3..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.persist
-//
-//import org.apache.griffin.measure.result._
-//import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
-//import org.apache.spark.rdd.RDD
-//
-//// persist result by old http way -- temporary way
-//case class OldHttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
-//
-// val Api = "api"
-// val Method = "method"
-//
-// val api = config.getOrElse(Api, "").toString
-// val method = config.getOrElse(Method, "post").toString
-//
-// def available(): Boolean = {
-// api.nonEmpty
-// }
-//
-// def start(msg: String): Unit = {}
-// def finish(): Unit = {}
-//
-// def result(rt: Long, result: Result): Unit = {
-// result match {
-// case ar: AccuracyResult => {
-// val matchPercentage: Double = if (ar.getTotal <= 0) 0 else (ar.getMatch * 1.0 / ar.getTotal) * 100
-// val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> ar.getTotal))
-// httpResult(dataMap)
-// }
-// case pr: ProfileResult => {
-// val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal))
-// httpResult(dataMap)
-// }
-// case _ => {
-// info(s"result: ${result}")
-// }
-// }
-// }
-//
-// private def httpResult(dataMap: Map[String, Any]) = {
-// try {
-// val data = JsonUtil.toJson(dataMap)
-// // post
-// val params = Map[String, Object]()
-// val header = Map[String, Object](("content-type" -> "application/json"))
-//
-// def func(): Boolean = {
-// HttpUtil.httpRequest(api, method, params, header, data)
-// }
-//
-// PersistThreadPool.addTask(func _, 10)
-//
-//// val status = HttpUtil.httpRequest(api, method, params, header, data)
-//// info(s"${method} to ${api} response status: ${status}")
-// } catch {
-// case e: Throwable => error(e.getMessage)
-// }
-//
-// }
-//
-// def records(recs: RDD[String], tp: String): Unit = {}
-// def records(recs: Iterable[String], tp: String): Unit = {}
-//
-//// def missRecords(records: RDD[String]): Unit = {}
-//// def matchRecords(records: RDD[String]): Unit = {}
-//
-// def log(rt: Long, msg: String): Unit = {}
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala b/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
deleted file mode 100644
index 7f1b153..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo
-//
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.log.Loggable
-//
-//import scala.util.Try
-//
-//trait Algo extends Loggable with Serializable {
-//
-// val envParam: EnvParam
-// val userParam: UserParam
-//
-// def run(): Try[_]
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/resources/test-data.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data.jsonFile b/measure/src/test/resources/test-data.jsonFile
deleted file mode 100644
index 73707f4..0000000
--- a/measure/src/test/resources/test-data.jsonFile
+++ /dev/null
@@ -1,3 +0,0 @@
-{ "name": "emily", "age": 5, "map": { "a": 1, "b": 2 }, "list": [ { "c": 1, "d": 2 }, { "c": 3, "d": 4 } ], "t": [1, 2, 3] }
-{ "name": "white", "age": 15, "map": { "a": 11, "b": 12 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
-{ "name": "west", "age": 25, "map": { "a": 21, "b": 22 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/resources/test-data1.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data1.jsonFile b/measure/src/test/resources/test-data1.jsonFile
deleted file mode 100644
index 1e1f28a..0000000
--- a/measure/src/test/resources/test-data1.jsonFile
+++ /dev/null
@@ -1,31 +0,0 @@
-[{
- "Year": "2013",
- "First Name": "DAVID",
- "County": "KINGS",
- "Sex": "M",
- "Count": "272"
-}, {
- "Year": "2013",
- "First Name": "JAYDEN",
- "County": "KINGS",
- "Sex": "M",
- "Count": "268"
-}, {
- "Year": "2013",
- "First Name": "JAYDEN",
- "County": "QUEENS",
- "Sex": "M",
- "Count": "219"
-}, {
- "Year": "2013",
- "First Name": "MOSHE",
- "County": "KINGS",
- "Sex": "M",
- "Count": "219"
-}, {
- "Year": "2013",
- "First Name": "ETHAN",
- "County": "QUEENS",
- "Sex": "M",
- "Count": "216"
-}]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
deleted file mode 100644
index fc42d43..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/cache/InfoCacheInstanceTest.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.cache
-//
-//import java.util.Date
-//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-//
-//import org.apache.curator.framework.recipes.locks.InterProcessMutex
-//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-//import org.apache.curator.retry.ExponentialBackoffRetry
-//import org.apache.griffin.measure.cache.info.InfoCacheInstance
-//import org.apache.griffin.measure.config.params.env.InfoCacheParam
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class InfoCacheInstanceTest extends FunSuite with Matchers with BeforeAndAfter {
-//
-// val map = Map[String, Any](
-// ("hosts" -> "localhost:2181"),
-// ("namespace" -> "griffin/infocache"),
-// ("lock.path" -> "lock"),
-// ("mode" -> "persist"),
-// ("init.clear" -> true),
-// ("close.clear" -> false)
-// )
-// val name = "ttt"
-//
-// val icp = InfoCacheParam("zk", map)
-// val icps = icp :: Nil
-//
-// before {
-// InfoCacheInstance.initInstance(icps, name)
-// InfoCacheInstance.init
-// }
-//
-// test ("others") {
-// InfoCacheInstance.available should be (true)
-//
-// val keys = List[String](
-// "key1", "key2"
-// )
-// val info = Map[String, String](
-// ("key1" -> "value1"),
-// ("key2" -> "value2")
-// )
-//
-// InfoCacheInstance.cacheInfo(info) should be (true)
-// InfoCacheInstance.readInfo(keys) should be (info)
-// InfoCacheInstance.deleteInfo(keys)
-//// InfoCacheInstance.readInfo(keys) should be (Map[String, String]())
-//
-// }
-//
-// after {
-// InfoCacheInstance.close()
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
deleted file mode 100644
index 271529c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKCacheLockTest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.cache
-//
-//import java.util.Date
-//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-//
-//import org.apache.curator.framework.recipes.locks.InterProcessMutex
-//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-//import org.apache.curator.retry.ExponentialBackoffRetry
-//import org.apache.griffin.measure.cache.info.ZKInfoCache
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class ZKCacheLockTest extends FunSuite with Matchers with BeforeAndAfter {
-//
-// val map = Map[String, Any](
-// ("hosts" -> "localhost:2181"),
-// ("namespace" -> "griffin/infocache"),
-// ("lock.path" -> "lock"),
-// ("mode" -> "persist"),
-// ("init.clear" -> true),
-// ("close.clear" -> false)
-// )
-// val name = "ttt"
-//
-// val ic = ZKInfoCache(map, name)
-//
-// before {
-// ic.init
-// }
-//
-// test ("lock") {
-//
-// case class Proc(n: Int) extends Runnable {
-// override def run(): Unit = {
-// val cl = ic.genLock("proc")
-// val b = cl.lock(2, TimeUnit.SECONDS)
-// try {
-// println(s"${n}: ${b}")
-// if (b) Thread.sleep(3000)
-// } finally {
-// cl.unlock()
-// }
-// }
-// }
-//
-// val pool = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-// val t = 0 until 10
-// t.foreach(a => pool.submit(Proc(a)))
-//
-// pool.shutdown()
-// val t1 = new Date()
-// println(s"${t1}: pool shut down")
-// pool.awaitTermination(20, TimeUnit.SECONDS)
-// val t2 = new Date()
-// println(s"${t2}: pool shut down done [${t2.getTime - t1.getTime}]")
-// }
-//
-// after {
-// ic.close()
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
deleted file mode 100644
index 086170a..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/cache/ZKInfoCacheTest.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.cache
-//
-//import java.util.Date
-//import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-//
-//import org.apache.curator.framework.recipes.locks.InterProcessMutex
-//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-//import org.apache.curator.retry.ExponentialBackoffRetry
-//import org.apache.griffin.measure.cache.info.ZKInfoCache
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class ZKInfoCacheTest extends FunSuite with Matchers with BeforeAndAfter {
-//
-// val map = Map[String, Any](
-// ("hosts" -> "localhost:2181"),
-// ("namespace" -> "griffin/infocache"),
-// ("lock.path" -> "lock"),
-// ("mode" -> "persist"),
-// ("init.clear" -> true),
-// ("close.clear" -> false)
-// )
-// val name = "ttt"
-//
-// test ("available") {
-// val ic = ZKInfoCache(map, name)
-// ic.init
-//
-// ic.available should be (true)
-//
-// ic.close
-// }
-//
-// test ("cacheInfo and readInfo") {
-// val ic = ZKInfoCache(map, name)
-// ic.init
-//
-// val keys = List[String](
-// "key1", "key2"
-// )
-// val info = Map[String, String](
-// ("key1" -> "value1"),
-// ("key2" -> "value2")
-// )
-//
-// ic.cacheInfo(info) should be (true)
-// ic.readInfo(keys) should be (info)
-// ic.deleteInfo(keys)
-// ic.readInfo(keys) should be (Map[String, String]())
-//
-// ic.close
-// }
-//
-// test ("genLock") {
-// val ic = ZKInfoCache(map, name)
-// ic.init
-//
-// val lock1 = ic.genLock("ttt")
-// val lock2 = ic.genLock("ttt")
-// lock1.lock(5, TimeUnit.SECONDS)
-// lock2.lock(5, TimeUnit.SECONDS)
-// lock1.unlock
-// lock2.unlock
-//
-// ic.close
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
deleted file mode 100644
index 845a051..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.process
-//
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.reader.ParamReaderFactory
-//import org.apache.griffin.measure.config.validator.AllParamValidator
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.persist.PersistThreadPool
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-// val envFile = "src/test/resources/env-test.json"
-// val confFile = "src/test/resources/config-test-profiling.json"
-//// val confFile = "src/test/resources/config-test-accuracy.json"
-//
-// val envFsType = "local"
-// val userFsType = "local"
-//
-// val args = Array(envFile, confFile)
-//
-// var allParam: AllParam = _
-//
-// before {
-// // read param files
-// val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// val userParam = readParamFile[UserParam](confFile, userFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// allParam = AllParam(envParam, userParam)
-//
-// // validate param files
-// validateParams(allParam) match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-3)
-// }
-// case _ => {
-// info("params validation pass")
-// }
-// }
-// }
-//
-// test ("batch process") {
-// val procType = ProcessType(allParam.userParam.procType)
-// val proc: DqProcess = procType match {
-// case BatchProcessType => BatchDqProcess(allParam)
-// case StreamingProcessType => StreamingDqProcess(allParam)
-// case _ => {
-// error(s"${procType} is unsupported process type!")
-// sys.exit(-4)
-// }
-// }
-//
-// // process init
-// proc.init match {
-// case Success(_) => {
-// info("process init success")
-// }
-// case Failure(ex) => {
-// error(s"process init error: ${ex.getMessage}")
-// shutdown
-// sys.exit(-5)
-// }
-// }
-//
-// // process run
-// proc.run match {
-// case Success(_) => {
-// info("process run success")
-// }
-// case Failure(ex) => {
-// error(s"process run error: ${ex.getMessage}")
-//
-// if (proc.retriable) {
-// throw ex
-// } else {
-// shutdown
-// sys.exit(-5)
-// }
-// }
-// }
-//
-// // process end
-// proc.end match {
-// case Success(_) => {
-// info("process end success")
-// }
-// case Failure(ex) => {
-// error(s"process end error: ${ex.getMessage}")
-// shutdown
-// sys.exit(-5)
-// }
-// }
-//
-// shutdown
-// }
-//
-// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-// val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-// paramReader.readConfig[T]
-// }
-//
-// private def validateParams(allParam: AllParam): Try[Boolean] = {
-// val allParamValidator = AllParamValidator()
-// allParamValidator.validate(allParam)
-// }
-//
-// private def shutdown(): Unit = {
-// PersistThreadPool.shutdown
-// }
-//}
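
(For reference: the BatchProcessTest deleted above stepped a DqProcess through init, run and end, matching on each Try result and exiting on failure. A minimal sketch of the same short-circuiting lifecycle follows, using hypothetical stand-in phase functions rather than the real BatchDqProcess.)

import scala.util.{Failure, Success, Try}

object DqLifecycleSketch {
  // stand-ins for the DqProcess phases in the removed test; each returns a Try
  def init(): Try[Unit] = Try { println("init") }
  def run(): Try[Unit] = Try { println("run") }
  def end(): Try[Unit] = Try { println("end") }

  def main(args: Array[String]): Unit = {
    // chain the phases so the first Failure short-circuits the rest,
    // instead of matching on each Try separately as the removed test did
    val result = for {
      _ <- init()
      _ <- run()
      _ <- end()
    } yield ()

    result match {
      case Success(_)  => println("process finished")
      case Failure(ex) => println(s"process failed: ${ex.getMessage}")
    }
  }
}

The for-comprehension stops at the first Failure, so later phases never execute once one fails, which matches the exit-on-failure branches of the removed test.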
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
deleted file mode 100644
index 1273bcf..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
+++ /dev/null
@@ -1,531 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.process
-//
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader.ParamReaderFactory
-//import org.apache.griffin.measure.config.validator.AllParamValidator
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.persist.PersistThreadPool
-//import org.apache.griffin.measure.process.engine.DataFrameOprs
-//import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
-//import org.apache.hadoop.hive.ql.exec.UDF
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.apache.spark.sql._
-//import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-//import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
-//import org.apache.spark.sql.hive.HiveContext
-//import org.apache.spark.sql.types._
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.collection.mutable.WrappedArray
-//import scala.util.{Failure, Success, Try}
-//
-//@RunWith(classOf[JUnitRunner])
-//class JsonParseTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-// var sparkContext: SparkContext = _
-// var sqlContext: SQLContext = _
-//
-// before {
-// val conf = new SparkConf().setAppName("test json").setMaster("local[*]")
-// sparkContext = new SparkContext(conf)
-// sparkContext.setLogLevel("WARN")
-//// sqlContext = new HiveContext(sparkContext)
-// sqlContext = new SQLContext(sparkContext)
-// }
-//
-// test ("json test") {
-// // 0. prepare data
-//// val dt =
-//// """
-//// |{"name": "s1", "age": 12, "items": [1, 2, 3],
-//// |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}],
-//// |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}"
-//// |}""".stripMargin
-//// val rdd0 = sparkContext.parallelize(Seq(dt)).map(Row(_))
-// val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
-//
-// val vtp = StructField("value", StringType)
-// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-// df0.registerTempTable("src")
-//
-//// val fromJson2Array = (s: String) => {
-//// JsonUtil.fromJson[Seq[String]](s)
-//// }
-//// sqlContext.udf.register("from_json_to_array", fromJson2Array)
-////
-//// val df2 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as value FROM src")
-//// df2.printSchema
-//// df2.show(10)
-//// df2.registerTempTable("df2")
-//
-//
-//
-// // 1. read from json string to extracted json row
-//// val readSql = "SELECT value FROM src"
-//// val df = sqlContext.sql(readSql)
-//// val df = sqlContext.table("src")
-//// val rdd = df.map { row =>
-//// row.getAs[String]("value")
-//// }
-//// val df1 = sqlContext.read.json(rdd)
-//// df1.printSchema
-//// df1.show(10)
-//// df1.registerTempTable("df1")
-// val details = Map[String, Any](("df.name" -> "src"))
-// val df1 = DataFrameOprs.fromJson(sqlContext, details)
-// df1.registerTempTable("df1")
-//
-// // 2. extract json array into lines
-//// val rdd2 = df1.flatMap { row =>
-//// row.getAs[WrappedArray[String]]("seeds")
-//// }
-//// val df2 = sqlContext.read.json(rdd2)
-// val df2 = sqlContext.sql("select explode(seeds) as value from df1")
-//// val tdf = sqlContext.sql("select name, age, explode(items) as item from df1")
-//// tdf.registerTempTable("tdf")
-//// val df2 = sqlContext.sql("select struct(name, age, item) as ttt from tdf")
-// df2.printSchema
-// df2.show(10)
-// df2.registerTempTable("df2")
-// println(df2.count)
-//
-// val sql1 = "SELECT value FROM df2"
-// val df22 = sqlContext.sql(sql1)
-// val rdd22 = df22.map { row =>
-// row.getAs[String]("value")
-// }
-// import org.apache.spark.sql.functions._
-// val df23 = sqlContext.read.json(rdd22)
-// df23.registerTempTable("df23")
-//// df23.withColumn("par", monotonicallyIncreasingId)
-//
-// val df24 = sqlContext.sql("SELECT url, cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df23")
-// df24.printSchema
-// df24.show(10)
-// df24.registerTempTable("df24")
-// println(df24.count)
-//
-//// val df25 = sqlContext.sql("select ")
-//
-////
-//// // 3. extract json string into row
-////// val df3 = sqlContext.sql("select cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint), url from df2")
-//// val df3 = sqlContext.sql("select cast(get_json_object(get_json_object(value, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint), get_json_object(value, '$.url') from df2")
-//// df3.printSchema()
-//// df3.show(10)
-//// println(df3.count)
-//
-//
-//
-//// val df5 = sqlContext.sql("select get_json_object(value, '$.subs') as subs from src")
-//// df5.printSchema()
-//// df5.show(10)
-//// df5.registerTempTable("df5")
-//// val rdd5 = df5.map { row =>
-//// row.getAs[String]("subs")
-//// }
-//// val df6 = sqlContext.read.json(rdd5)
-//// df6.printSchema
-//// df6.show(10)
-//
-// // 2. extract json string to row
-//// val df2 = sqlContext.sql("select jstr from df1")
-//// val rdd2 = df2.map { row =>
-//// row.getAs[String]("jstr")
-//// }
-//// val df22 = sqlContext.read.json(rdd2)
-//// df22.printSchema
-//// df22.show(100)
-//// df22.registerTempTable("df2")
-////
-//// val df23 = sqlContext.sql("select json_tuple(jstr, 's1', 's2') from df1")
-//// df23.printSchema()
-//// df23.show(100)
-//
-// // 3. extract json array into lines ??
-//
-// // 3. flatmap from json row to json row
-//// val df3 = sqlContext.sql("select explode(subs) as sub, items from df1")
-//// df3.printSchema()
-//// df3.show(10)
-//// df3.registerTempTable("df3")
-////
-//// val df4 = sqlContext.sql("select explode(items) as item, sub from df3")
-//// df4.printSchema()
-//// df4.show(10)
-//
-//// sqlContext.udf.register("length", (s: WrappedArray[_]) => s.length)
-// //
-// // val df2 = sqlContext.sql("SELECT inner from df1")
-// // df2.registerTempTable("df2")
-// // df2.printSchema
-// // df2.show(100)
-//
-//// def children(colname: String, df: DataFrame): Array[DataFrame] = {
-//// val parent = df.schema.fields.filter(_.name == colname).head
-//// println(parent)
-//// val fields: Array[StructField] = parent.dataType match {
-//// case x: StructType => x.fields
-//// case _ => Array.empty[StructField]
-//// }
-//// fields.map(x => col(s"$colname.${x.name}"))
-////// fields.foreach(println)
-//// }
-//////
-//// children("inner", df2)
-////
-//// df2.select(children("bar", df): _*).printSchema
-//
-//// val df3 = sqlContext.sql("select inline(subs) from df1")
-//// df3.printSchema()
-//// df3.show(100)
-//
-//// val rdd2 = df2.flatMap { row =>
-//// row.getAs[GenericRowWithSchema]("inner") :: Nil
-//// }
-////
-//// rdd2.
-//
-//// val funcs = sqlContext.sql("show functions")
-//// funcs.printSchema()
-//// funcs.show(1000)
-////
-//// val desc = sqlContext.sql("describe function inline")
-//// desc.printSchema()
-//// desc.show(100)
-//
-// //
-//
-// }
-//
-// test ("json test 2") {
-// val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
-//
-// val vtp = StructField("value", StringType)
-// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-// df0.registerTempTable("tgt")
-//
-//// val fromJson2StringArray = (s: String) => {
-//// val seq = JsonUtil.fromJson[Seq[Any]](s)
-//// seq.map(i => JsonUtil.toJson(i))
-//// }
-//// sqlContext.udf.register("from_json_to_string_array", fromJson2StringArray)
-////
-//// val df2 = sqlContext.sql("SELECT from_json_to_string_array(get_json_object(value, '$.groups[0].attrsList')) as value FROM tgt")
-//// df2.printSchema()
-//// df2.show(10)
-//// df2.registerTempTable("df2")
-////
-//// val indexOfStringArray = (sa: String, )
-//
-//
-// // 1. read from json string to extracted json row
-// val readSql = "SELECT value FROM tgt"
-// val df = sqlContext.sql(readSql)
-// val rdd = df.map { row =>
-// row.getAs[String]("value")
-// }
-// val df1 = sqlContext.read.json(rdd)
-// df1.printSchema
-// df1.show(10)
-// df1.registerTempTable("df1")
-//
-//
-// val df2 = sqlContext.sql("select groups[0].attrsList as attrs from df1")
-// df2.printSchema
-// df2.show(10)
-// df2.registerTempTable("df2")
-// println(df2.count)
-//
-// val indexOf = (arr: Seq[String], v: String) => {
-// arr.indexOf(v)
-// }
-// sqlContext.udf.register("index_of", indexOf)
-//
-// val df3 = sqlContext.sql("select attrs.values[index_of(attrs.name, 'URL')][0] as url, cast(get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') as bigint) as ts from df2")
-// df3.printSchema()
-// df3.show(10)
-// df3.registerTempTable("df3")
-// }
-//
-// test ("testing") {
-// val dt =
-// """
-// |{"name": "age", "age": 12, "items": [1, 2, 3],
-// |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}],
-// |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}", "b": true
-// |}""".stripMargin
-// val rdd = sparkContext.parallelize(Seq(dt)).map(Row(_))
-// val vtp = StructField("value", StringType)
-// val df = sqlContext.createDataFrame(rdd, StructType(Array(vtp)))
-// df.registerTempTable("df")
-//
-// val df1 = sqlContext.read.json(sqlContext.sql("select * from df").map(r => r.getAs[String]("value")))
-// df1.printSchema()
-// df1.show(10)
-// df1.registerTempTable("df1")
-//
-// val test = (s: String) => {
-// s.toInt
-// }
-// sqlContext.udf.register("to_int", test)
-//
-// val df2 = sqlContext.sql("select (b) as aa, inner.a from df1 where age = to_int(\"12\")")
-// df2.printSchema()
-// df2.show(10)
-// }
-//
-// test ("test input only sql") {
-// val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
-//
-// val vtp = StructField("value", StringType)
-// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-// df0.registerTempTable("src")
-// df0.show(10)
-//
-// // 1. read from json string to extracted json row
-// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.seeds') as seeds FROM src")
-// df1.printSchema
-// df1.show(10)
-// df1.registerTempTable("df1")
-//
-// val json2StringArray: (String) => Seq[String] = (s: String) => {
-// val seq = JsonUtil.fromJson[Seq[String]](s)
-//// seq.map(i => JsonUtil.toJson(i))
-// seq
-// }
-// sqlContext.udf.register("json_to_string_array", json2StringArray)
-//
-// val df2 = sqlContext.sql("SELECT explode(json_to_string_array(seeds)) as seed FROM df1")
-// df2.printSchema
-// df2.show(10)
-// df2.registerTempTable("df2")
-//
-//
-// val df3 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df2")
-// df3.printSchema
-// df3.show(10)
-// }
-//
-// test ("test output only sql") {
-// val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
-//
-// val vtp = StructField("value", StringType)
-// val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
-// df0.registerTempTable("tgt")
-// df0.printSchema()
-// df0.show(10)
-//
-// val json2StringArray: (String) => Seq[String] = (s: String) => {
-// JsonUtil.fromJson[Seq[String]](s)
-// }
-// sqlContext.udf.register("json_to_string_array", json2StringArray)
-//
-// val json2StringJsonArray: (String) => Seq[String] = (s: String) => {
-// val seq = JsonUtil.fromJson[Seq[Any]](s)
-// seq.map(i => JsonUtil.toJson(i))
-// }
-// sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray)
-//
-// val indexOf = (arr: Seq[String], v: String) => {
-// arr.indexOf(v)
-// }
-// sqlContext.udf.register("index_of", indexOf)
-//
-// val indexOfField = (arr: Seq[String], k: String, v: String) => {
-// val seq = arr.flatMap { item =>
-// JsonUtil.fromJson[Map[String, Any]](item).get(k)
-// }
-// seq.indexOf(v)
-// }
-// sqlContext.udf.register("index_of_field", indexOfField)
-//
-// // 1. read from json string to extracted json row
-// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tgt")
-// df1.printSchema
-// df1.show(10)
-// df1.registerTempTable("df1")
-//
-// val df2 = sqlContext.sql("SELECT json_to_string_json_array(attrs) as attrs FROM df1")
-// df2.printSchema()
-// df2.show(10)
-// df2.registerTempTable("df2")
-//
-// val df3 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM df2")
-// df3.printSchema()
-// df3.show(10)
-// df3.registerTempTable("df3")
-//
-// val df4 = sqlContext.sql("SELECT json_to_string_array(get_json_object(attr1, '$.values'))[0], cast(get_json_object(json_to_string_array(get_json_object(attr2, '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) FROM df3")
-// df4.printSchema()
-// df4.show(10)
-// }
-//
-// test ("test from json") {
-// val fromJson2Map = (str: String) => {
-// val a = JsonUtil.fromJson[Map[String, Any]](str)
-// a.mapValues { v =>
-// v match {
-// case t: String => t
-// case _ => JsonUtil.toJson(v)
-// }
-// }
-// }
-// sqlContext.udf.register("from_json_to_map", fromJson2Map)
-//
-// val fromJson2Array = (str: String) => {
-// val a = JsonUtil.fromJson[Seq[Any]](str)
-// a.map { v =>
-// v match {
-// case t: String => t
-// case _ => JsonUtil.toJson(v)
-// }
-// }
-// }
-// sqlContext.udf.register("from_json_to_array", fromJson2Array)
-//
-// // ========================
-//
-// val srdd = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
-// val svtp = StructField("value", StringType)
-// val sdf0 = sqlContext.createDataFrame(srdd, StructType(Array(svtp)))
-// sdf0.registerTempTable("sdf0")
-// sdf0.show(10)
-//
-// // 1. read from json string to extracted json row
-// val sdf1 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as seed FROM sdf0")
-// sdf1.printSchema
-// sdf1.show(10)
-// sdf1.registerTempTable("sdf1")
-//
-// val sdf2 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM sdf1")
-// sdf2.printSchema
-// sdf2.show(10)
-//
-// // ---------------------------------------
-//
-// val trdd = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
-// val tvtp = StructField("value", StringType)
-// val tdf0 = sqlContext.createDataFrame(trdd, StructType(Array(tvtp)))
-// tdf0.registerTempTable("tdf0")
-// tdf0.printSchema()
-// tdf0.show(10)
-//
-//// val json2StringArray: (String) => Seq[String] = (s: String) => {
-//// JsonUtil.fromJson[Seq[String]](s)
-//// }
-//// sqlContext.udf.register("json_to_string_array", json2StringArray)
-////
-//// val json2StringJsonArray: (String) => Seq[String] = (s: String) => {
-//// val seq = JsonUtil.fromJson[Seq[Any]](s)
-//// seq.map(i => JsonUtil.toJson(i))
-//// }
-//// sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray)
-////
-//// val indexOf = (arr: Seq[String], v: String) => {
-//// arr.indexOf(v)
-//// }
-//// sqlContext.udf.register("index_of", indexOf)
-////
-// val indexOfField = (arr: Seq[String], k: String, v: String) => {
-// val seq = arr.flatMap { item =>
-// JsonUtil.fromJson[Map[String, Any]](item).get(k)
-// }
-// seq.indexOf(v)
-// }
-// sqlContext.udf.register("index_of_field", indexOfField)
-//
-// // 1. read from json string to extracted json row
-//// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tdf0")
-// val tdf1 = sqlContext.sql("SELECT from_json_to_array(get_json_object(value, '$.groups[0].attrsList')) as attrs FROM tdf0")
-// tdf1.printSchema
-// tdf1.show(10)
-// tdf1.registerTempTable("tdf1")
-//
-//// val tdf2 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM tdf1")
-//// tdf2.printSchema()
-//// tdf2.show(10)
-//// tdf2.registerTempTable("tdf2")
-//
-// val tdf3 = sqlContext.sql("SELECT from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'URL')], '$.values'))[0] as url, cast(get_json_object(from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')], '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM tdf1")
-// tdf3.printSchema()
-// tdf3.show(10)
-// }
-//
-// test ("sql functions") {
-// val functions = sqlContext.sql("show functions")
-// functions.printSchema()
-// functions.show(10)
-//
-// val functionNames = functions.map(_.getString(0)).collect
-// functionNames.foreach(println)
-// }
-//
-// test ("test text file read") {
-// val partitionPaths = Seq[String](
-// "hdfs://localhost/griffin/streaming/dump/source/418010/25080625/1504837518000",
-// "hdfs://localhost/griffin/streaming/dump/target/418010/25080625/1504837518000")
-// val df = sqlContext.read.json(partitionPaths: _*)
-// df.printSchema()
-// df.show(10)
-// }
-//
-// test ("list paths") {
-// val filePath = "hdfs://localhost/griffin/streaming/dump/source"
-// val partitionRanges = List[(Long, Long)]((0, 0), (-2, 0))
-// val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
-// println(partitionPaths)
-// }
-//
-// private def listPathsBetweenRanges(paths: List[String],
-// partitionRanges: List[(Long, Long)]
-// ): List[String] = {
-// partitionRanges match {
-// case Nil => paths
-// case head :: tail => {
-// val (lb, ub) = head
-// val curPaths = paths.flatMap { path =>
-// val names = HdfsUtil.listSubPathsByType(path, "dir").toList
-// println(names)
-// names.filter { name =>
-// str2Long(name) match {
-// case Some(t) => (t >= lb) && (t <= ub)
-// case _ => false
-// }
-// }.map(HdfsUtil.getHdfsFilePath(path, _))
-// }
-// listPathsBetweenRanges(curPaths, tail)
-// }
-// }
-// }
-//
-// private def str2Long(str: String): Option[Long] = {
-// try {
-// Some(str.toLong)
-// } catch {
-// case e: Throwable => None
-// }
-// }
-//}
\ No newline at end of file
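
(For reference: the JsonParseTest deleted above mostly experimented with pulling fields out of embedded JSON via sqlContext.read.json and get_json_object. A compact, self-contained sketch of that core pattern follows; the two inline JSON lines stand in for the test's src/test/resources/input.msg, which is not reproduced here.)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JsonSqlSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("json sql sketch").setMaster("local[*]"))
    sc.setLogLevel("WARN")
    val sqlContext = new SQLContext(sc)

    // two json lines standing in for the test's input file
    val lines = Seq(
      """{"url": "http://a", "metadata": "{\"tracker\": {\"crawlRequestCreateTS\": 1504837518000}}"}""",
      """{"url": "http://b", "metadata": "{\"tracker\": {\"crawlRequestCreateTS\": 1504837519000}}"}"""
    )

    // infer a schema directly from the json strings, as the removed test did with sqlContext.read.json
    val df = sqlContext.read.json(sc.parallelize(lines))
    df.registerTempTable("df1")

    // pull a nested field out of the embedded json string with get_json_object and cast it
    val result = sqlContext.sql(
      "SELECT url, cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df1")
    result.show(10)

    sc.stop()
  }
}

SQLContext and registerTempTable match the Spark 1.x-era API the removed test was written against.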
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1f984da1/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
deleted file mode 100644
index 394917c..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.griffin.measure.process
-
-import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
-import org.apache.spark.sql.execution.datasources.json.JSONOptions
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-
-case class JsonToStructs(
-// schema: DataType,
-// options: Map[String, String],
- child: Expression)
- extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
- override def nullable: Boolean = true
-
-// def this(schema: DataType, options: Map[String, String], child: Expression) =
-// this(schema, options, child, None)
-
- // Used in `FunctionRegistry`
-// def this(child: Expression, schema: Expression) =
-// this(
-// schema = JsonExprUtils.validateSchemaLiteral(schema),
-// options = Map.empty[String, String],
-// child = child,
-// timeZoneId = None)
-//
-// def this(child: Expression, schema: Expression, options: Expression) =
-// this(
-// schema = JsonExprUtils.validateSchemaLiteral(schema),
-// options = JsonExprUtils.convertToMapData(options),
-// child = child,
-// timeZoneId = None)
-//
-// override def checkInputDataTypes(): TypeCheckResult = schema match {
-// case _: StructType | ArrayType(_: StructType, _) =>
-// super.checkInputDataTypes()
-// case _ => TypeCheckResult.TypeCheckFailure(
-// s"Input schema ${schema.simpleString} must be a struct or an array of structs.")
-// }
-
- override def dataType: DataType = MapType(StringType, StringType)
-
-// override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
-// copy(timeZoneId = Option(timeZoneId))
-
- override def nullSafeEval(json: Any): Any = {
- if (json.toString.trim.isEmpty) return null
-
- try {
- JsonUtil.fromJson[Map[String, Any]](json.toString)
- } catch {
- case _: Throwable => null
- }
- }
-
- override def inputTypes: Seq[DataType] = StringType :: Nil
-}
-//
-//object JsonExprUtils {
-//
-// def validateSchemaLiteral(exp: Expression): StructType = exp match {
-// case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
-// case e => throw new AnalysisException(s"Expected a string literal instead of $e")
-// }
-//
-// def convertToMapData(exp: Expression): Map[String, String] = exp match {
-// case m: CreateMap
-// if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
-// val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
-// ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
-// key.toString -> value.toString
-// }
-// case m: CreateMap =>
-// throw new AnalysisException(
-// s"A type of keys and values in map() must be string, but got ${m.dataType}")
-// case _ =>
-// throw new AnalysisException("Must use a map() function for options")
-// }
-//}
\ No newline at end of file
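
(For reference: the removed JsonToStructs expression parsed a JSON string column into a map, with nullSafeEval returning null for blank or unparsable input. A plain-Scala sketch of that parsing behaviour without the Catalyst plumbing follows; it assumes jackson-module-scala is on the classpath, which is roughly what Griffin's JsonUtil helper wraps.)

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonToMapSketch {
  // a jackson ObjectMapper with the scala module registered
  private val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  // mirrors the removed expression's nullSafeEval: blank input and parse errors yield null
  def toMap(json: String): Map[String, Any] = {
    if (json == null || json.trim.isEmpty) return null
    try {
      mapper.readValue(json, classOf[Map[String, Any]])
    } catch {
      case _: Throwable => null
    }
  }

  def main(args: Array[String]): Unit = {
    println(toMap("""{"s1": "aaa", "s2": 123}"""))  // Map(s1 -> aaa, s2 -> 123)
    println(toMap("not json"))                      // null
  }
}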