Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:25 UTC

[10/11] incubator-griffin git commit: Dsl modify

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
index 6ee9783..2abf3e5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.griffin.measure.config.params.Param
 
 @JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double,
-                              @JsonProperty("rules") rules: String
+case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String,
+                              @JsonProperty("rules") rules: List[Map[String, Any]]
                             ) extends Param {
 
 }
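
The rule section is reshaped here: the sampleRatio field and the single rule string are gone, replaced by a DSL type plus a list of rule maps. A minimal sketch of building the new parameter, assuming a "griffin-dsl" DSL type and an accuracy-style rule map (the map keys and values are illustrative, not taken from this patch):

// Hypothetical construction of the new rule parameter; the "griffin-dsl" value
// and the keys inside the rule map are illustrative assumptions, only the
// field shapes come from the case class above.
val evaluateRule = EvaluateRuleParam(
  dslType = "griffin-dsl",
  rules = List(
    Map(
      "dsl.type" -> "griffin-dsl",
      "dq.type"  -> "accuracy",
      "rule"     -> "source.id = target.id"
    )
  )
)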

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
index df0647c..e55d2b4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
@@ -23,12 +23,10 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.griffin.measure.config.params.Param
 
 @JsonInclude(Include.NON_NULL)
-case class UserParam(@JsonProperty("name") name: String,
-                     @JsonProperty("type") dqType: String,
-                     @JsonProperty("process.type") procType: String,
-                     @JsonProperty("source") sourceParam: DataConnectorParam,
-                     @JsonProperty("target") targetParam: DataConnectorParam,
-                     @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
+case class UserParam( @JsonProperty("name") name: String,
+                      @JsonProperty("process.type") procType: String,
+                      @JsonProperty("data.sources") dataSources: List[DataSourceParam],
+                      @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
                     ) extends Param {
 
 }
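
Likewise, UserParam drops the fixed source/target pair and the dq type field in favour of a list of data sources. A rough wiring sketch, assuming the DataSourceParam list and the EvaluateRuleParam are built elsewhere (DataSourceParam's own fields are not part of this hunk) and using illustrative name/process values:

// Sketch only: the DataSourceParam list and the EvaluateRuleParam are assumed
// to be built elsewhere; DataSourceParam's fields are not shown in this patch.
def buildUserParam(dataSources: List[DataSourceParam],
                   evaluateRule: EvaluateRuleParam): UserParam = {
  UserParam(
    name = "accuracy_measure",   // illustrative job name
    procType = "batch",          // or "streaming"
    dataSources = dataSources,
    evaluateRuleParam = evaluateRule
  )
}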

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
deleted file mode 100644
index 1fb1868..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-
-
-trait DataConnector extends Loggable with Serializable {
-
-  def available(): Boolean
-
-  def init(): Unit
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
deleted file mode 100644
index 670175d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.connector.cache._
-import org.apache.griffin.measure.connector.direct._
-import org.apache.griffin.measure.connector.streaming._
-import org.apache.griffin.measure.rule.RuleExprs
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-
-import scala.reflect.ClassTag
-import scala.util.Try
-
-object DataConnectorFactory {
-
-  val HiveRegex = """^(?i)hive$""".r
-  val AvroRegex = """^(?i)avro$""".r
-
-  val KafkaRegex = """^(?i)kafka$""".r
-
-  val TextRegex = """^(?i)text$""".r
-
-  def getDirectDataConnector(sqlContext: SQLContext,
-                             ssc: StreamingContext,
-                             dataConnectorParam: DataConnectorParam,
-                             ruleExprs: RuleExprs,
-                             globalFinalCacheMap: Map[String, Any]
-                            ): Try[DirectDataConnector] = {
-    val conType = dataConnectorParam.conType
-    val version = dataConnectorParam.version
-    val config = dataConnectorParam.config
-    Try {
-      conType match {
-        case HiveRegex() => HiveDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
-        case AvroRegex() => AvroDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
-        case KafkaRegex() => {
-          val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
-          val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
-          KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam, ruleExprs, globalFinalCacheMap)
-        }
-        case _ => throw new Exception("connector creation error!")
-      }
-    }
-  }
-
-  private def getStreamingDataConnector(ssc: StreamingContext,
-                                        dataConnectorParam: DataConnectorParam
-                                       ): Try[StreamingDataConnector] = {
-    val conType = dataConnectorParam.conType
-    val version = dataConnectorParam.version
-    val config = dataConnectorParam.config
-    Try {
-      conType match {
-        case KafkaRegex() => {
-          genKafkaDataConnector(ssc, config)
-        }
-        case _ => throw new Exception("streaming connector creation error!")
-      }
-    }
-  }
-
-  private def getCacheDataConnector(sqlContext: SQLContext,
-                                    dataCacheParam: DataCacheParam
-                                   ): Try[CacheDataConnector] = {
-    if (dataCacheParam == null) {
-      throw new Exception("invalid data cache param!")
-    }
-    val cacheType = dataCacheParam.cacheType
-    Try {
-      cacheType match {
-        case HiveRegex() => HiveCacheDataConnector(sqlContext, dataCacheParam)
-        case TextRegex() => TextCacheDataConnector(sqlContext, dataCacheParam)
-        case _ => throw new Exception("cache connector creation error!")
-      }
-    }
-  }
-
-  private def genKafkaDataConnector(ssc: StreamingContext, config: Map[String, Any]) = {
-    val KeyType = "key.type"
-    val ValueType = "value.type"
-    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
-    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
-//    val KafkaConfig = "kafka.config"
-//    val Topics = "topics"
-//    val kafkaConfig = config.get(KafkaConfig) match {
-//      case Some(map: Map[String, Any]) => map.mapValues(_.toString).map(identity)
-//      case _ => Map[String, String]()
-//    }
-//    val topics = config.getOrElse(Topics, "").toString
-    (getClassTag(keyType), getClassTag(valueType)) match {
-      case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
-        if (ssc == null) throw new Exception("streaming context is null!  ")
-        new KafkaStreamingDataConnector(ssc, config) {
-          type K = String
-          type KD = StringDecoder
-          type V = String
-          type VD = StringDecoder
-          def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
-            KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
-          }
-        }
-      }
-      case _ => {
-        throw new Exception("not supported type kafka data connector")
-      }
-    }
-  }
-
-  private def getClassTag(tp: String): ClassTag[_] = {
-    try {
-      val clazz = Class.forName(tp)
-      ClassTag(clazz)
-    } catch {
-      case e: Throwable => throw e
-    }
-  }
-
-}
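
The removed factory dispatched on the configured connector type with anchored, case-insensitive regex extractors rather than plain string comparison. A standalone sketch of that matching idiom (the describe helper and its return strings are invented for demonstration):

// Same idiom as the deleted factory: anchored, case-insensitive regexes used
// as extractors in a pattern match over the configured connector type.
val HiveRegex  = """^(?i)hive$""".r
val AvroRegex  = """^(?i)avro$""".r
val KafkaRegex = """^(?i)kafka$""".r

def describe(conType: String): String = conType match {
  case HiveRegex()  => "hive connector"
  case AvroRegex()  => "avro connector"
  case KafkaRegex() => "kafka connector"
  case _            => throw new Exception("connector creation error!")
}

println(describe("HIVE"))   // case-insensitive: prints "hive connector"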

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
deleted file mode 100644
index 1dfe8e2..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import org.apache.griffin.measure.connector.DataConnector
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-trait CacheDataConnector extends DataConnector with DataCacheable with DataUpdatable {
-
-  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit
-
-  def readData(): Try[RDD[Map[String, Any]]]
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
deleted file mode 100644
index 2be87a6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-
-trait DataCacheable {
-
-  protected val defCacheInfoPath = PathCounter.genPath
-
-  val cacheInfoPath: String
-  val readyTimeInterval: Long
-  val readyTimeDelay: Long
-
-  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-
-  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
-  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
-
-  protected def submitCacheTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfCacheTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def submitReadyTime(ms: Long): Unit = {
-    val curReadyTime = ms - readyTimeDelay
-    if (curReadyTime % readyTimeInterval == 0) {
-      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  protected def submitLastProcTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfLastProcTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def submitCleanTime(ms: Long): Unit = {
-    val cleanTime = genCleanTime(ms)
-    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def genCleanTime(ms: Long): Long = ms
-
-  protected def readCleanTime(): Option[Long] = {
-    val key = selfCleanTime
-    val keys = key :: Nil
-    InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
-      try {
-        Some(v.toLong)
-      } catch {
-        case _ => None
-      }
-    }
-  }
-
-}
-
-object PathCounter {
-  private val counter: AtomicLong = new AtomicLong(0L)
-  def genPath(): String = s"path_${increment}"
-  private def increment(): Long = {
-    counter.incrementAndGet()
-  }
-}
\ No newline at end of file
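
In the removed trait, submitReadyTime only persists a ready time when the delayed timestamp lands exactly on an interval boundary; all other timestamps are silently skipped. A stripped-down sketch of that check, using the one-minute "1m" defaults of the cache connectors below as illustrative values:

// Illustrative values: the "1m" ready.time.delay / ready.time.interval
// defaults used by the cache connectors below.
val readyTimeDelay    = 60000L
val readyTimeInterval = 60000L

def readyTimeToSubmit(ms: Long): Option[Long] = {
  val curReadyTime = ms - readyTimeDelay
  // only timestamps that fall exactly on an interval boundary are cached
  if (curReadyTime % readyTimeInterval == 0) Some(curReadyTime) else None
}

println(readyTimeToSubmit(120000L))  // Some(60000) -> would be submitted
println(readyTimeToSubmit(120001L))  // None        -> skipped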

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
deleted file mode 100644
index 07c8187..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import org.apache.spark.rdd.RDD
-
-trait DataUpdatable {
-
-  def cleanOldData(): Unit = {}
-
-  def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {}
-  def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
deleted file mode 100644
index e241188..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.config.params.user.DataCacheParam
-import org.apache.griffin.measure.result.TimeStampInfo
-import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-
-import scala.util.{Success, Try}
-
-case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
-                                 ) extends CacheDataConnector {
-
-  if (!sqlContext.isInstanceOf[HiveContext]) {
-    throw new Exception("hive context not prepared!")
-  }
-
-  val config = dataCacheParam.config
-  val InfoPath = "info.path"
-  val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-
-  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
-  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
-  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
-    case s :: e :: _ => {
-      val ns = TimeUtil.milliseconds(s) match {
-        case Some(n) if (n < 0) => n
-        case _ => 0
-      }
-      val ne = TimeUtil.milliseconds(e) match {
-        case Some(n) if (n < 0) => n
-        case _ => 0
-      }
-      (ns, ne)
-    }
-    case _ => (0, 0)
-  }
-
-  val Database = "database"
-  val database: String = config.getOrElse(Database, "").toString
-  val TableName = "table.name"
-  val tableName: String = config.get(TableName) match {
-    case Some(s: String) if (s.nonEmpty) => s
-    case _ => throw new Exception("invalid table.name!")
-  }
-  val ParentPath = "parent.path"
-  val parentPath: String = config.get(ParentPath) match {
-    case Some(s: String) => s
-    case _ => throw new Exception("invalid parent.path!")
-  }
-  val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName)
-
-  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
-
-  val ReadyTimeInterval = "ready.time.interval"
-  val ReadyTimeDelay = "ready.time.delay"
-  val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-
-  val TimeStampColumn: String = TimeStampInfo.key
-  val PayloadColumn: String = "payload"
-
-//  type Schema = (Long, String)
-  val schema: List[(String, String)] = List(
-    (TimeStampColumn, "bigint"),
-    (PayloadColumn, "string")
-  )
-  val schemaName = schema.map(_._1)
-
-//  type Partition = (Long, Long)
-  val partition: List[(String, String, String)] = List(
-    ("hr", "bigint", "hour"),
-    ("min", "bigint", "min")
-  )
-  val partitionName = partition.map(_._1)
-
-  private val fieldSep = """|"""
-  private val rowSep = """\n"""
-  private val rowSepLiteral = "\n"
-
-  private def dbPrefix(): Boolean = {
-    database.nonEmpty && !database.equals("default")
-  }
-
-  private def tableExists(): Boolean = {
-    Try {
-      if (dbPrefix) {
-        sqlContext.tables(database).filter(tableExistsSql).collect.size
-      } else {
-        sqlContext.tables().filter(tableExistsSql).collect.size
-      }
-    } match {
-      case Success(s) => s > 0
-      case _ => false
-    }
-  }
-
-  override def init(): Unit = {
-    try {
-      if (tableExists) {
-        // drop exist table
-        val dropSql = s"""DROP TABLE ${concreteTableName}"""
-        sqlContext.sql(dropSql)
-      }
-
-      val colsSql = schema.map { field =>
-        s"`${field._1}` ${field._2}"
-      }.mkString(", ")
-      val partitionsSql = partition.map { partition =>
-        s"`${partition._1}` ${partition._2}"
-      }.mkString(", ")
-      val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName}
-                    |(${colsSql}) PARTITIONED BY (${partitionsSql})
-                    |ROW FORMAT DELIMITED
-                    |FIELDS TERMINATED BY '${fieldSep}'
-                    |LINES TERMINATED BY '${rowSep}'
-                    |STORED AS TEXTFILE
-                    |LOCATION '${tablePath}'""".stripMargin
-      sqlContext.sql(sql)
-    } catch {
-      case e: Throwable => throw e
-    }
-  }
-
-  def available(): Boolean = {
-    true
-  }
-
-  private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = {
-    try {
-      Some(schema.map { field =>
-        val (name, _) = field
-        name match {
-          case TimeStampColumn => ms
-          case PayloadColumn => JsonUtil.toJson(data)
-          case _ => null
-        }
-      })
-    } catch {
-      case _ => None
-    }
-  }
-
-  private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = {
-    val dataMap = schemaName.zip(data).toMap
-    dataMap.get(PayloadColumn) match {
-      case Some(v: String) => {
-        try {
-          val map = JsonUtil.toAnyMap(v)
-          val resMap = if (updateTimeStamp) {
-            dataMap.get(TimeStampColumn) match {
-              case Some(t) => map + (TimeStampColumn -> t)
-              case _ => map
-            }
-          } else map
-          Some(resMap)
-        } catch {
-          case _ => None
-        }
-      }
-      case _ => None
-    }
-  }
-
-  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
-    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-    if (newCacheLocked) {
-      try {
-        val ptns = getPartition(ms)
-        val ptnsPath = genPartitionHdfsPath(ptns)
-        val dirPath = s"${tablePath}/${ptnsPath}"
-        val fileName = s"${ms}"
-        val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-
-        // encode data
-        val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms))
-
-        // save data
-        val recordRdd: RDD[String] = dataRdd.map { dt =>
-          dt.map(_.toString).mkString(fieldSep)
-        }
-
-        val dumped = if (!recordRdd.isEmpty) {
-          HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral)
-        } else false
-
-        // add partition
-        if (dumped) {
-          val sql = addPartitionSql(concreteTableName, ptns)
-          sqlContext.sql(sql)
-        }
-
-        // submit ms
-        submitCacheTime(ms)
-        submitReadyTime(ms)
-      } catch {
-        case e: Throwable => error(s"save data error: ${e.getMessage}")
-      } finally {
-        newCacheLock.unlock()
-      }
-    }
-  }
-
-  def readData(): Try[RDD[Map[String, Any]]] = Try {
-    val timeRange = TimeInfoCache.getTimeRange
-    submitLastProcTime(timeRange._2)
-
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-    submitCleanTime(reviseTimeRange._1)
-
-    // read directly through partition info
-    val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
-    val sql = selectSql(concreteTableName, partitionRange)
-    val df = sqlContext.sql(sql)
-
-    // decode data
-    df.flatMap { row =>
-      val dt = schemaName.map { sn =>
-        row.getAs[Any](sn)
-      }
-      decode(dt, true)
-    }
-  }
-
-  override def cleanOldData(): Unit = {
-    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-    if (oldCacheLocked) {
-      try {
-        val cleanTime = readCleanTime()
-        cleanTime match {
-          case Some(ct) => {
-            // drop partition
-            val bound = getPartition(ct)
-            val sql = dropPartitionSql(concreteTableName, bound)
-            sqlContext.sql(sql)
-          }
-          case _ => {
-            // do nothing
-          }
-        }
-      } catch {
-        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-      } finally {
-        oldCacheLock.unlock()
-      }
-    }
-  }
-
-  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-    // parallel process different time groups, lock is unnecessary
-    val ptns = getPartition(t)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${tablePath}/${ptnsPath}"
-    val fileName = s"${t}"
-    val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-
-    try {
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, fileName, true)
-
-      // save updated old data
-      if (oldData.size > 0) {
-        val recordDatas = oldData.flatMap { dt =>
-          encode(dt, t)
-        }
-        val records: Iterable[String] = recordDatas.map { dt =>
-          dt.map(_.toString).mkString(fieldSep)
-        }
-        val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral)
-      }
-    } catch {
-      case e: Throwable => error(s"update old data error: ${e.getMessage}")
-    }
-  }
-
-  override protected def genCleanTime(ms: Long): Long = {
-    val minPartition = partition.last
-    val t1 = TimeUtil.timeToUnit(ms, minPartition._3)
-    val t2 = TimeUtil.timeFromUnit(t1, minPartition._3)
-    t2
-  }
-
-  private def getPartition(ms: Long): List[(String, Any)] = {
-    partition.map { p =>
-      val (name, _, unit) = p
-      val t = TimeUtil.timeToUnit(ms, unit)
-      (name, t)
-    }
-  }
-  private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = {
-    partition.map { p =>
-      val (name, _, unit) = p
-      val t1 = TimeUtil.timeToUnit(ms1, unit)
-      val t2 = TimeUtil.timeToUnit(ms2, unit)
-      (name, (t1, t2))
-    }
-  }
-
-  private def genPartitionHdfsPath(partition: List[(String, Any)]): String = {
-    partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/")
-  }
-  private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
-    val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ")
-    val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})"""
-    sql
-  }
-  private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = {
-    val clause = partitionRange.map { pr =>
-      val (name, (r1, r2)) = pr
-      s"""`${name}` BETWEEN '${r1}' and '${r2}'"""
-    }.mkString(" AND ")
-    val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else ""
-    val sql = s"""SELECT * FROM ${tbn} ${whereClause}"""
-    sql
-  }
-  private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
-    val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ")
-    val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}"""
-    println(sql)
-    sql
-  }
-
-  private def tableExistsSql(): String = {
-    s"tableName LIKE '${tableName}'"
-  }
-
-}
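
The hour/min partition tuple computed from each timestamp drives the DDL this connector issues against Hive. A standalone copy of its two SQL builders, exercised with an invented table name and partition values to show the generated statements:

// Copies of the two private SQL builders from the deleted connector,
// exercised with invented inputs to show the generated statements.
def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
  val partitionSql = partition.map(ptn => s"`${ptn._1}` = ${ptn._2}").mkString(", ")
  s"ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})"
}

def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
  val partitionSql = partition.map(ptn => s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ").mkString(", ")
  s"ALTER TABLE ${tbn} DROP ${partitionSql}"
}

val ptns = List(("hr", 415752L), ("min", 24945134L))   // invented hour/min values
println(addPartitionSql("mydb.cache_table", ptns))
// ALTER TABLE mydb.cache_table ADD IF NOT EXISTS PARTITION (`hr` = 415752, `min` = 24945134)
println(dropPartitionSql("mydb.cache_table", List(("hr", 415752L))))
// ALTER TABLE mydb.cache_table DROP PARTITION ( `hr` < '415752' )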

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
deleted file mode 100644
index 62b6086..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.config.params.user.DataCacheParam
-import org.apache.griffin.measure.result.TimeStampInfo
-import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.util.Try
-
-case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
-                                 ) extends CacheDataConnector {
-
-  val config = dataCacheParam.config
-  val InfoPath = "info.path"
-  val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-
-  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
-  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
-  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
-    case s :: e :: _ => {
-      val ns = TimeUtil.milliseconds(s) match {
-        case Some(n) if (n < 0) => n
-        case _ => 0
-      }
-      val ne = TimeUtil.milliseconds(e) match {
-        case Some(n) if (n < 0) => n
-        case _ => 0
-      }
-      (ns, ne)
-    }
-    case _ => (0, 0)
-  }
-
-  val FilePath = "file.path"
-  val filePath: String = config.get(FilePath) match {
-    case Some(s: String) => s
-    case _ => throw new Exception("invalid file.path!")
-  }
-
-  val ReadyTimeInterval = "ready.time.interval"
-  val ReadyTimeDelay = "ready.time.delay"
-  val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-
-//  val TimeStampColumn: String = TimeStampInfo.key
-//  val PayloadColumn: String = "payload"
-
-  // cache schema: Long, String
-//  val fields = List[StructField](
-//    StructField(TimeStampColumn, LongType),
-//    StructField(PayloadColumn, StringType)
-//  )
-//  val schema = StructType(fields)
-
-  //  case class CacheData(time: Long, payload: String) {
-  //    def getTime(): Long = time
-  //    def getPayload(): String = payload
-  //  }
-
-  private val rowSepLiteral = "\n"
-
-  val partitionUnits: List[String] = List("hour", "min")
-
-  override def init(): Unit = {
-    // do nothing
-  }
-
-  def available(): Boolean = {
-    true
-  }
-
-  private def encode(data: Map[String, Any], ms: Long): Option[String] = {
-    try {
-      val map = data + (TimeStampInfo.key -> ms)
-      Some(JsonUtil.toJson(map))
-    } catch {
-      case _: Throwable => None
-    }
-  }
-
-  private def decode(data: String): Option[Map[String, Any]] = {
-    try {
-      Some(JsonUtil.toAnyMap(data))
-    } catch {
-      case _: Throwable => None
-    }
-  }
-
-  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
-    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-    if (newCacheLocked) {
-      try {
-        val ptns = getPartition(ms)
-        val ptnsPath = genPartitionHdfsPath(ptns)
-        val dirPath = s"${filePath}/${ptnsPath}"
-        val dataFileName = s"${ms}"
-        val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-        // encode data
-        val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms))
-
-        // save data
-        val dumped = if (!dataRdd.isEmpty) {
-          HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
-        } else false
-
-        // submit ms
-        submitCacheTime(ms)
-        submitReadyTime(ms)
-      } catch {
-        case e: Throwable => error(s"save data error: ${e.getMessage}")
-      } finally {
-        newCacheLock.unlock()
-      }
-    }
-  }
-
-  def readData(): Try[RDD[Map[String, Any]]] = Try {
-    val timeRange = TimeInfoCache.getTimeRange
-    submitLastProcTime(timeRange._2)
-
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-    submitCleanTime(reviseTimeRange._1)
-
-    // read directly through partition info
-    val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
-    println(s"read time ranges: ${reviseTimeRange}")
-    println(s"read partition ranges: ${partitionRanges}")
-
-    // list partition paths
-    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
-
-    if (partitionPaths.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Map[String, Any]]
-    } else {
-      val filePaths = partitionPaths.mkString(",")
-      val rdd = sqlContext.sparkContext.textFile(filePaths)
-
-      // decode data
-      rdd.flatMap { row =>
-        decode(row)
-      }
-    }
-  }
-
-  override def cleanOldData(): Unit = {
-    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-    if (oldCacheLocked) {
-      try {
-        val cleanTime = readCleanTime()
-        cleanTime match {
-          case Some(ct) => {
-            // drop partitions
-            val bounds = getPartition(ct)
-
-            // list partition paths
-            val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
-
-            // delete out time data path
-            earlierPaths.foreach { path =>
-              println(s"delete hdfs path: ${path}")
-              HdfsUtil.deleteHdfsPath(path)
-            }
-          }
-          case _ => {
-            // do nothing
-          }
-        }
-      } catch {
-        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-      } finally {
-        oldCacheLock.unlock()
-      }
-    }
-  }
-
-  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-    // parallel process different time groups, lock is unnecessary
-    val ptns = getPartition(t)
-    val ptnsPath = genPartitionHdfsPath(ptns)
-    val dirPath = s"${filePath}/${ptnsPath}"
-    val dataFileName = s"${t}"
-    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-    try {
-      // remove out time old data
-      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-
-      // save updated old data
-      if (oldData.size > 0) {
-        val recordDatas = oldData.flatMap { dt =>
-          encode(dt, t)
-        }
-        val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral)
-      }
-    } catch {
-      case e: Throwable => error(s"update old data error: ${e.getMessage}")
-    }
-  }
-
-  override protected def genCleanTime(ms: Long): Long = {
-    val minPartitionUnit = partitionUnits.last
-    val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
-    val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
-    t2
-  }
-
-  private def getPartition(ms: Long): List[Long] = {
-    partitionUnits.map { unit =>
-      TimeUtil.timeToUnit(ms, unit)
-    }
-  }
-  private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
-    partitionUnits.map { unit =>
-      val t1 = TimeUtil.timeToUnit(ms1, unit)
-      val t2 = TimeUtil.timeToUnit(ms2, unit)
-      (t1, t2)
-    }
-  }
-
-  private def genPartitionHdfsPath(partition: List[Long]): String = {
-    partition.map(prtn => s"${prtn}").mkString("/")
-  }
-
-  private def str2Long(str: String): Option[Long] = {
-    try {
-      Some(str.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-
-  // here the range means [min, max], but the best range should be (min, max]
-  private def listPathsBetweenRanges(paths: List[String],
-                                     partitionRanges: List[(Long, Long)]
-                                    ): List[String] = {
-    partitionRanges match {
-      case Nil => paths
-      case head :: tail => {
-        val (lb, ub) = head
-        val curPaths = paths.flatMap { path =>
-          val names = HdfsUtil.listSubPaths(path, "dir").toList
-          names.filter { name =>
-            str2Long(name) match {
-              case Some(t) => (t >= lb) && (t <= ub)
-              case _ => false
-            }
-          }.map(HdfsUtil.getHdfsFilePath(path, _))
-        }
-        listPathsBetweenRanges(curPaths, tail)
-      }
-    }
-  }
-
-  private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long]
-                                        ): List[String] = {
-    bounds match {
-      case Nil => paths
-      case head :: tail => {
-        val earlierPaths = paths.flatMap { path =>
-          val names = HdfsUtil.listSubPaths(path, "dir").toList
-          names.filter { name =>
-            str2Long(name) match {
-              case Some(t) => (t < head)
-              case _ => false
-            }
-          }.map(HdfsUtil.getHdfsFilePath(path, _))
-        }
-        val equalPaths = paths.flatMap { path =>
-          val names = HdfsUtil.listSubPaths(path, "dir").toList
-          names.filter { name =>
-            str2Long(name) match {
-              case Some(t) => (t == head)
-              case _ => false
-            }
-          }.map(HdfsUtil.getHdfsFilePath(path, _))
-        }
-
-        tail match {
-          case Nil => earlierPaths
-          case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail)
-        }
-      }
-    }
-  }
-
-}
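
The recursive listPathsBetweenRanges walks one partition level per range entry (hour, then minute), keeping only numeric directory names inside each [min, max] bound. A filesystem-free sketch of the same recursion, with the HDFS listing abstracted into a plain function and an invented directory tree:

// listSubDirs stands in for HdfsUtil.listSubPaths(path, "dir"); the directory
// tree and the hour/minute values below are invented for illustration.
def listPathsBetweenRanges(paths: List[String],
                           ranges: List[(Long, Long)],
                           listSubDirs: String => List[String]): List[String] = {
  ranges match {
    case Nil => paths
    case (lb, ub) :: tail =>
      val curPaths = paths.flatMap { path =>
        listSubDirs(path)
          .filter(name => scala.util.Try(name.toLong).toOption.exists(t => t >= lb && t <= ub))
          .map(name => s"${path}/${name}")
      }
      listPathsBetweenRanges(curPaths, tail, listSubDirs)
  }
}

val tree = Map(
  "/cache"        -> List("415751", "415752"),
  "/cache/415752" -> List("24945134", "24945135")
)
val hits = listPathsBetweenRanges(
  List("/cache"),
  List((415752L, 415752L), (24945134L, 24945200L)),
  p => tree.getOrElse(p, Nil)
)
println(hits)  // List(/cache/415752/24945134, /cache/415752/24945135)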

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
deleted file mode 100644
index b45e5a9..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.util.Try
-
-// data connector for avro file
-case class AvroDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                                   ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
-                                 ) extends DirectDataConnector {
-
-  val FilePath = "file.path"
-  val FileName = "file.name"
-
-  val filePath = config.getOrElse(FilePath, "").toString
-  val fileName = config.getOrElse(FileName, "").toString
-
-  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
-
-  private def pathPrefix(): Boolean = {
-    filePath.nonEmpty
-  }
-
-  private def fileExist(): Boolean = {
-    HdfsUtil.existPath(concreteFileFullPath)
-  }
-
-  def available(): Boolean = {
-    (!concreteFileFullPath.isEmpty) && fileExist
-  }
-
-  def init(): Unit = {}
-
-  def metaData(): Try[Iterable[(String, String)]] = {
-    Try {
-      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
-      st.fields.map(f => (f.name, f.dataType.typeName))
-    }
-  }
-
-  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
-    Try {
-      loadDataFile.flatMap { row =>
-        // generate cache data
-        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
-        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
-        // data info
-        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
-          try {
-            (info.key -> row.getAs[info.T](info.key))
-          } catch {
-            case e: Throwable => info.defWrap
-          }
-        }.toMap
-
-        finalExprValueMaps.flatMap { finalExprValueMap =>
-          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-            expr.calculate(finalExprValueMap) match {
-              case Some(v) => Some(v.asInstanceOf[AnyRef])
-              case _ => None
-            }
-          }
-          val key = toTuple(groupbyData)
-
-          Some((key, (finalExprValueMap, dataInfoMap)))
-        }
-
-//        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-//          ExprValueUtil.genExprValueMaps(Some(row), expr, cachedMap)
-//        }
-//        val finalExprValueMap = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMap)
-
-        // when clause filter data source
-//        val whenResult = ruleExprs.whenClauseExprOpt match {
-//          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-//          case _ => None
-//        }
-//
-//        // get groupby data
-//        whenResult match {
-//          case Some(false) => None
-//          case _ => {
-//            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-//              expr.calculate(finalExprValueMap) match {
-//                case Some(v) => Some(v.asInstanceOf[AnyRef])
-//                case _ => None
-//              }
-//            }
-//            val key = toTuple(groupbyData)
-//
-//            Some((key, finalExprValueMap))
-//          }
-//        }
-      }
-    }
-  }
-
-  private def loadDataFile() = {
-    sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
-  }
-
-  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-    if (as.size > 0) {
-      val tupleClass = Class.forName("scala.Tuple" + as.size)
-      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-    } else None
-  }
-
-}
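
The group-by key is built by reflectively instantiating scala.TupleN for however many group-by values there are. A runnable sketch of just that helper, with arbitrary sample values:

// Same reflection trick as the connector's private toTuple: select the TupleN
// class by arity and invoke its only constructor; an empty sequence falls back
// to None (which is also a Product).
def toTuple[A <: AnyRef](as: Seq[A]): Product = {
  if (as.size > 0) {
    val tupleClass = Class.forName("scala.Tuple" + as.size)
    tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
  } else None
}

println(toTuple(Seq("user_id", "2017-09-30")))  // (user_id,2017-09-30)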

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
deleted file mode 100644
index ac1a792..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.connector.DataConnector
-import org.apache.griffin.measure.connector.cache.DataUpdatable
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-
-trait DirectDataConnector extends DataConnector with DataUpdatable {
-
-  def metaData(): Try[Iterable[(String, String)]]
-
-  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
deleted file mode 100644
index 7de2b02..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.util.{Success, Try}
-
-// data connector for hive
-case class HiveDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                                   ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
-                                 ) extends DirectDataConnector {
-
-  val Database = "database"
-  val TableName = "table.name"
-  val Partitions = "partitions"
-
-  val database = config.getOrElse(Database, "").toString
-  val tableName = config.getOrElse(TableName, "").toString
-  val partitionsString = config.getOrElse(Partitions, "").toString
-
-  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
-  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
-
-  private def dbPrefix(): Boolean = {
-    database.nonEmpty && !database.equals("default")
-  }
-
-  def available(): Boolean = {
-    (!tableName.isEmpty) && {
-      Try {
-        if (dbPrefix) {
-          sqlContext.tables(database).filter(tableExistsSql).collect.size
-        } else {
-          sqlContext.tables().filter(tableExistsSql).collect.size
-        }
-      } match {
-        case Success(s) => s > 0
-        case _ => false
-      }
-    }
-  }
-
-  def init(): Unit = {}
-
-  def metaData(): Try[Iterable[(String, String)]] = {
-    Try {
-      val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
-      val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
-      if (partitionPos < 0) originRows
-      else originRows.take(partitionPos)
-    }
-  }
-
-  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
-    Try {
-      sqlContext.sql(dataSql).flatMap { row =>
-        // generate cache data
-        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
-        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
-        // data info
-        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
-          try {
-            (info.key -> row.getAs[info.T](info.key))
-          } catch {
-            case e: Throwable => info.defWrap
-          }
-        }.toMap
-
-        finalExprValueMaps.flatMap { finalExprValueMap =>
-          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-            expr.calculate(finalExprValueMap) match {
-              case Some(v) => Some(v.asInstanceOf[AnyRef])
-              case _ => None
-            }
-          }
-          val key = toTuple(groupbyData)
-
-          Some((key, (finalExprValueMap, dataInfoMap)))
-        }
-
-        // generate cache data
-//        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-//          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
-//        }
-//        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
-//
-//        // when clause filter data source
-//        val whenResult = ruleExprs.whenClauseExprOpt match {
-//          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-//          case _ => None
-//        }
-//
-//        // get groupby data
-//        whenResult match {
-//          case Some(false) => None
-//          case _ => {
-//            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-//              expr.calculate(finalExprValueMap) match {
-//                case Some(v) => Some(v.asInstanceOf[AnyRef])
-//                case _ => None
-//              }
-//            }
-//            val key = toTuple(groupbyData)
-//
-//            Some((key, finalExprValueMap))
-//          }
-//        }
-      }
-    }
-  }
-
-  private def tableExistsSql(): String = {
-//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
-    s"tableName LIKE '${tableName}'"
-  }
-
-  private def metaDataSql(): String = {
-    s"DESCRIBE ${concreteTableName}"
-  }
-
-  private def dataSql(): String = {
-    val clauses = partitions.map { prtn =>
-      val cls = prtn.mkString(" AND ")
-      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
-      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
-    }
-    clauses.mkString(" UNION ALL ")
-  }
-
-  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-    if (as.size > 0) {
-      val tupleClass = Class.forName("scala.Tuple" + as.size)
-      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-    } else None
-  }
-
-}
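
The Hive connector expands its semicolon/comma partition string into one SELECT per partition group, joined with UNION ALL. A standalone sketch of that query construction, using an invented table name and partition string:

// Mirrors the partitions parsing and dataSql of the deleted connector; the
// table name and partition string are invented examples.
val concreteTableName = "mydb.demo_src"
val partitionsString  = "dt=20170930, hour=08; dt=20170930, hour=09"

val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))

val dataSql = partitions.map { prtn =>
  val cls = prtn.mkString(" AND ")
  if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
  else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
}.mkString(" UNION ALL ")

println(dataSql)
// SELECT * FROM mydb.demo_src WHERE dt=20170930 AND hour=08 UNION ALL SELECT * FROM mydb.demo_src WHERE dt=20170930 AND hour=09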

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
deleted file mode 100644
index d2534cc..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.config.params.user.DataConnectorParam
-import org.apache.griffin.measure.connector.DataConnectorFactory
-import org.apache.griffin.measure.connector.cache.CacheDataConnector
-import org.apache.griffin.measure.connector.streaming.StreamingDataConnector
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-
-import scala.util.{Failure, Success, Try}
-
-case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector],
-                                         cacheDataConnectorTry: Try[CacheDataConnector],
-                                         dataConnectorParam: DataConnectorParam,
-                                         ruleExprs: RuleExprs,
-                                         constFinalExprValueMap: Map[String, Any]
-                                        ) extends StreamingCacheDirectDataConnector {
-
-  val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match {
-    case Success(cntr) => cntr
-    case Failure(ex) => throw ex
-  }
-  @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match {
-    case Success(cntr) => cntr
-    case Failure(ex) => throw ex
-  }
-
-  protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
-                          ms: Long
-                         ): RDD[Map[String, Any]] = {
-    val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms)
-
-    rdd.flatMap { kv =>
-      val msg = kv._2
-
-      val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap)
-      val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
-      finalExprValueMaps.map { vm =>
-        vm ++ dataInfoMap
-      }
-    }
-  }
-
-  def metaData(): Try[Iterable[(String, String)]] = Try {
-    Map.empty[String, String]
-  }
-
-  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try {
-    cacheDataConnector.readData match {
-      case Success(rdd) => {
-        rdd.flatMap { row =>
-          val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr =>
-            row.get(expr._id).flatMap { d =>
-              Some((expr._id, d))
-            }
-          }.toMap
-
-          val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
-            row.get(info.key) match {
-              case Some(d) => (info.key -> d)
-              case _ => info.defWrap
-            }
-          }.toMap
-
-          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-            expr.calculate(finalExprValueMap) match {
-              case Some(v) => Some(v.asInstanceOf[AnyRef])
-              case _ => None
-            }
-          }
-          val key = toTuple(groupbyData)
-
-          Some((key, (finalExprValueMap, dataInfoMap)))
-        }
-      }
-      case Failure(ex) => throw ex
-    }
-  }
-
-  override def cleanOldData(): Unit = {
-    cacheDataConnector.cleanOldData
-  }
-
-  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
-    if (dataConnectorParam.getMatchOnce) {
-      cacheDataConnector.updateOldData(t, oldData)
-    }
-  }
-
-  override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {
-    if (dataConnectorParam.getMatchOnce) {
-      cacheDataConnector.updateAllOldData(oldRdd)
-    }
-  }
-
-  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-    if (as.size > 0) {
-      val tupleClass = Class.forName("scala.Tuple" + as.size)
-      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-    } else None
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
deleted file mode 100644
index 87139d6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.connector.cache.CacheDataConnector
-import org.apache.griffin.measure.connector.streaming.StreamingDataConnector
-import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.ExprValueUtil
-import org.apache.spark.rdd.RDD
-
-import scala.util.{Failure, Success}
-
-trait StreamingCacheDirectDataConnector extends DirectDataConnector {
-
-  val cacheDataConnector: CacheDataConnector
-  @transient val streamingDataConnector: StreamingDataConnector
-
-  def available(): Boolean = {
-    cacheDataConnector.available && streamingDataConnector.available
-  }
-
-  def init(): Unit = {
-    cacheDataConnector.init
-
-    val ds = streamingDataConnector.stream match {
-      case Success(dstream) => dstream
-      case Failure(ex) => throw ex
-    }
-
-    ds.foreachRDD((rdd, time) => {
-      val ms = time.milliseconds
-
-      val valueMapRdd = transform(rdd, ms)
-
-      // save data frame
-      cacheDataConnector.saveData(valueMapRdd, ms)
-    })
-  }
-
-  protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
-                          ms: Long
-                         ): RDD[Map[String, Any]]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
deleted file mode 100644
index fdd511d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.streaming
-
-import kafka.serializer.Decoder
-import org.apache.griffin.measure.connector.cache.{CacheDataConnector, DataCacheable}
-import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.{Failure, Success, Try}
-
-abstract class KafkaStreamingDataConnector(@transient ssc: StreamingContext,
-                                           config: Map[String, Any]
-                                          ) extends StreamingDataConnector {
-  type KD <: Decoder[K]
-  type VD <: Decoder[V]
-
-  val KafkaConfig = "kafka.config"
-  val Topics = "topics"
-
-  val kafkaConfig = config.get(KafkaConfig) match {
-    case Some(map: Map[String, Any]) => map.mapValues(_.toString).map(identity)
-    case _ => Map[String, String]()
-  }
-  val topics = config.getOrElse(Topics, "").toString
-
-  def available(): Boolean = {
-    true
-  }
-
-  def init(): Unit = {}
-
-  def stream(): Try[InputDStream[(K, V)]] = Try {
-    val topicSet = topics.split(",").toSet
-    createDStream(topicSet)
-  }
-
-  protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
deleted file mode 100644
index c37caac..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.streaming
-
-import org.apache.griffin.measure.connector.DataConnector
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.Try
-
-
-trait StreamingDataConnector extends DataConnector {
-
-  type K
-  type V
-
-  def stream(): Try[InputDStream[(K, V)]]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
new file mode 100644
index 0000000..534fb1b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.engine._
+import org.apache.griffin.measure.rule.adaptor.{PreProcPhase, RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.preproc.PreProcRuleGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+
+trait DataConnector extends Loggable with Serializable {
+
+//  def available(): Boolean
+
+  def init(): Unit
+
+  def data(ms: Long): Option[DataFrame]
+
+  val dqEngines: DqEngines
+
+  val dcParam: DataConnectorParam
+
+  val sqlContext: SQLContext
+
+  val id: String = DataConnectorIdGenerator.genId
+
+  protected def suffix(ms: Long): String = s"${id}_${ms}"
+  protected def thisName(ms: Long): String = s"this_${suffix(ms)}"
+
+  final val tmstColName = GroupByColumn.tmst
+
+  def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
+    val thisTable = thisName(ms)
+    val preProcRules = PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms))
+    val names = PreProcRuleGenerator.getRuleNames(preProcRules).toSet + thisTable
+
+    try {
+      dfOpt.flatMap { df =>
+        // in data
+        df.registerTempTable(thisTable)
+
+        // generate rule steps
+        val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(preProcRules, DslType("spark-sql"), PreProcPhase)
+
+        // run rules
+        dqEngines.runRuleSteps(ruleSteps)
+
+        // out data
+        val outDf = sqlContext.table(thisTable)
+
+        // drop temp table
+        names.foreach { name =>
+          try {
+            sqlContext.dropTempTable(name)
+          } catch {
+            case e: Throwable => warn(s"drop temp table ${name} fails")
+          }
+        }
+
+        // add tmst
+        val withTmstDf = outDf.withColumn(tmstColName, lit(ms))
+
+        Some(withTmstDf)
+      }
+    } catch {
+      case e: Throwable => {
+        error(s"preporcess of data connector [${id}] error: ${e.getMessage}")
+        None
+      }
+    }
+
+  }
+
+}
+
+object DataConnectorIdGenerator {
+  private val counter: AtomicLong = new AtomicLong(0L)
+  private val head: String = "dc"
+
+  def genId: String = {
+    s"${head}${increment}"
+  }
+
+  private def increment: Long = {
+    counter.incrementAndGet()
+  }
+}
+
+object GroupByColumn {
+  val tmst = "__tmst"
+}
\ No newline at end of file
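
A minimal standalone sketch of the timestamp-tagging step that closes preProcess above (the withColumn(tmstColName, lit(ms)) call), assuming nothing beyond Spark SQL itself; the object and method names below are illustrative only and are not part of this patch:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.lit

    object TmstTagSketch {
      // mirrors GroupByColumn.tmst introduced above
      val tmstColName: String = "__tmst"

      // tag every row of a pre-processed batch with its batch timestamp,
      // so later rule steps can tell batches apart by the "__tmst" column
      def tagWithTmst(df: DataFrame, ms: Long): DataFrame = {
        df.withColumn(tmstColName, lit(ms))
      }
    }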

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..9c3383f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
@@ -0,0 +1,150 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.data.connector.streaming.{KafkaStreamingDataConnector, KafkaStreamingStringDataConnector, StreamingDataConnector}
+import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+import scala.util.Success
+//import org.apache.griffin.measure.data.connector.cache._
+import org.apache.griffin.measure.data.connector.batch._
+//import org.apache.griffin.measure.data.connector.streaming._
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+object DataConnectorFactory {
+
+  val HiveRegex = """^(?i)hive$""".r
+  val AvroRegex = """^(?i)avro$""".r
+  val TextDirRegex = """^(?i)text-dir$""".r
+
+  val KafkaRegex = """^(?i)kafka$""".r
+
+  val TextRegex = """^(?i)text$""".r
+
+  def getDataConnector(sqlContext: SQLContext,
+                       @transient ssc: StreamingContext,
+                       dqEngines: DqEngines,
+                       dataConnectorParam: DataConnectorParam
+                      ): Try[DataConnector] = {
+    val conType = dataConnectorParam.conType
+    val version = dataConnectorParam.version
+    val config = dataConnectorParam.config
+    Try {
+      conType match {
+        case HiveRegex() => HiveBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
+        case AvroRegex() => AvroBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
+        case TextDirRegex() => TextDirBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
+        case KafkaRegex() => {
+//          val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
+//          val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
+//          KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam)
+          getStreamingDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
+        }
+        case _ => throw new Exception("connector creation error!")
+      }
+    }
+  }
+
+  private def getStreamingDataConnector(sqlContext: SQLContext,
+                                        @transient ssc: StreamingContext,
+                                        dqEngines: DqEngines,
+                                        dataConnectorParam: DataConnectorParam
+                                       ): StreamingDataConnector = {
+    if (ssc == null) throw new Exception("streaming context is null!")
+    val conType = dataConnectorParam.conType
+    val version = dataConnectorParam.version
+    conType match {
+      case KafkaRegex() => genKafkaDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
+      case _ => throw new Exception("streaming connector creation error!")
+    }
+  }
+//
+//  private def getCacheDataConnector(sqlContext: SQLContext,
+//                                    dataCacheParam: DataCacheParam
+//                                   ): Try[CacheDataConnector] = {
+//    if (dataCacheParam == null) {
+//      throw new Exception("invalid data cache param!")
+//    }
+//    val cacheType = dataCacheParam.cacheType
+//    Try {
+//      cacheType match {
+//        case HiveRegex() => HiveCacheDataConnector(sqlContext, dataCacheParam)
+//        case TextRegex() => TextCacheDataConnector(sqlContext, dataCacheParam)
+//        case _ => throw new Exception("cache connector creation error!")
+//      }
+//    }
+//  }
+//
+  private def genKafkaDataConnector(sqlContext: SQLContext,
+                                    @transient ssc: StreamingContext,
+                                    dqEngines: DqEngines,
+                                    dataConnectorParam: DataConnectorParam
+                                   ) = {
+    val config = dataConnectorParam.config
+    val KeyType = "key.type"
+    val ValueType = "value.type"
+    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+    (getClassTag(keyType), getClassTag(valueType)) match {
+      case (k, v) if k.runtimeClass == classOf[String] && v.runtimeClass == classOf[String] => {
+        KafkaStreamingStringDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
+      }
+      case _ => {
+        throw new Exception("not supported type kafka data connector")
+      }
+    }
+  }
+
+  private def getClassTag(tp: String): ClassTag[_] = {
+    try {
+      val clazz = Class.forName(tp)
+      ClassTag(clazz)
+    } catch {
+      case e: Throwable => throw e
+    }
+  }
+
+  def filterBatchDataConnectors(connectors: Seq[DataConnector]): Seq[BatchDataConnector] = {
+    connectors.flatMap { dc =>
+      dc match {
+        case mdc: BatchDataConnector => Some(mdc)
+        case _ => None
+      }
+    }
+  }
+  def filterStreamingDataConnectors(connectors: Seq[DataConnector]): Seq[StreamingDataConnector] = {
+    connectors.flatMap { dc =>
+      dc match {
+        case mdc: StreamingDataConnector => Some(mdc)
+        case _ => None
+      }
+    }
+  }
+
+}
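
A hedged usage sketch, not part of the patch, of how a caller might resolve and drive a connector through this factory; sqlContext, ssc, dqEngines and dcParam are assumed to come from the config parsing and engine setup elsewhere in the measure module, and ssc may be null when only batch connectors are configured:

    import scala.util.{Failure, Success}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.StreamingContext
    import org.apache.griffin.measure.config.params.user.DataConnectorParam
    import org.apache.griffin.measure.data.connector.DataConnectorFactory
    import org.apache.griffin.measure.process.engine.DqEngines

    def loadSource(sqlContext: SQLContext,
                   ssc: StreamingContext,
                   dqEngines: DqEngines,
                   dcParam: DataConnectorParam): Unit = {
      DataConnectorFactory.getDataConnector(sqlContext, ssc, dqEngines, dcParam) match {
        case Success(connector) =>
          connector.init()
          // data(ms) yields the pre-processed DataFrame, already tagged with "__tmst"
          connector.data(System.currentTimeMillis).foreach(_.show())
        case Failure(ex) => throw ex
      }
    }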

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
new file mode 100644
index 0000000..ccd6441
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.data.connector._
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+import scala.util.Try
+
+// data connector for avro file
+case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam
+                                 ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val FilePath = "file.path"
+  val FileName = "file.name"
+
+  val filePath = config.getString(FilePath, "")
+  val fileName = config.getString(FileName, "")
+
+  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+  private def pathPrefix(): Boolean = {
+    filePath.nonEmpty
+  }
+
+  private def fileExist(): Boolean = {
+    HdfsUtil.existPath(concreteFileFullPath)
+  }
+
+  def data(ms: Long): Option[DataFrame] = {
+    try {
+      val df = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable => {
+        error(s"load avro file ${concreteFileFullPath} fails")
+        None
+      }
+    }
+  }
+
+//  def available(): Boolean = {
+//    (!concreteFileFullPath.isEmpty) && fileExist
+//  }
+
+//  def init(): Unit = {}
+
+//  def metaData(): Try[Iterable[(String, String)]] = {
+//    Try {
+//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      st.fields.map(f => (f.name, f.dataType.typeName))
+//    }
+//  }
+
+//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+//    Try {
+//      loadDataFile.flatMap { row =>
+//        // generate cache data
+//        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
+//        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+//
+//        // data info
+//        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+//          try {
+//            (info.key -> row.getAs[info.T](info.key))
+//          } catch {
+//            case e: Throwable => info.defWrap
+//          }
+//        }.toMap
+//
+//        finalExprValueMaps.flatMap { finalExprValueMap =>
+//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+//            expr.calculate(finalExprValueMap) match {
+//              case Some(v) => Some(v.asInstanceOf[AnyRef])
+//              case _ => None
+//            }
+//          }
+//          val key = toTuple(groupbyData)
+//
+//          Some((key, (finalExprValueMap, dataInfoMap)))
+//        }
+//      }
+//    }
+//  }
+
+}
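
For orientation, an illustrative config for this connector; only the "file.path" and "file.name" keys come from the code above, the values are made up. With a non-empty "file.path" the connector reads "${file.path}${file.name}", otherwise "file.name" alone is treated as the full path:

    // illustrative only: resolves to "hdfs:///griffin/demo/users_info_src.avro"
    val avroConnectorConfig: Map[String, Any] = Map(
      "file.path" -> "hdfs:///griffin/demo/",
      "file.name" -> "users_info_src.avro"
    )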

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
new file mode 100644
index 0000000..4d138ab
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.data.connector._
+//import org.apache.griffin.measure.data.connector.cache.DataUpdatable
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+
+import scala.util.{Failure, Success, Try}
+
+
+trait BatchDataConnector extends DataConnector {
+
+//  def metaData(): Option[StructType]
+
+  def init(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
new file mode 100644
index 0000000..5d80d0e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
@@ -0,0 +1,149 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.data.connector._
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+import scala.util.{Success, Try}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+// data connector for hive
+case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam
+                                  ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  if (!sqlContext.isInstanceOf[HiveContext]) {
+    throw new Exception("hive context not prepared!")
+  }
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Partitions = "partitions"
+
+  val database = config.getString(Database, "default")
+  val tableName = config.getString(TableName, "")
+  val partitionsString = config.getString(Partitions, "")
+
+  val concreteTableName = s"${database}.${tableName}"
+  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
+
+  def data(ms: Long): Option[DataFrame] = {
+    try {
+      val df = sqlContext.sql(dataSql)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable => {
+        error(s"load hive table ${concreteTableName} fails")
+        None
+      }
+    }
+  }
+
+//  def available(): Boolean = {
+//    (!tableName.isEmpty) && {
+//      Try {
+//        if (dbPrefix) {
+//          sqlContext.tables(database).filter(tableExistsSql).collect.size
+//        } else {
+//          sqlContext.tables().filter(tableExistsSql).collect.size
+//        }
+//      } match {
+//        case Success(s) => s > 0
+//        case _ => false
+//      }
+//    }
+//  }
+
+//  def init(): Unit = {}
+
+//  def metaData(): Try[Iterable[(String, String)]] = {
+//    Try {
+//      val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
+//      val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
+//      if (partitionPos < 0) originRows
+//      else originRows.take(partitionPos)
+//    }
+//  }
+
+//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+//    Try {
+//      sqlContext.sql(dataSql).flatMap { row =>
+//        // generate cache data
+//        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
+//        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+//
+//        // data info
+//        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+//          try {
+//            (info.key -> row.getAs[info.T](info.key))
+//          } catch {
+//            case e: Throwable => info.defWrap
+//          }
+//        }.toMap
+//
+//        finalExprValueMaps.flatMap { finalExprValueMap =>
+//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+//            expr.calculate(finalExprValueMap) match {
+//              case Some(v) => Some(v.asInstanceOf[AnyRef])
+//              case _ => None
+//            }
+//          }
+//          val key = toTuple(groupbyData)
+//
+//          Some((key, (finalExprValueMap, dataInfoMap)))
+//        }
+//      }
+//    }
+//  }
+
+  private def tableExistsSql(): String = {
+//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
+    s"tableName LIKE '${tableName}'"
+  }
+
+  private def metaDataSql(): String = {
+    s"DESCRIBE ${concreteTableName}"
+  }
+
+  private def dataSql(): String = {
+    val clauses = partitions.map { prtn =>
+      val cls = prtn.mkString(" AND ")
+      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
+      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
+    }
+    clauses.mkString(" UNION ALL ")
+  }
+
+//  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+//    if (as.size > 0) {
+//      val tupleClass = Class.forName("scala.Tuple" + as.size)
+//      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+//    } else None
+//  }
+
+}
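
A worked example of the partition handling above, with illustrative values: for database "default", table.name "demo_src" and partitions "dt=20170930, hour=10; dt=20170930, hour=11", splitting on ";" and then "," yields two clause groups, so dataSql() returns:

    SELECT * FROM default.demo_src WHERE dt=20170930 AND hour=10 UNION ALL SELECT * FROM default.demo_src WHERE dt=20170930 AND hour=11

With an empty partitions string the same code degrades to a single "SELECT * FROM default.demo_src" with no WHERE clause.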