You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/06/14 04:52:51 UTC

[1/3] incubator-griffin git commit: refactor context and data source

Repository: incubator-griffin
Updated Branches:
  refs/heads/master d63f6f4ae -> 6b389b316


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
new file mode 100644
index 0000000..f5439fc
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * batch data connector for avro file
+  */
+case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  timestampStorage: TimestampStorage
+                                 ) extends BatchDataConnector {
+
+  val config = dcParam.getConfig
+
+  val FilePath = "file.path"
+  val FileName = "file.name"
+
+  val filePath = config.getString(FilePath, "")
+  val fileName = config.getString(FileName, "")
+
+  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+  private def pathPrefix(): Boolean = {
+    filePath.nonEmpty
+  }
+
+  private def fileExist(): Boolean = {
+    HdfsUtil.existPath(concreteFileFullPath)
+  }
+
+  /**
+    * load the avro file as data frame and pre-process it; a failure is logged with its cause and yields None
+    */
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+      preProcess(Some(df), ms)
+    } catch {
+      case e: Throwable =>
+        error(s"load avro file ${concreteFileFullPath} fails: ${e.getMessage}")
+        None
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala
new file mode 100644
index 0000000..8246f3b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.datasource.connector.DataConnector
+
+trait BatchDataConnector extends DataConnector {
+
+  def init(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
new file mode 100644
index 0000000..f6533b5
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * batch data connector for hive table
+  */
+case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  timestampStorage: TimestampStorage
+                                 ) extends BatchDataConnector {
+
+  val config = dcParam.getConfig
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Where = "where"
+
+  val database = config.getString(Database, "default")
+  val tableName = config.getString(TableName, "")
+  val whereString = config.getString(Where, "")
+
+  val concreteTableName = s"${database}.${tableName}"
+  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+
+  /**
+    * run the sql built by dataSql through sparkSession.sql, pre-process the result, and return it
+    * together with TimeRange(ms, readTmst(ms)); any failure is logged and yields None as data frame
+    */
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val loaded = try {
+      val sql = dataSql
+      info(sql)
+      preProcess(Some(sparkSession.sql(sql)), ms)
+    } catch {
+      case e: Throwable =>
+        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
+        None
+    }
+    val tmsts = readTmst(ms)
+    (loaded, TimeRange(ms, tmsts))
+  }
+
+
+  private def tableExistsSql(): String = {
+//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
+    s"tableName LIKE '${tableName}'"
+  }
+
+  private def metaDataSql(): String = {
+    s"DESCRIBE ${concreteTableName}"
+  }
+
+  /**
+    * sql selecting all columns of the table; when where clauses are configured, one select per
+    * clause is generated and the selects are joined by UNION ALL
+    */
+  private def dataSql(): String = {
+    val selectAll = s"SELECT * FROM ${concreteTableName}"
+    if (wheres.isEmpty) selectAll
+    else wheres.map(cond => s"${selectAll} WHERE ${cond}").mkString(" UNION ALL ")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
new file mode 100644
index 0000000..fd83c1e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -0,0 +1,106 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * batch data connector for directory with text format data in the nth depth sub-directories
+  */
+case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
+                                     dcParam: DataConnectorParam,
+                                     timestampStorage: TimestampStorage
+                                    ) extends BatchDataConnector {
+
+  val config = dcParam.getConfig
+
+  val DirPath = "dir.path"
+  val DataDirDepth = "data.dir.depth"
+  val SuccessFile = "success.file"
+  val DoneFile = "done.file"
+
+  val dirPath = config.getString(DirPath, "")
+  val dataDirDepth = config.getInt(DataDirDepth, 0)
+  val successFile = config.getString(SuccessFile, "_SUCCESS")
+  val doneFile = config.getString(DoneFile, "_DONE")
+
+  val ignoreFilePrefix = "_"
+
+  private def dirExist(): Boolean = {
+    HdfsUtil.existPath(dirPath)
+  }
+
+  /**
+    * read the text files of all readable data dirs below dirPath into one data frame.
+    * every dir picked by listSubDirs with the readable filter gets its done file touched before
+    * reading, and dirs for which emptyDir holds are left out of the read.
+    * @param ms  timestamp passed to preProcess and readTmst, and used in the returned TimeRange
+    * @return pre-processed data frame, None if there is no dir to read or on failure, with its TimeRange
+    */
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val loaded = try {
+      val readableDirs = listSubDirs(Seq(dirPath), dataDirDepth, readable)
+      // touch done file for all picked dirs, including the ones skipped below as empty
+      readableDirs.foreach(touchDone)
+
+      val nonEmptyDirs = readableDirs.filterNot(emptyDir)
+
+      if (nonEmptyDirs.isEmpty) None
+      else preProcess(Some(sparkSession.read.text(nonEmptyDirs: _*)), ms)
+    } catch {
+      case e: Throwable =>
+        error(s"load text dir ${dirPath} fails: ${e.getMessage}")
+        None
+    }
+    val tmsts = readTmst(ms)
+    (loaded, TimeRange(ms, tmsts))
+  }
+
+  /**
+    * list the sub directories `depth` + 1 levels below `paths`, keeping those accepted by `filteFunc`
+    */
+  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
+    val children = paths.flatMap { parent => HdfsUtil.listSubPathsByType(parent, "dir", true) }
+    if (depth > 0) listSubDirs(children, depth - 1, filteFunc)
+    else children.filter(filteFunc)
+  }
+
+  private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
+  private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
+  private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
+
+  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+
+  // a dir counts as empty when every "file" entry listed for it starts with ignoreFilePrefix
+  private def emptyDir(dir: String): Boolean =
+    HdfsUtil.listSubPathsByType(dir, "file").forall(_.startsWith(ignoreFilePrefix))
+
+//  def metaData(): Try[Iterable[(String, String)]] = {
+//    Try {
+//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      st.fields.map(f => (f.name, f.dataType.typeName))
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
new file mode 100644
index 0000000..1475898
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.streaming
+
+import kafka.serializer.Decoder
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * streaming data connector for kafka
+  */
+trait KafkaStreamingDataConnector extends StreamingDataConnector {
+
+  type KD <: Decoder[K]
+  type VD <: Decoder[V]
+  type OUT = (K, V)
+
+  val config = dcParam.getConfig
+
+  val KafkaConfig = "kafka.config"
+  val Topics = "topics"
+
+  val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
+  val topics = config.getString(Topics, "")
+
+  /**
+    * register fan in on the cache client, then handle every rdd of the stream:
+    * coalesce it to the default parallelism when it has more partitions, transform it to a
+    * data frame, pre-process it, and save the result through the cache client with the batch
+    * time in ms. a failure creating the stream is rethrown; a failure inside a batch is logged
+    * and None is saved for that batch.
+    */
+  def init(): Unit = {
+    // register fan in
+    streamingCacheClientOpt.foreach(_.registerFanIn)
+
+    // Try.get returns the dstream or rethrows the exception of a failed stream creation
+    val ds = stream.get
+    ds.foreachRDD((rdd, time) => {
+      val ms = time.milliseconds
+      val saveDfOpt = try {
+        // coalesce partition number
+        val prlCount = rdd.sparkContext.defaultParallelism
+        val ptnCount = rdd.getNumPartitions
+        val coalescedRdd = if (prlCount < ptnCount) rdd.coalesce(prlCount) else rdd
+
+        // transform, then pre-process
+        preProcess(transform(coalescedRdd), ms)
+      } catch {
+        case e: Throwable =>
+          error(s"streaming data connector error: ${e.getMessage}")
+          None
+      }
+
+      // save data frame
+      streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
+    })
+  }
+
+  // create the input dstream for the comma-separated `topics`, trimming blanks around each topic name
+  def stream(): Try[InputDStream[OUT]] = Try {
+    createDStream(topics.split(",").map(_.trim).toSet)
+  }
+
+  protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
new file mode 100644
index 0000000..c24aadc
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.streaming
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, _}
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+/**
+  * streaming data connector for kafka with string format key and value
+  */
+case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSession,
+                                             @transient ssc: StreamingContext,
+                                             dcParam: DataConnectorParam,
+                                             timestampStorage: TimestampStorage,
+                                             streamingCacheClientOpt: Option[StreamingCacheClient]
+                                            ) extends KafkaStreamingDataConnector {
+
+  type K = String
+  type KD = StringDecoder
+  type V = String
+  type VD = StringDecoder
+
+  val valueColName = "value"
+  val schema = StructType(Array(
+    StructField(valueColName, StringType)
+  ))
+
+  def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
+    KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
+  }
+
+  // turn the (key, value) rdd into a data frame holding d._2 in the single `value` column;
+  // an empty rdd, or a failure (logged with its cause), yields None
+  def transform(rdd: RDD[OUT]): Option[DataFrame] = {
+    if (rdd.isEmpty) None else {
+      try {
+        val rowRdd = rdd.map(d => Row(d._2))
+        Some(sparkSession.createDataFrame(rowRdd, schema))
+      } catch {
+        case e: Throwable =>
+          error(s"streaming data transform fails: ${e.getMessage}")
+          None
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
new file mode 100644
index 0000000..5c2170c
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.streaming
+
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnector
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.Try
+
+trait StreamingDataConnector extends DataConnector {
+
+  type K
+  type V
+  type OUT
+
+  protected def stream(): Try[InputDStream[OUT]]
+
+  // transform rdd to dataframe
+  def transform(rdd: RDD[OUT]): Option[DataFrame]
+
+  // streaming data connector cannot directly read data frame
+  def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
+
+  val streamingCacheClientOpt: Option[StreamingCacheClient]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 06892a3..1174dbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -23,7 +23,7 @@ import java.util.Date
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.params._
 import org.apache.griffin.measure.context._
-import org.apache.griffin.measure.context.datasource.DataSourceFactory
+import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index f990c8e..73fc7a0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -25,8 +25,8 @@ import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.params._
 import org.apache.griffin.measure.context._
-import org.apache.griffin.measure.context.datasource.DataSourceFactory
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.datasource.DataSourceFactory
+import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
 import org.apache.griffin.measure.context.streaming.metric.CacheResults
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
@@ -68,8 +68,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     clearCpDir
 
     // init info cache instance
-    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
-    InfoCacheInstance.init
+    OffsetCacheClient.initClient(envParam.offsetCacheParams, metricName)
+    OffsetCacheClient.init
 
     // register udf
     GriffinUDFAgent.register(sqlContext)
@@ -163,7 +163,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
                                    evaluateRuleParam: EvaluateRuleParam
                                   ) extends Runnable with Loggable {
 
-    val lock = InfoCacheInstance.genLock("process")
+    val lock = OffsetCacheClient.genLock("process")
     val appPersist = globalContext.getPersist()
 
     def run(): Unit = {
@@ -174,7 +174,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
       if (locked) {
         try {
 
-          TimeInfoCache.startTimeInfoCache
+          OffsetCacheClient.startOffsetCache
 
           val startTime = new Date().getTime
           appPersist.log(startTime, s"starting process ...")
@@ -196,7 +196,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
           val endTime = new Date().getTime
           appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
 
-          TimeInfoCache.endTimeInfoCache
+          OffsetCacheClient.endOffsetCache
 
           // clean old data
           cleanData(dqContext)
@@ -225,7 +225,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
         context.clean()
 
-        val cleanTime = TimeInfoCache.getCleanTime
+        val cleanTime = OffsetCacheClient.getCleanTime
         CacheResults.refresh(cleanTime)
       } catch {
         case e: Throwable => error(s"clean data error: ${e.getMessage}")


[3/3] incubator-griffin git commit: refactor context and data source

Posted by gu...@apache.org.
refactor context and data source

Author: Lionel Liu <bh...@163.com>
Author: dodobel <12...@qq.com>

Closes #300 from bhlx3lyx7/spark2.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/6b389b31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/6b389b31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/6b389b31

Branch: refs/heads/master
Commit: 6b389b316061e55c19ccb171615fd7e3689f59ce
Parents: d63f6f4
Author: Lionel Liu <bh...@163.com>
Authored: Thu Jun 14 12:52:44 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Thu Jun 14 12:52:44 2018 +0800

----------------------------------------------------------------------
 .../configuration/params/EnvConfig.scala        |  18 +-
 .../griffin/measure/context/DQContext.scala     |   2 +-
 .../measure/context/datasource/DataSource.scala |  97 -----
 .../context/datasource/DataSourceFactory.scala  |  67 ----
 .../datasource/cache/StreamingCacheClient.scala | 366 -------------------
 .../cache/StreamingCacheClientFactory.scala     |  68 ----
 .../cache/StreamingCacheJsonClient.scala        |  40 --
 .../cache/StreamingCacheOrcClient.scala         |  40 --
 .../cache/StreamingCacheParquetClient.scala     |  42 ---
 .../context/datasource/cache/WithFanIn.scala    |  69 ----
 .../datasource/connector/DataConnector.scala    | 112 ------
 .../connector/DataConnectorFactory.scala        | 125 -------
 .../batch/AvroBatchDataConnector.scala          |  71 ----
 .../connector/batch/BatchDataConnector.scala    |  27 --
 .../batch/HiveBatchDataConnector.scala          |  86 -----
 .../batch/TextDirBatchDataConnector.scala       | 106 ------
 .../streaming/KafkaStreamingDataConnector.scala |  85 -----
 .../KafkaStreamingStringDataConnector.scala     |  71 ----
 .../streaming/StreamingDataConnector.scala      |  46 ---
 .../datasource/info/DataSourceCacheable.scala   |  88 -----
 .../context/datasource/info/TmstCache.scala     |  47 ---
 .../context/streaming/info/InfoCache.scala      |  39 --
 .../streaming/info/InfoCacheFactory.scala       |  41 ---
 .../streaming/info/InfoCacheInstance.scala      |  53 ---
 .../context/streaming/info/TimeInfoCache.scala  | 127 -------
 .../context/streaming/info/ZKInfoCache.scala    | 217 -----------
 .../context/streaming/lock/CacheLockInZK.scala  |  53 +++
 .../context/streaming/lock/CacheLockSeq.scala   |  33 ++
 .../context/streaming/lock/MultiCacheLock.scala |  39 --
 .../context/streaming/lock/ZKCacheLock.scala    |  53 ---
 .../context/streaming/offset/OffsetCache.scala  |  39 ++
 .../streaming/offset/OffsetCacheClient.scala    |  54 +++
 .../streaming/offset/OffsetCacheFactory.scala   |  39 ++
 .../streaming/offset/OffsetCacheInZK.scala      | 214 +++++++++++
 .../context/streaming/offset/OffsetOps.scala    | 125 +++++++
 .../griffin/measure/datasource/DataSource.scala |  96 +++++
 .../measure/datasource/DataSourceFactory.scala  |  66 ++++
 .../measure/datasource/TimestampStorage.scala   |  47 +++
 .../datasource/cache/StreamingCacheClient.scala | 366 +++++++++++++++++++
 .../cache/StreamingCacheClientFactory.scala     |  68 ++++
 .../cache/StreamingCacheJsonClient.scala        |  40 ++
 .../cache/StreamingCacheOrcClient.scala         |  40 ++
 .../cache/StreamingCacheParquetClient.scala     |  42 +++
 .../cache/StreamingOffsetCacheable.scala        |  88 +++++
 .../measure/datasource/cache/WithFanIn.scala    |  69 ++++
 .../datasource/connector/DataConnector.scala    | 112 ++++++
 .../connector/DataConnectorFactory.scala        | 125 +++++++
 .../batch/AvroBatchDataConnector.scala          |  71 ++++
 .../connector/batch/BatchDataConnector.scala    |  27 ++
 .../batch/HiveBatchDataConnector.scala          |  86 +++++
 .../batch/TextDirBatchDataConnector.scala       | 106 ++++++
 .../streaming/KafkaStreamingDataConnector.scala |  85 +++++
 .../KafkaStreamingStringDataConnector.scala     |  71 ++++
 .../streaming/StreamingDataConnector.scala      |  46 +++
 .../measure/launch/batch/BatchDQApp.scala       |   2 +-
 .../launch/streaming/StreamingDQApp.scala       |  16 +-
 56 files changed, 2227 insertions(+), 2241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
index bc6d50f..fc4b984 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
@@ -27,22 +27,22 @@ import org.apache.griffin.measure.utils.TimeUtil
   * environment param
   * @param sparkParam       config of spark environment (must)
   * @param persistParams    config of persist ways (optional)
-  * @param infoCacheParams  config of information cache ways (required in streaming mode)
+  * @param offsetCacheParams  config of information cache ways (required in streaming mode)
   */
 @JsonInclude(Include.NON_NULL)
 case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
                      @JsonProperty("persist") persistParams: List[PersistParam],
-                     @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam]
+                     @JsonProperty("info.cache") offsetCacheParams: List[OffsetCacheParam]
                    ) extends Param {
   def getSparkParam: SparkParam = sparkParam
   def getPersistParams: Seq[PersistParam] = if (persistParams != null) persistParams else Nil
-  def getInfoCacheParams: Seq[InfoCacheParam] = if (infoCacheParams != null) infoCacheParams else Nil
+  def getOffsetCacheParams: Seq[OffsetCacheParam] = if (offsetCacheParams != null) offsetCacheParams else Nil
 
   def validate(): Unit = {
     assert((sparkParam != null), "spark param should not be null")
     sparkParam.validate
     getPersistParams.foreach(_.validate)
-    getInfoCacheParams.foreach(_.validate)
+    getOffsetCacheParams.foreach(_.validate)
   }
 }
 
@@ -95,14 +95,14 @@ case class PersistParam( @JsonProperty("type") persistType: String,
 }
 
 /**
-  * info cache param
-  * @param cacheType    information cache type, e.g.: zookeeper (must)
+  * offset cache param
+  * @param cacheType    offset cache type, e.g.: zookeeper (must)
   * @param config       config of cache way
   */
 @JsonInclude(Include.NON_NULL)
-case class InfoCacheParam( @JsonProperty("type") cacheType: String,
-                           @JsonProperty("config") config: Map[String, Any]
-                         ) extends Param {
+case class OffsetCacheParam(@JsonProperty("type") cacheType: String,
+                            @JsonProperty("config") config: Map[String, Any]
+                          ) extends Param {
   def getType: String = cacheType
   def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 1d7dc62..926bde1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context
 
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.params._
-import org.apache.griffin.measure.context.datasource._
+import org.apache.griffin.measure.datasource._
 import org.apache.griffin.measure.context.writer._
 import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
deleted file mode 100644
index 0ca61aa..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
-import org.apache.griffin.measure.context.datasource.connector.DataConnector
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.spark.sql._
-
-/**
-  * data source
-  * @param name     name of data source
-  * @param dsParam  param of this data source
-  * @param dataConnectors       list of data connectors
-  * @param streamingCacheClientOpt   streaming data cache client option
-  */
-case class DataSource(name: String,
-                      dsParam: DataSourceParam,
-                      dataConnectors: Seq[DataConnector],
-                      streamingCacheClientOpt: Option[StreamingCacheClient]
-                     ) extends Loggable with Serializable {
-
-  def init(): Unit = {
-    dataConnectors.foreach(_.init)
-  }
-
-  def loadData(context: DQContext): TimeRange = {
-    info(s"load data [${name}]")
-    val timestamp = context.contextId.timestamp
-    val (dfOpt, timeRange) = data(timestamp)
-    dfOpt match {
-      case Some(df) => {
-        context.runTimeTableRegister.registerTable(name, df)
-      }
-      case None => {
-        warn(s"load data source [${name}] fails")
-      }
-    }
-    timeRange
-  }
-
-  private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
-    val batches = dataConnectors.flatMap { dc =>
-      val (dfOpt, timeRange) = dc.data(timestamp)
-      dfOpt match {
-        case Some(df) => Some((dfOpt, timeRange))
-        case _ => None
-      }
-    }
-    val caches = streamingCacheClientOpt match {
-      case Some(dsc) => dsc.readData() :: Nil
-      case _ => Nil
-    }
-    val pairs = batches ++ caches
-
-    if (pairs.size > 0) {
-      pairs.reduce { (a, b) =>
-        (unionDfOpts(a._1, b._1), a._2.merge(b._2))
-      }
-    } else {
-      (None, TimeRange.emptyTimeRange)
-    }
-  }
-
-  def updateData(df: DataFrame): Unit = {
-    streamingCacheClientOpt.foreach(_.updateData(Some(df)))
-  }
-
-  def cleanOldData(): Unit = {
-    streamingCacheClientOpt.foreach(_.cleanOutTimeData)
-  }
-
-  def processFinish(): Unit = {
-    streamingCacheClientOpt.foreach(_.processFinish)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
deleted file mode 100644
index edd88b6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClientFactory
-import org.apache.griffin.measure.context.datasource.connector.{DataConnector, DataConnectorFactory}
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-
-import scala.util.Success
-
-object DataSourceFactory extends Loggable {
-
-  def getDataSources(sparkSession: SparkSession,
-                     ssc: StreamingContext,
-                     dataSources: Seq[DataSourceParam]
-                    ): Seq[DataSource] = {
-    dataSources.zipWithIndex.flatMap { pair =>
-      val (param, index) = pair
-      getDataSource(sparkSession, ssc, param, index)
-    }
-  }
-
-  private def getDataSource(sparkSession: SparkSession,
-                            ssc: StreamingContext,
-                            dataSourceParam: DataSourceParam,
-                            index: Int
-                           ): Option[DataSource] = {
-    val name = dataSourceParam.getName
-    val connectorParams = dataSourceParam.getConnectors
-    val tmstCache = TmstCache()
-
-    // for streaming data cache
-    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
-      sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, tmstCache)
-
-    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
-      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
-        tmstCache, streamingCacheClientOpt) match {
-          case Success(connector) => Some(connector)
-          case _ => None
-        }
-    }
-
-    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
deleted file mode 100644
index 1c3ca60..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache}
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
-
-import scala.util.Random
-
-/**
-  * data source cache in streaming mode
-  * save data frame into hdfs in dump phase
-  * read data frame from hdfs in calculate phase
-  * with update and clean actions for the cache data
-  */
-trait StreamingCacheClient extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable {
-
-  val sqlContext: SQLContext
-  val param: Map[String, Any]
-  val dsName: String
-  val index: Int
-
-  val tmstCache: TmstCache
-  protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until)
-  protected def clearTmst(t: Long) = tmstCache.remove(t)
-  protected def clearTmstsUntil(until: Long) = {
-    val outDateTmsts = tmstCache.until(until)
-    tmstCache.remove(outDateTmsts)
-  }
-  protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
-  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
-
-  val _FilePath = "file.path"
-  val _InfoPath = "info.path"
-  val _ReadyTimeInterval = "ready.time.interval"
-  val _ReadyTimeDelay = "ready.time.delay"
-  val _TimeRange = "time.range"
-
-  val rdmStr = Random.alphanumeric.take(10).mkString
-  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
-  val defInfoPath = s"${index}"
-
-  val filePath: String = param.getString(_FilePath, defFilePath)
-  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
-  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
-  val deltaTimeRange: (Long, Long) = {
-    def negative(n: Long): Long = if (n <= 0) n else 0
-    param.get(_TimeRange) match {
-      case Some(seq: Seq[String]) => {
-        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
-        val ns = negative(nseq.headOption.getOrElse(0))
-        val ne = negative(nseq.tail.headOption.getOrElse(0))
-        (ns, ne)
-      }
-      case _ => (0, 0)
-    }
-  }
-
-  val _ReadOnly = "read.only"
-  val readOnly = param.getBoolean(_ReadOnly, false)
-
-  val _Updatable = "updatable"
-  val updatable = param.getBoolean(_Updatable, false)
-
-  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
-  val newFilePath = s"${filePath}/new"
-  val oldFilePath = s"${filePath}/old"
-
-  val defOldCacheIndex = 0L
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
-
-  /**
-    * save data frame in dump phase
-    * @param dfOpt    data frame to be saved
-    * @param ms       timestamp of this data frame
-    */
-  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
-    if (!readOnly) {
-      dfOpt match {
-        case Some(df) => {
-          // cache df
-          df.cache
-
-          // cache df
-          val cnt = df.count
-          info(s"save ${dsName} data count: ${cnt}")
-
-          if (cnt > 0) {
-            // lock makes it safer when writing new cache data
-            val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-            if (newCacheLocked) {
-              try {
-                val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
-                writeDataFrame(dfw, newFilePath)
-              } catch {
-                case e: Throwable => error(s"save data error: ${e.getMessage}")
-              } finally {
-                newCacheLock.unlock()
-              }
-            }
-          }
-
-          // uncache df
-          df.unpersist
-        }
-        case _ => {
-          info(s"no data frame to save")
-        }
-      }
-
-      // submit cache time and ready time
-      if (fanIncrement(ms)) {
-        info(s"save data [${ms}] finish")
-        submitCacheTime(ms)
-        submitReadyTime(ms)
-      }
-
-    }
-  }
-
-  /**
-    * read data frame in calculation phase
-    * @return   data frame to calculate, with the time range of data
-    */
-  def readData(): (Option[DataFrame], TimeRange) = {
-    // time range: (a, b]
-    val timeRange = TimeInfoCache.getTimeRange
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-
-    // read partition info
-    val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
-      info(s"read time range: [${reviseTimeRange._1}]")
-      s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
-    } else {
-      info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
-      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
-    }
-
-    // new cache data
-    val newDfOpt = try {
-      val dfr = sqlContext.read
-      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
-    } catch {
-      case e: Throwable => {
-        warn(s"read data source cache warn: ${e.getMessage}")
-        None
-      }
-    }
-
-    // old cache data
-    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
-    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
-      val oldDfPath = s"${oldFilePath}/${idx}"
-      try {
-        val dfr = sqlContext.read
-        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
-      } catch {
-        case e: Throwable => {
-          warn(s"read old data source cache warn: ${e.getMessage}")
-          None
-        }
-      }
-    }
-
-    // whole cache data
-    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
-
-    // from until tmst range
-    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
-    val tmstSet = afterTilRangeTmsts(from, until)
-
-    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
-    (cacheDfOpt, retTimeRange)
-  }
-
-  private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
-                                     func: (Long, Long) => Boolean
-                                    ): Unit = {
-    val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func)
-    // delete out time data path
-    earlierOrEqPaths.foreach { path =>
-      info(s"delete hdfs path: ${path}")
-      HdfsUtil.deleteHdfsPath(path)
-    }
-  }
-  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
-                                        func: (Long, Long) => Boolean
-                                       ): Iterable[String] = {
-    val names = HdfsUtil.listSubPathsByType(path, "dir")
-    val regex = partitionOpt match {
-      case Some(partition) => s"^${partition}=(\\d+)$$".r
-      case _ => "^(\\d+)$".r
-    }
-    names.filter { name =>
-      name match {
-        case regex(value) => {
-          str2Long(value) match {
-            case Some(t) => func(t, bound)
-            case _ => false
-          }
-        }
-        case _ => false
-      }
-    }.map(name => s"${path}/${name}")
-  }
-  private def str2Long(str: String): Option[Long] = {
-    try {
-      Some(str.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-
-  /**
-    * clean out-time cached data on hdfs
-    */
-  def cleanOutTimeData(): Unit = {
-    // clean tmst
-    val cleanTime = readCleanTime
-    cleanTime.foreach(clearTmstsTil(_))
-
-    if (!readOnly) {
-      // new cache data
-      val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
-      newCacheCleanTime match {
-        case Some(nct) => {
-          // clean calculated new cache data
-          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (newCacheLocked) {
-            try {
-              cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
-                (a: Long, b: Long) => (a <= b))
-            } catch {
-              case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
-            } finally {
-              newCacheLock.unlock()
-            }
-          }
-        }
-        case _ => {
-          // do nothing
-        }
-      }
-
-      // old cache data
-      val oldCacheCleanTime = if (updatable) readCleanTime else None
-      oldCacheCleanTime match {
-        case Some(oct) => {
-          val oldCacheIndexOpt = readOldCacheIndex
-          oldCacheIndexOpt.foreach { idx =>
-            val oldDfPath = s"${oldFilePath}/${idx}"
-            val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-            if (oldCacheLocked) {
-              try {
-                // clean calculated old cache data
-                cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
-                // clean out time old cache data not calculated
-//                cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
-              } catch {
-                case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
-              } finally {
-                oldCacheLock.unlock()
-              }
-            }
-          }
-        }
-        case _ => {
-          // do nothing
-        }
-      }
-    }
-  }
-
-  /**
-    * update old cached data by new data frame
-    * @param dfOpt    data frame to update old cached data
-    */
-  def updateData(dfOpt: Option[DataFrame]): Unit = {
-    if (!readOnly && updatable) {
-      dfOpt match {
-        case Some(df) => {
-          // old cache lock
-          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (oldCacheLocked) {
-            try {
-              val oldCacheIndexOpt = readOldCacheIndex
-              val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
-
-              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
-              val cleanTime = getNextCleanTime
-              val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
-              val updateDf = df.filter(filterStr)
-
-              val prlCount = sqlContext.sparkContext.defaultParallelism
-              // repartition
-              val repartitionedDf = updateDf.repartition(prlCount)
-              val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
-              writeDataFrame(dfw, oldDfPath)
-
-              submitOldCacheIndex(nextOldCacheIndex)
-            } catch {
-              case e: Throwable => error(s"update data error: ${e.getMessage}")
-            } finally {
-              oldCacheLock.unlock()
-            }
-          }
-        }
-        case _ => {
-          info(s"no data frame to update")
-        }
-      }
-    }
-  }
-
-  /**
-    * each time calculation phase finishes,
-    * data source cache needs to submit some cache information
-    */
-  def processFinish(): Unit = {
-    // next last proc time
-    val timeRange = TimeInfoCache.getTimeRange
-    submitLastProcTime(timeRange._2)
-
-    // next clean time
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1
-    submitCleanTime(nextCleanTime)
-  }
-
-  // read next clean time
-  private def getNextCleanTime(): Long = {
-    val timeRange = TimeInfoCache.getTimeRange
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1
-    nextCleanTime
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
deleted file mode 100644
index fd9d231..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
-
-object StreamingCacheClientFactory extends Loggable {
-
-  private object DataSourceCacheType {
-    val ParquetRegex = "^(?i)parq(uet)?$".r
-    val JsonRegex = "^(?i)json$".r
-    val OrcRegex = "^(?i)orc$".r
-  }
-  import DataSourceCacheType._
-
-  val _type = "type"
-
-  /**
-    * create streaming cache client
-    * @param sqlContext   sqlContext in spark environment
-    * @param cacheOpt     data source cache config option
-    * @param name         data source name
-    * @param index        data source index
-    * @param tmstCache    the same tmstCache instance inside a data source
-    * @return             streaming cache client option
-    */
-  def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
-                   name: String, index: Int, tmstCache: TmstCache
-                  ): Option[StreamingCacheClient] = {
-    cacheOpt.flatMap { param =>
-      try {
-        val tp = param.getString(_type, "")
-        val dsCache = tp match {
-          case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
-          case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
-          case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
-          case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
-        }
-        Some(dsCache)
-      } catch {
-        case e: Throwable => {
-          error(s"generate data source cache fails")
-          None
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
deleted file mode 100644
index 494db3e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
-  * data source cache in json format
-  */
-case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
-                                    dsName: String, index: Int, tmstCache: TmstCache
-                              ) extends StreamingCacheClient {
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
-    info(s"write path: ${path}")
-    dfw.json(path)
-  }
-
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
-    dfr.json(path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
deleted file mode 100644
index 6e0f142..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
-  * data source cache in orc format
-  */
-case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
-                                   dsName: String, index: Int, tmstCache: TmstCache
-                             ) extends StreamingCacheClient {
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
-    info(s"write path: ${path}")
-    dfw.orc(path)
-  }
-
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
-    dfr.orc(path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
deleted file mode 100644
index d99bc58..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
-  * data source cache in parquet format
-  */
-case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
-                                       dsName: String, index: Int, tmstCache: TmstCache
-                                 ) extends StreamingCacheClient {
-
-  sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
-    info(s"write path: ${path}")
-    dfw.parquet(path)
-  }
-
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
-    dfr.parquet(path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
deleted file mode 100644
index ebd2e55..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
-
-/**
-  * fan in trait, for multiple input and one output
-  * to support multiple parallel data connectors in one data source
-  */
-trait WithFanIn[T] {
-
-  // total input number
-  val totalNum: AtomicInteger = new AtomicInteger(0)
-  // concurrent map of fan in count for each key
-  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
-
-  def registerFanIn(): Int = {
-    totalNum.incrementAndGet()
-  }
-
-  /**
-    * increment for a key, to test if all parallel inputs finished
-    * @param key
-    * @return
-    */
-  def fanIncrement(key: T): Boolean = {
-    fanInc(key)
-    fanInCountMap.get(key) match {
-      case Some(n) if (n >= totalNum.get) => {
-        fanInCountMap.remove(key)
-        true
-      }
-      case _ => false
-    }
-  }
-
-  private def fanInc(key: T): Unit = {
-    fanInCountMap.get(key) match {
-      case Some(n) => {
-        val suc = fanInCountMap.replace(key, n, n + 1)
-        if (!suc) fanInc(key)
-      }
-      case _ => {
-        val oldOpt = fanInCountMap.putIfAbsent(key, 1)
-        if (oldOpt.nonEmpty) fanInc(key)
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
deleted file mode 100644
index a4c1995..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector
-
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType}
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
-import org.apache.griffin.measure.job.builder.DQJobBuilder
-import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
-
-trait DataConnector extends Loggable with Serializable {
-
-  @transient val sparkSession: SparkSession
-
-  val dcParam: DataConnectorParam
-
-  val id: String = DataConnectorIdGenerator.genId
-  protected def thisName(suffix: String): String = s"this_${suffix}"
-
-  val tmstCache: TmstCache
-  protected def saveTmst(t: Long) = tmstCache.insert(t)
-  protected def readTmst(t: Long) = tmstCache.fromUntil(t, t + 1)
-
-  def init(): Unit
-
-  // get data frame in batch mode
-  def data(ms: Long): (Option[DataFrame], TimeRange)
-
-  private def createContext(t: Long): DQContext = {
-    DQContext(ContextId(t, id), id, Nil, Nil, BatchProcessType)(sparkSession)
-  }
-
-  def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
-    // new context
-    val context = createContext(ms)
-
-    val timestamp = context.contextId.timestamp
-    val suffix = context.contextId.id
-    val thisTable = thisName(suffix)
-
-    try {
-      saveTmst(timestamp)    // save timestamp
-
-      dfOpt.flatMap { df =>
-        val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
-
-        // init data
-        context.compileTableRegister.registerTable(thisTable)
-        context.runTimeTableRegister.registerTable(thisTable, df)
-
-        // build job
-        val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
-
-        // job execute
-        preprocJob.execute(context)
-
-        // out data
-        val outDf = context.sparkSession.table(s"`${thisTable}`")
-
-        // add tmst column
-        val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
-
-        // clean context
-        context.clean()
-
-        Some(withTmstDf)
-      }
-
-    } catch {
-      case e: Throwable => {
-        error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
-        None
-      }
-    }
-  }
-}
-
-object DataConnectorIdGenerator {
-  private val counter: AtomicLong = new AtomicLong(0L)
-  private val head: String = "dc"
-
-  def genId: String = {
-    s"${head}${increment}"
-  }
-
-  private def increment: Long = {
-    counter.incrementAndGet()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
deleted file mode 100644
index 4538fbb..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.datasource.connector.batch._
-import org.apache.griffin.measure.context.datasource.connector.streaming._
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-
-import scala.reflect.ClassTag
-import scala.util.Try
-
-object DataConnectorFactory extends Loggable {
-
-  val HiveRegex = """^(?i)hive$""".r
-  val AvroRegex = """^(?i)avro$""".r
-  val TextDirRegex = """^(?i)text-dir$""".r
-
-  val KafkaRegex = """^(?i)kafka$""".r
-
-  /**
-    * create data connector
-    * @param sparkSession     spark env
-    * @param ssc              spark streaming env
-    * @param dcParam          data connector param
-    * @param tmstCache        same tmst cache in one data source
-    * @param streamingCacheClientOpt   for streaming cache
-    * @return   data connector
-    */
-  def getDataConnector(sparkSession: SparkSession,
-                       ssc: StreamingContext,
-                       dcParam: DataConnectorParam,
-                       tmstCache: TmstCache,
-                       streamingCacheClientOpt: Option[StreamingCacheClient]
-                      ): Try[DataConnector] = {
-    val conType = dcParam.getType
-    val version = dcParam.getVersion
-    Try {
-      conType match {
-        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
-        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
-        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
-        case KafkaRegex() => {
-          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-        }
-        case _ => throw new Exception("connector creation error!")
-      }
-    }
-  }
-
-  private def getStreamingDataConnector(sparkSession: SparkSession,
-                                        ssc: StreamingContext,
-                                        dcParam: DataConnectorParam,
-                                        tmstCache: TmstCache,
-                                        streamingCacheClientOpt: Option[StreamingCacheClient]
-                                       ): StreamingDataConnector = {
-    if (ssc == null) throw new Exception("streaming context is null!")
-    val conType = dcParam.getType
-    val version = dcParam.getVersion
-    conType match {
-      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-      case _ => throw new Exception("streaming connector creation error!")
-    }
-  }
-
-  private def getKafkaDataConnector(sparkSession: SparkSession,
-                                    ssc: StreamingContext,
-                                    dcParam: DataConnectorParam,
-                                    tmstCache: TmstCache,
-                                    streamingCacheClientOpt: Option[StreamingCacheClient]
-                                   ): KafkaStreamingDataConnector  = {
-    val KeyType = "key.type"
-    val ValueType = "value.type"
-    val config = dcParam.config
-    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
-    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
-    (getClassTag(keyType), getClassTag(valueType)) match {
-      case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
-        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-      }
-      case _ => {
-        throw new Exception("not supported type kafka data connector")
-      }
-    }
-  }
-
-  private def getClassTag(tp: String): ClassTag[_] = {
-    try {
-      val clazz = Class.forName(tp)
-      ClassTag(clazz)
-    } catch {
-      case e: Throwable => throw e
-    }
-  }
-
-//  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
-//    connectors.flatMap { dc =>
-//      dc match {
-//        case mdc: T => Some(mdc)
-//        case _ => None
-//      }
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
deleted file mode 100644
index a906246..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
-  * batch data connector for avro file
-  */
-case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
-                                  dcParam: DataConnectorParam,
-                                  tmstCache: TmstCache
-                                 ) extends BatchDataConnector {
-
-  val config = dcParam.getConfig
-
-  val FilePath = "file.path"
-  val FileName = "file.name"
-
-  val filePath = config.getString(FilePath, "")
-  val fileName = config.getString(FileName, "")
-
-  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
-
-  private def pathPrefix(): Boolean = {
-    filePath.nonEmpty
-  }
-
-  private def fileExist(): Boolean = {
-    HdfsUtil.existPath(concreteFileFullPath)
-  }
-
-  def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable => {
-        error(s"load avro file ${concreteFileFullPath} fails")
-        None
-      }
-    }
-    val tmsts = readTmst(ms)
-    (dfOpt, TimeRange(ms, tmsts))
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
deleted file mode 100644
index 8f32687..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.context.datasource.connector.DataConnector
-
-trait BatchDataConnector extends DataConnector {
-
-  def init(): Unit = {}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
deleted file mode 100644
index 331f469..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
-  * batch data connector for hive table
-  */
-case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
-                                  dcParam: DataConnectorParam,
-                                  tmstCache: TmstCache
-                                 ) extends BatchDataConnector {
-
-  val config = dcParam.getConfig
-
-  val Database = "database"
-  val TableName = "table.name"
-  val Where = "where"
-
-  val database = config.getString(Database, "default")
-  val tableName = config.getString(TableName, "")
-  val whereString = config.getString(Where, "")
-
-  val concreteTableName = s"${database}.${tableName}"
-  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
-
-  def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val dtSql = dataSql
-      info(dtSql)
-      val df = sparkSession.sql(dtSql)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable => {
-        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
-        None
-      }
-    }
-    val tmsts = readTmst(ms)
-    (dfOpt, TimeRange(ms, tmsts))
-  }
-
-
-  private def tableExistsSql(): String = {
-//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
-    s"tableName LIKE '${tableName}'"
-  }
-
-  private def metaDataSql(): String = {
-    s"DESCRIBE ${concreteTableName}"
-  }
-
-  private def dataSql(): String = {
-    val tableClause = s"SELECT * FROM ${concreteTableName}"
-    if (wheres.length > 0) {
-      val clauses = wheres.map { w =>
-        s"${tableClause} WHERE ${w}"
-      }
-      clauses.mkString(" UNION ALL ")
-    } else tableClause
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
deleted file mode 100644
index bc76f9d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
-  * batch data connector for directory with text format data in the nth depth sub-directories
-  */
-case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
-                                     dcParam: DataConnectorParam,
-                                     tmstCache: TmstCache
-                                    ) extends BatchDataConnector {
-
-  val config = dcParam.getConfig
-
-  val DirPath = "dir.path"
-  val DataDirDepth = "data.dir.depth"
-  val SuccessFile = "success.file"
-  val DoneFile = "done.file"
-
-  val dirPath = config.getString(DirPath, "")
-  val dataDirDepth = config.getInt(DataDirDepth, 0)
-  val successFile = config.getString(SuccessFile, "_SUCCESS")
-  val doneFile = config.getString(DoneFile, "_DONE")
-
-  val ignoreFilePrefix = "_"
-
-  private def dirExist(): Boolean = {
-    HdfsUtil.existPath(dirPath)
-  }
-
-  def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
-      // touch done file for read dirs
-      dataDirs.foreach(dir => touchDone(dir))
-
-      val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
-
-      if (validDataDirs.nonEmpty) {
-        val df = sparkSession.read.text(validDataDirs:  _*)
-        val dfOpt = Some(df)
-        val preDfOpt = preProcess(dfOpt, ms)
-        preDfOpt
-      } else {
-        None
-      }
-    } catch {
-      case e: Throwable => {
-        error(s"load text dir ${dirPath} fails: ${e.getMessage}")
-        None
-      }
-    }
-    val tmsts = readTmst(ms)
-    (dfOpt, TimeRange(ms, tmsts))
-  }
-
-  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
-    val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
-    if (depth <= 0) {
-      subDirs.filter(filteFunc)
-    } else {
-      listSubDirs(subDirs, depth - 1, filteFunc)
-    }
-  }
-
-  private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
-  private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
-  private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
-
-  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
-
-  private def emptyDir(dir: String): Boolean = {
-    HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
-  }
-
-//  def metaData(): Try[Iterable[(String, String)]] = {
-//    Try {
-//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
-//      st.fields.map(f => (f.name, f.dataType.typeName))
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
deleted file mode 100644
index 0f30b7f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.streaming
-
-import kafka.serializer.Decoder
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.{Failure, Success, Try}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
-  * streaming data connector for kafka
-  */
-trait KafkaStreamingDataConnector extends StreamingDataConnector {
-
-  type KD <: Decoder[K]
-  type VD <: Decoder[V]
-  type OUT = (K, V)
-
-  val config = dcParam.getConfig
-
-  val KafkaConfig = "kafka.config"
-  val Topics = "topics"
-
-  val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
-  val topics = config.getString(Topics, "")
-
-  def init(): Unit = {
-    // register fan in
-    streamingCacheClientOpt.foreach(_.registerFanIn)
-
-    val ds = stream match {
-      case Success(dstream) => dstream
-      case Failure(ex) => throw ex
-    }
-    ds.foreachRDD((rdd, time) => {
-      val ms = time.milliseconds
-      val saveDfOpt = try {
-        // coalesce partition number
-        val prlCount = rdd.sparkContext.defaultParallelism
-        val ptnCount = rdd.getNumPartitions
-        val repartitionedRdd = if (prlCount < ptnCount) {
-          rdd.coalesce(prlCount)
-        } else rdd
-
-        val dfOpt = transform(repartitionedRdd)
-
-        // pre-process
-        preProcess(dfOpt, ms)
-      } catch {
-        case e: Throwable => {
-          error(s"streaming data connector error: ${e.getMessage}")
-          None
-        }
-      }
-
-      // save data frame
-      streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
-    })
-  }
-
-  def stream(): Try[InputDStream[OUT]] = Try {
-    val topicSet = topics.split(",").toSet
-    createDStream(topicSet)
-  }
-
-  protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
deleted file mode 100644
index 3083ca6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.streaming
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{Row, _}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-
-/**
-  * streaming data connector for kafka with string format key and value
-  */
-case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSession,
-                                             @transient ssc: StreamingContext,
-                                             dcParam: DataConnectorParam,
-                                             tmstCache: TmstCache,
-                                             streamingCacheClientOpt: Option[StreamingCacheClient]
-                                            ) extends KafkaStreamingDataConnector {
-
-  type K = String
-  type KD = StringDecoder
-  type V = String
-  type VD = StringDecoder
-
-  val valueColName = "value"
-  val schema = StructType(Array(
-    StructField(valueColName, StringType)
-  ))
-
-  def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
-    KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
-  }
-
-  def transform(rdd: RDD[OUT]): Option[DataFrame] = {
-    if (rdd.isEmpty) None else {
-      try {
-        val rowRdd = rdd.map(d => Row(d._2))
-        val df = sparkSession.createDataFrame(rowRdd, schema)
-        Some(df)
-      } catch {
-        case e: Throwable => {
-          error(s"streaming data transform fails")
-          None
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
deleted file mode 100644
index 737bc21..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.streaming
-
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.datasource.connector.DataConnector
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.Try
-
-trait StreamingDataConnector extends DataConnector {
-
-  type K
-  type V
-  type OUT
-
-  protected def stream(): Try[InputDStream[OUT]]
-
-  // transform rdd to dataframe
-  def transform(rdd: RDD[OUT]): Option[DataFrame]
-
-  // streaming data connector cannot directly read data frame
-  def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
-
-  val streamingCacheClientOpt: Option[StreamingCacheClient]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
deleted file mode 100644
index f721a1e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.info
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-
-/**
-  * timestamp info of data source cache
-  */
-trait DataSourceCacheable extends Loggable with Serializable {
-
-  val cacheInfoPath: String
-  val readyTimeInterval: Long
-  val readyTimeDelay: Long
-
-  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-
-  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
-  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
-  def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath)
-
-  protected def submitCacheTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfCacheTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def submitReadyTime(ms: Long): Unit = {
-    val curReadyTime = ms - readyTimeDelay
-    if (curReadyTime % readyTimeInterval == 0) {
-      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  protected def submitLastProcTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfLastProcTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
-
-  protected def submitCleanTime(ms: Long): Unit = {
-    val cleanTime = genCleanTime(ms)
-    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def genCleanTime(ms: Long): Long = ms
-
-  protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
-
-  protected def submitOldCacheIndex(index: Long): Unit = {
-    val map = Map[String, String]((selfOldCacheIndex -> index.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
-
-  private def readSelfInfo(key: String): Option[Long] = {
-    InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v =>
-      try {
-        Some(v.toLong)
-      } catch {
-        case _ => None
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
deleted file mode 100644
index 4b1c410..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.info
-
-import org.apache.griffin.measure.Loggable
-
-import scala.collection.mutable.{SortedSet => MutableSortedSet}
-
-/**
-  * tmst cache, CRUD of timestamps
-  */
-case class TmstCache() extends Loggable {
-
-  private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
-
-  //-- insert tmst into tmst group --
-  def insert(tmst: Long) = tmstGroup += tmst
-  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
-
-  //-- remove tmst from tmst group --
-  def remove(tmst: Long) = tmstGroup -= tmst
-  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
-
-  //-- get subset of tmst group --
-  def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
-  def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
-  def until(until: Long) = tmstGroup.until(until).toSet
-  def from(from: Long) = tmstGroup.from(from).toSet
-  def all = tmstGroup.toSet
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
deleted file mode 100644
index e1d498b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.lock.CacheLock
-
-trait InfoCache extends Loggable with Serializable {
-
-  def init(): Unit
-  def available(): Boolean
-  def close(): Unit
-
-  def cacheInfo(info: Map[String, String]): Boolean
-  def readInfo(keys: Iterable[String]): Map[String, String]
-  def deleteInfo(keys: Iterable[String]): Unit
-  def clearInfo(): Unit
-
-  def listKeys(path: String): List[String]
-
-  def genLock(s: String): CacheLock
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
deleted file mode 100644
index 28ade3b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.configuration.params.InfoCacheParam
-
-import scala.util.{Success, Try}
-
-case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricName: String) extends Serializable {
-
-  val ZK_REGEX = """^(?i)zk|zookeeper$""".r
-
-  def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
-    val config = infoCacheParam.getConfig
-    val infoCacheTry = infoCacheParam.getType match {
-      case ZK_REGEX() => Try(ZKInfoCache(config, metricName))
-      case _ => throw new Exception("not supported info cache type")
-    }
-    infoCacheTry match {
-      case Success(infoCache) => Some(infoCache)
-      case _ => None
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
deleted file mode 100644
index 198f157..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.configuration.params.InfoCacheParam
-import org.apache.griffin.measure.context.streaming.lock.{CacheLock, MultiCacheLock}
-
-object InfoCacheInstance extends InfoCache {
-  var infoCaches: List[InfoCache] = Nil
-
-  def initInstance(infoCacheParams: Iterable[InfoCacheParam], metricName: String) = {
-    val fac = InfoCacheFactory(infoCacheParams, metricName)
-    infoCaches = infoCacheParams.flatMap(param => fac.getInfoCache(param)).toList
-  }
-
-  def init(): Unit = infoCaches.foreach(_.init)
-  def available(): Boolean = infoCaches.foldLeft(false)(_ || _.available)
-  def close(): Unit = infoCaches.foreach(_.close)
-
-  def cacheInfo(info: Map[String, String]): Boolean = {
-    infoCaches.foldLeft(false) { (res, infoCache) => res || infoCache.cacheInfo(info) }
-  }
-  def readInfo(keys: Iterable[String]): Map[String, String] = {
-    val maps = infoCaches.map(_.readInfo(keys)).reverse
-    maps.fold(Map[String, String]())(_ ++ _)
-  }
-  def deleteInfo(keys: Iterable[String]): Unit = infoCaches.foreach(_.deleteInfo(keys))
-  def clearInfo(): Unit = infoCaches.foreach(_.clearInfo)
-
-  def listKeys(path: String): List[String] = {
-    infoCaches.foldLeft(Nil: List[String]) { (res, infoCache) =>
-      if (res.size > 0) res else infoCache.listKeys(path)
-    }
-  }
-
-  def genLock(s: String): CacheLock = MultiCacheLock(infoCaches.map(_.genLock(s)))
-}


[2/3] incubator-griffin git commit: refactor context and data source

Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
deleted file mode 100644
index b1e6764..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.Loggable
-
-object TimeInfoCache extends Loggable with Serializable {
-
-  private val CacheTime = "cache.time"
-  private val LastProcTime = "last.proc.time"
-  private val ReadyTime = "ready.time"
-  private val CleanTime = "clean.time"
-  private val OldCacheIndex = "old.cache.index"
-
-  def cacheTime(path: String): String = s"${path}/${CacheTime}"
-  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
-  def readyTime(path: String): String = s"${path}/${ReadyTime}"
-  def cleanTime(path: String): String = s"${path}/${CleanTime}"
-  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
-
-  val infoPath = "info"
-
-  val finalCacheInfoPath = "info.final"
-  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
-  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
-  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
-
-  def startTimeInfoCache(): Unit = {
-    genFinalReadyTime
-  }
-
-  def getTimeRange(): (Long, Long) = {
-    readTimeRange
-  }
-
-  def getCleanTime(): Long = {
-    readCleanTime
-  }
-
-  def endTimeInfoCache: Unit = {
-    genFinalLastProcTime
-    genFinalCleanTime
-  }
-
-  private def genFinalReadyTime(): Unit = {
-    val subPath = InfoCacheInstance.listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
-    val result = InfoCacheInstance.readInfo(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalReadyTime -> time.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  private def genFinalLastProcTime(): Unit = {
-    val subPath = InfoCacheInstance.listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
-    val result = InfoCacheInstance.readInfo(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalLastProcTime -> time.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  private def genFinalCleanTime(): Unit = {
-    val subPath = InfoCacheInstance.listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
-    val result = InfoCacheInstance.readInfo(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalCleanTime -> time.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  private def readTimeRange(): (Long, Long) = {
-    val map = InfoCacheInstance.readInfo(List(finalLastProcTime, finalReadyTime))
-    val lastProcTime = getLong(map, finalLastProcTime)
-    val curReadyTime = getLong(map, finalReadyTime)
-    (lastProcTime, curReadyTime)
-  }
-
-  private def readCleanTime(): Long = {
-    val map = InfoCacheInstance.readInfo(List(finalCleanTime))
-    val cleanTime = getLong(map, finalCleanTime)
-    cleanTime
-  }
-
-  private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
-    try {
-      map.get(key).map(_.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-  private def getLong(map: Map[String, String], key: String) = {
-    getLongOpt(map, key).getOrElse(-1L)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
deleted file mode 100644
index 42166ae..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.curator.framework.imps.CuratorFrameworkState
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.lock.ZKCacheLock
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConverters._
-
-/**
-  * leverage zookeeper for info cache
-  * @param config
-  * @param metricName
-  */
-case class ZKInfoCache(config: Map[String, Any], metricName: String) extends InfoCache {
-
-  val Hosts = "hosts"
-  val Namespace = "namespace"
-  val Mode = "mode"
-  val InitClear = "init.clear"
-  val CloseClear = "close.clear"
-  val LockPath = "lock.path"
-
-  val PersistRegex = """^(?i)persist$""".r
-  val EphemeralRegex = """^(?i)ephemeral$""".r
-
-  final val separator = ZKPaths.PATH_SEPARATOR
-
-  val hosts = config.getOrElse(Hosts, "").toString
-  val namespace = config.getOrElse(Namespace, "").toString
-  val mode: CreateMode = config.get(Mode) match {
-    case Some(s: String) => s match {
-      case PersistRegex() => CreateMode.PERSISTENT
-      case EphemeralRegex() => CreateMode.EPHEMERAL
-      case _ => CreateMode.PERSISTENT
-    }
-    case _ => CreateMode.PERSISTENT
-  }
-  val initClear = config.get(InitClear) match {
-    case Some(b: Boolean) => b
-    case _ => true
-  }
-  val closeClear = config.get(CloseClear) match {
-    case Some(b: Boolean) => b
-    case _ => false
-  }
-  val lockPath = config.getOrElse(LockPath, "lock").toString
-
-  private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
-  private val builder = CuratorFrameworkFactory.builder()
-    .connectString(hosts)
-    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
-    .namespace(cacheNamespace)
-  private val client: CuratorFramework = builder.build
-
-  def init(): Unit = {
-    client.start()
-    info("start zk info cache")
-    client.usingNamespace(cacheNamespace)
-    info(s"init with namespace: ${cacheNamespace}")
-    deleteInfo(lockPath :: Nil)
-    if (initClear) {
-      clearInfo
-    }
-  }
-
-  def available(): Boolean = {
-    client.getState match {
-      case CuratorFrameworkState.STARTED => true
-      case _ => false
-    }
-  }
-
-  def close(): Unit = {
-    if (closeClear) {
-      clearInfo
-    }
-    info("close zk info cache")
-    client.close()
-  }
-
-  def cacheInfo(info: Map[String, String]): Boolean = {
-    info.foldLeft(true) { (rs, pair) =>
-      val (k, v) = pair
-      createOrUpdate(path(k), v) && rs
-    }
-  }
-
-  def readInfo(keys: Iterable[String]): Map[String, String] = {
-    keys.flatMap { key =>
-      read(path(key)) match {
-        case Some(v) => Some((key, v))
-        case _ => None
-      }
-    }.toMap
-  }
-
-  def deleteInfo(keys: Iterable[String]): Unit = {
-    keys.foreach { key => delete(path(key)) }
-  }
-
-  def clearInfo(): Unit = {
-//    delete("/")
-    deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil)
-    deleteInfo(TimeInfoCache.infoPath :: Nil)
-    info("clear info")
-  }
-
-  def listKeys(p: String): List[String] = {
-    children(path(p))
-  }
-
-  def genLock(s: String): ZKCacheLock = {
-    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
-    ZKCacheLock(new InterProcessMutex(client, lpt))
-  }
-
-  private def path(k: String): String = {
-    if (k.startsWith(separator)) k else separator + k
-  }
-
-  private def children(path: String): List[String] = {
-    try {
-      client.getChildren().forPath(path).asScala.toList
-    } catch {
-      case e: Throwable => {
-        warn(s"list ${path} warn: ${e.getMessage}")
-        Nil
-      }
-    }
-  }
-
-  private def createOrUpdate(path: String, content: String): Boolean = {
-    if (checkExists(path)) {
-      update(path, content)
-    } else {
-      create(path, content)
-    }
-  }
-
-  private def create(path: String, content: String): Boolean = {
-    try {
-      client.create().creatingParentsIfNeeded().withMode(mode)
-        .forPath(path, content.getBytes("utf-8"))
-      true
-    } catch {
-      case e: Throwable => {
-        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-  private def update(path: String, content: String): Boolean = {
-    try {
-      client.setData().forPath(path, content.getBytes("utf-8"))
-      true
-    } catch {
-      case e: Throwable => {
-        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-  private def read(path: String): Option[String] = {
-    try {
-      Some(new String(client.getData().forPath(path), "utf-8"))
-    } catch {
-      case e: Throwable => {
-        warn(s"read ${path} warn: ${e.getMessage}")
-        None
-      }
-    }
-  }
-
-  private def delete(path: String): Unit = {
-    try {
-      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
-    } catch {
-      case e: Throwable => error(s"delete ${path} error: ${e.getMessage}")
-    }
-  }
-
-  private def checkExists(path: String): Boolean = {
-    try {
-      client.checkExists().forPath(path) != null
-    } catch {
-      case e: Throwable => {
-        warn(s"check exists ${path} warn: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
new file mode 100644
index 0000000..5ae4c75
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+
+/** [[CacheLock]] backed by a Curator `InterProcessMutex`; errors are logged, never thrown. */
+case class CacheLockInZK(@transient mutex: InterProcessMutex) extends CacheLock {
+
+  /**
+    * Acquire the mutex, waiting at most `outtime` (in `unit`); a negative `outtime` blocks
+    * until the lock is obtained. Returns false on timeout or on any error.
+    */
+  def lock(outtime: Long, unit: TimeUnit): Boolean = {
+    try {
+      if (outtime >= 0) mutex.acquire(outtime, unit)
+      else {
+        // no-arg acquire() is the documented blocking form; it throws when the lock is not obtained
+        mutex.acquire()
+        true
+      }
+    } catch {
+      case e: Throwable =>
+        error(s"lock error: ${e.getMessage}")
+        false
+    }
+  }
+
+  /** Release the mutex if a thread in this process holds it; errors are only logged. */
+  def unlock(): Unit = {
+    try { if (mutex.isAcquiredInThisProcess) mutex.release() }
+    catch { case e: Throwable => error(s"unlock error: ${e.getMessage}") }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
new file mode 100644
index 0000000..6f033bd
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.lock
+
+import java.util.concurrent.TimeUnit
+
+/** Composite [[CacheLock]]: every operation is delegated to the first lock in `cacheLocks` only. */
+case class CacheLockSeq(cacheLocks: Seq[CacheLock]) extends CacheLock {
+
+  /** Lock via the first element; vacuously true when `cacheLocks` is empty. */
+  def lock(outtime: Long, unit: TimeUnit): Boolean =
+    cacheLocks.headOption.forall(_.lock(outtime, unit))
+
+  /** Unlock the first element; a no-op when `cacheLocks` is empty. */
+  def unlock(): Unit = for (first <- cacheLocks.headOption) first.unlock
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
deleted file mode 100644
index 6eea0b6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean = {
-    cacheLocks.headOption match {
-      case Some(cl) => cl.lock(outtime, unit)
-      case None => true
-    }
-  }
-
-  def unlock(): Unit = {
-    cacheLocks.headOption match {
-      case Some(cl) => cl.unlock
-      case None => {}
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
deleted file mode 100644
index 2c9b717..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-
-case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean = {
-    try {
-      if (outtime >= 0) {
-        mutex.acquire(outtime, unit)
-      } else {
-        mutex.acquire(-1, null)
-      }
-    } catch {
-      case e: Throwable => {
-        error(s"lock error: ${e.getMessage}")
-        false
-      }
-    }
-
-  }
-
-  def unlock(): Unit = {
-    try {
-      if (mutex.isAcquiredInThisProcess) mutex.release
-    } catch {
-      case e: Throwable => {
-        error(s"unlock error: ${e.getMessage}")
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
new file mode 100644
index 0000000..fc78eda
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.lock.CacheLock
+
+trait OffsetCache extends Loggable with Serializable {
+  // Contract of an offset-info store; implemented in this package by OffsetCacheInZK and OffsetCacheClient.
+  def init(): Unit                // prepare the cache for use
+  def available(): Boolean        // whether the cache can currently be used
+  def close(): Unit               // release the cache
+  // -- key/value access --
+  def cache(kvs: Map[String, String]): Unit                // store the given key -> value pairs
+  def read(keys: Iterable[String]): Map[String, String]    // values found for the requested keys
+  def delete(keys: Iterable[String]): Unit                 // remove the given keys
+  def clear(): Unit                                        // remove cached info
+  // -- key listing --
+  def listKeys(path: String): List[String]    // keys found under `path`
+  // -- locking --
+  def genLock(s: String): CacheLock           // build a lock identified by `s`
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
new file mode 100644
index 0000000..416ff31
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.griffin.measure.configuration.params.OffsetCacheParam
+import org.apache.griffin.measure.context.streaming.lock.{CacheLock, CacheLockSeq}
+
+/** Process-wide [[OffsetCache]] that fans every operation out to the caches built by `initClient`. */
+object OffsetCacheClient extends OffsetCache with OffsetOps {
+  var offsetCaches: Seq[OffsetCache] = Nil
+
+  /** Build one cache per param through [[OffsetCacheFactory]] and install them as the delegates. */
+  def initClient(offsetCacheParams: Iterable[OffsetCacheParam], metricName: String) = {
+    val factory = OffsetCacheFactory(offsetCacheParams, metricName)
+    offsetCaches = offsetCacheParams.flatMap(p => factory.getOffsetCache(p)).toList
+  }
+
+  def init(): Unit = for (c <- offsetCaches) c.init()
+  /** True as soon as one delegate reports itself available. */
+  def available(): Boolean = offsetCaches.exists(_.available())
+  def close(): Unit = for (c <- offsetCaches) c.close()
+
+  def cache(kvs: Map[String, String]): Unit = for (c <- offsetCaches) c.cache(kvs)
+  /** Merge the reads of all delegates; on a key conflict the earliest delegate wins. */
+  def read(keys: Iterable[String]): Map[String, String] = {
+    val perCache = offsetCaches.map(_.read(keys))
+    perCache.foldRight(Map.empty[String, String])((found, merged) => merged ++ found)
+  }
+  def delete(keys: Iterable[String]): Unit = for (c <- offsetCaches) c.delete(keys)
+  def clear(): Unit = for (c <- offsetCaches) c.clear()
+
+  /** Keys from the first delegate that lists any under `path`; later delegates are not queried. */
+  def listKeys(path: String): List[String] =
+    offsetCaches.iterator.map(_.listKeys(path)).find(_.nonEmpty).getOrElse(Nil)
+
+  /** A [[CacheLockSeq]] over the locks generated by every delegate for `s`. */
+  def genLock(s: String): CacheLock = CacheLockSeq(offsetCaches.map(_.genLock(s)))
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
new file mode 100644
index 0000000..07bca27
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.griffin.measure.configuration.params.OffsetCacheParam
+
+import scala.util.{Success, Try}
+
+/** Builds [[OffsetCache]] instances for `metricName` from cache params, dispatching on the param type. */
+case class OffsetCacheFactory(offsetCacheParams: Iterable[OffsetCacheParam], metricName: String
+                             ) extends Serializable {
+  // alternation is grouped so both anchors apply to both names (ungrouped it parses as `^zk` | `zookeeper$`)
+  val ZK_REGEX = """^(?i)(?:zk|zookeeper)$""".r
+
+  /** Some(cache) for a zk/zookeeper param, None if its construction fails; throws on any other type. */
+  def getOffsetCache(offsetCacheParam: OffsetCacheParam): Option[OffsetCache] = {
+    val config = offsetCacheParam.getConfig
+    offsetCacheParam.getType match {
+      case ZK_REGEX() => Try(OffsetCacheInZK(config, metricName)).toOption
+      case _ => throw new Exception("not supported info cache type")
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
new file mode 100644
index 0000000..9e020c2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
@@ -0,0 +1,214 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.curator.framework.imps.CuratorFrameworkState
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.utils.ZKPaths
+import org.apache.griffin.measure.context.streaming.lock.CacheLockInZK
+import org.apache.zookeeper.CreateMode
+
+import scala.collection.JavaConverters._
+
+/**
+  * Offset cache kept in ZooKeeper through a Curator client.
+  *
+  * Keys read from `config`: "hosts" (connect string), "namespace" (prefix of the
+  * Curator namespace, which always ends with `metricName`), "mode" ("persist" or
+  * "ephemeral", case-insensitive; anything else means persistent), "init.clear"
+  * (Boolean, default true), "close.clear" (Boolean, default false) and "lock.path"
+  * (default "lock").
+  *
+  * @param config     connection and behaviour settings, see above
+  * @param metricName metric name, used as the last part of the namespace
+  */
+case class OffsetCacheInZK(config: Map[String, Any], metricName: String) extends OffsetCache with OffsetOps {
+
+  val Hosts = "hosts"
+  val Namespace = "namespace"
+  val Mode = "mode"
+  val InitClear = "init.clear"
+  val CloseClear = "close.clear"
+  val LockPath = "lock.path"
+
+  val PersistRegex = """^(?i)persist$""".r
+  val EphemeralRegex = """^(?i)ephemeral$""".r
+
+  final val separator = ZKPaths.PATH_SEPARATOR
+
+  val hosts = config.getOrElse(Hosts, "").toString
+  val namespace = config.getOrElse(Namespace, "").toString
+  // only a String matching "ephemeral" selects ephemeral nodes; every other value is persistent
+  val mode: CreateMode = config.get(Mode) match {
+    case Some(s: String) => s match {
+      case PersistRegex() => CreateMode.PERSISTENT
+      case EphemeralRegex() => CreateMode.EPHEMERAL
+      case _ => CreateMode.PERSISTENT
+    }
+    case _ => CreateMode.PERSISTENT
+  }
+  val initClear = config.get(InitClear) match {
+    case Some(b: Boolean) => b
+    case _ => true
+  }
+  val closeClear = config.get(CloseClear) match {
+    case Some(b: Boolean) => b
+    case _ => false
+  }
+  val lockPath = config.getOrElse(LockPath, "lock").toString
+
+  private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+  private val builder = CuratorFrameworkFactory.builder()
+    .connectString(hosts)
+    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+    .namespace(cacheNamespace)
+  private val client: CuratorFramework = builder.build
+
+  /**
+    * Start the client, drop any lock nodes left under `lockPath`, and clear cached
+    * info when "init.clear" is enabled.
+    */
+  def init(): Unit = {
+    client.start()
+    info("start zk info cache")
+    // no usingNamespace call is needed: the builder already bound the client to cacheNamespace
+    info(s"init with namespace: ${cacheNamespace}")
+    delete(lockPath :: Nil)
+    if (initClear) {
+      clear()
+    }
+  }
+
+  /** True once the Curator client is in the STARTED state. */
+  def available(): Boolean = client.getState == CuratorFrameworkState.STARTED
+
+  /** Clear cached info first when "close.clear" is enabled, then close the client. */
+  def close(): Unit = {
+    if (closeClear) {
+      clear()
+    }
+    info("close zk info cache")
+    client.close()
+  }
+
+  /** Create or overwrite one node per entry, using the entry's key as the node path. */
+  def cache(kvs: Map[String, String]): Unit = kvs.foreach { case (k, v) => createOrUpdate(path(k), v) }
+
+  /** Read the given keys; keys whose node cannot be read are left out of the result. */
+  def read(keys: Iterable[String]): Map[String, String] = {
+    keys.flatMap(key => read(path(key)).map(value => (key, value))).toMap
+  }
+
+  /** Delete the node of every given key, together with its children. */
+  def delete(keys: Iterable[String]): Unit = keys.foreach(key => delete(path(key)))
+
+  /** Remove the "info.final" and "info" subtrees; other nodes in the namespace are kept. */
+  def clear(): Unit = {
+    delete(finalCacheInfoPath :: Nil)
+    delete(infoPath :: Nil)
+    info("clear info")
+  }
+
+  /** Names of the child nodes under `p`; empty when they cannot be listed. */
+  def listKeys(p: String): List[String] = children(path(p))
+
+  /** Build a mutex-backed lock at `lockPath`, or at `lockPath/s` when `s` is non-empty. */
+  def genLock(s: String): CacheLockInZK = {
+    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
+    CacheLockInZK(new InterProcessMutex(client, lpt))
+  }
+
+  /** Make `k` absolute by prefixing the separator unless it already starts with it. */
+  private def path(k: String): String = if (k.startsWith(separator)) k else separator + k
+
+  /** List child node names, logging a warning and returning Nil on any failure. */
+  private def children(path: String): List[String] = {
+    try {
+      client.getChildren().forPath(path).asScala.toList
+    } catch {
+      case e: Throwable =>
+        warn(s"list ${path} warn: ${e.getMessage}")
+        Nil
+    }
+  }
+
+  /** Update the node when it exists, create it otherwise (check and write are not atomic). */
+  private def createOrUpdate(path: String, content: String): Boolean = {
+    if (checkExists(path)) update(path, content) else create(path, content)
+  }
+
+  /** Create the node (and missing parents) with the configured mode; false on failure. */
+  private def create(path: String, content: String): Boolean = {
+    try {
+      client.create().creatingParentsIfNeeded().withMode(mode)
+        .forPath(path, content.getBytes("utf-8"))
+      true
+    } catch {
+      case e: Throwable =>
+        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+    }
+  }
+
+  /** Overwrite the data of an existing node; false on failure. */
+  private def update(path: String, content: String): Boolean = {
+    try {
+      client.setData().forPath(path, content.getBytes("utf-8"))
+      true
+    } catch {
+      case e: Throwable =>
+        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+    }
+  }
+
+  /** Read a node's data as a UTF-8 string; None (with a warning) when it cannot be read. */
+  private def read(path: String): Option[String] = {
+    try {
+      Some(new String(client.getData().forPath(path), "utf-8"))
+    } catch {
+      case e: Throwable =>
+        warn(s"read ${path} warn: ${e.getMessage}")
+        None
+    }
+  }
+
+  /** Delete a node and its children; failures are logged and otherwise ignored. */
+  private def delete(path: String): Unit = {
+    try {
+      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
+    } catch {
+      case e: Throwable => error(s"delete ${path} error: ${e.getMessage}")
+    }
+  }
+
+  /** True when a node exists at `path`; false when absent or when the check itself fails. */
+  private def checkExists(path: String): Boolean = {
+    try {
+      client.checkExists().forPath(path) != null
+    } catch {
+      case e: Throwable =>
+        warn(s"check exists ${path} warn: ${e.getMessage}")
+        false
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
new file mode 100644
index 0000000..c8763ec
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+/**
+  * Offset bookkeeping on top of an [[OffsetCache]]: builds the keys of the time marks
+  * kept below "info/<child>/" and folds them into the "info.final/" keys that
+  * `getTimeRange` and `getCleanTime` read back.
+  */
+trait OffsetOps extends Serializable { this: OffsetCache =>
+
+  //-- names of the time-mark leaf keys --
+  val CacheTime = "cache.time"
+  val LastProcTime = "last.proc.time"
+  val ReadyTime = "ready.time"
+  val CleanTime = "clean.time"
+  val OldCacheIndex = "old.cache.index"
+
+  //-- key of each mark below a given path --
+  def cacheTime(path: String): String = s"${path}/${CacheTime}"
+  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
+  def readyTime(path: String): String = s"${path}/${ReadyTime}"
+  def cleanTime(path: String): String = s"${path}/${CleanTime}"
+  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
+
+  //-- root of the per-child marks --
+  val infoPath = "info"
+
+  //-- root and keys of the aggregated ("final") marks --
+  val finalCacheInfoPath = "info.final"
+  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
+  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
+  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
+
+  /** Publish the final ready time: the minimum ready time over the children of `infoPath`. */
+  def startOffsetCache(): Unit = {
+    genFinalReadyTime()
+  }
+
+  /** @return (final last-proc time, final ready time); -1 for a value that is missing or not a number */
+  def getTimeRange(): (Long, Long) = {
+    readTimeRange()
+  }
+
+  /** @return the final clean time, or -1 when it is missing or not a number */
+  def getCleanTime(): Long = {
+    readCleanTime()
+  }
+
+  /** Publish the final last-proc time and clean time, each the minimum over the children of `infoPath`. */
+  def endOffsetCache: Unit = {
+    genFinalLastProcTime()
+    genFinalCleanTime()
+  }
+
+  /** Aggregate the per-child ready times into `finalReadyTime`. */
+  private def genFinalReadyTime(): Unit = cacheMinTime(ReadyTime, finalReadyTime)
+
+  /** Aggregate the per-child last-proc times into `finalLastProcTime`. */
+  private def genFinalLastProcTime(): Unit = cacheMinTime(LastProcTime, finalLastProcTime)
+
+  /** Aggregate the per-child clean times into `finalCleanTime`. */
+  private def genFinalCleanTime(): Unit = cacheMinTime(CleanTime, finalCleanTime)
+
+  /**
+    * Read "info/<child>/<timeKey>" for every child listed under `infoPath` and, when at
+    * least one of them holds a number, cache the smallest one under `finalKey`.
+    * Nothing is written when no value can be parsed.
+    */
+  private def cacheMinTime(timeKey: String, finalKey: String): Unit = {
+    val keys = listKeys(infoPath).map { p => s"${infoPath}/${p}/${timeKey}" }
+    val result = read(keys)
+    val times = keys.flatMap { k => getLongOpt(result, k) }
+    if (times.nonEmpty) {
+      val time = times.min
+      cache(Map[String, String](finalKey -> time.toString))
+    }
+  }
+
+  /** Read both final marks in one call; each falls back to -1 individually. */
+  private def readTimeRange(): (Long, Long) = {
+    val map = read(List(finalLastProcTime, finalReadyTime))
+    val lastTime = getLong(map, finalLastProcTime)
+    val curReadyTime = getLong(map, finalReadyTime)
+    (lastTime, curReadyTime)
+  }
+
+  /** Read the final clean time, falling back to -1. */
+  private def readCleanTime(): Long = {
+    val map = read(List(finalCleanTime))
+    getLong(map, finalCleanTime)
+  }
+
+  /** Parse the value stored for `key`; None when the key is absent or the value is not a Long. */
+  private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
+    try {
+      map.get(key).map(_.toLong)
+    } catch {
+      // toLong only signals bad input this way; fatal errors are no longer swallowed
+      case _: NumberFormatException => None
+    }
+  }
+
+  /** Like `getLongOpt`, with -1 standing in for a missing or unparsable value. */
+  private def getLong(map: Map[String, String], key: String) = {
+    getLongOpt(map, key).getOrElse(-1L)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
new file mode 100644
index 0000000..8b068e3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.context.{DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.connector.DataConnector
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.spark.sql._
+
+/**
+  * data source: data connectors plus an optional streaming cache client, whose data
+  * frames are unioned and registered as a single run-time table
+  * @param name     name of data source, also the name the table is registered under
+  * @param dsParam  param of this data source
+  * @param dataConnectors       list of data connectors
+  * @param streamingCacheClientOpt   streaming data cache client option
+  */
+case class DataSource(name: String,
+                      dsParam: DataSourceParam,
+                      dataConnectors: Seq[DataConnector],
+                      streamingCacheClientOpt: Option[StreamingCacheClient]
+                     ) extends Loggable with Serializable {
+
+  /** Initialise every data connector. */
+  def init(): Unit = {
+    for (connector <- dataConnectors) connector.init
+  }
+
+  /**
+    * Load the data for the timestamp of `context` and register it as table `name`;
+    * when no data frame is available only a warning is logged.
+    * @param context  context providing the timestamp and the run-time table register
+    * @return the merged time range of whatever was loaded
+    */
+  def loadData(context: DQContext): TimeRange = {
+    info(s"load data [${name}]")
+    val (dfOpt, timeRange) = data(context.contextId.timestamp)
+    dfOpt.foreach { df => context.runTimeTableRegister.registerTable(name, df) }
+    if (dfOpt.isEmpty) warn(s"load data source [${name}] fails")
+    timeRange
+  }
+
+  /**
+    * Collect (data frame option, time range) pairs from every connector that yields
+    * a data frame and from the streaming cache client, then union the frames and
+    * merge the ranges. With no pair at all the result is (None, empty time range).
+    */
+  private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
+    // connectors are queried first, in order; results without a data frame are dropped
+    val batches = dataConnectors
+      .map(_.data(timestamp))
+      .filter { case (dfOpt, _) => dfOpt.isDefined }
+    val caches = streamingCacheClientOpt.map(_.readData()).toList
+    val pairs = batches ++ caches
+
+    pairs.reduceOption { (a, b) =>
+      (unionDfOpts(a._1, b._1), a._2.merge(b._2))
+    }.getOrElse((None, TimeRange.emptyTimeRange))
+  }
+
+  /** Pass `df` on to the streaming cache client, if one is configured. */
+  def updateData(df: DataFrame): Unit = {
+    for (cacheClient <- streamingCacheClientOpt) cacheClient.updateData(Some(df))
+  }
+
+  /** Call `cleanOutTimeData` on the streaming cache client, if one is configured. */
+  def cleanOldData(): Unit = {
+    for (cacheClient <- streamingCacheClientOpt) cacheClient.cleanOutTimeData
+  }
+
+  /** Call `processFinish` on the streaming cache client, if one is configured. */
+  def processFinish(): Unit = {
+    for (cacheClient <- streamingCacheClientOpt) cacheClient.processFinish
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
new file mode 100644
index 0000000..888d92a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
+import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.util.Success
+
+/** Builds [[DataSource]]s (connectors plus optional streaming cache client) from their params. */
+object DataSourceFactory extends Loggable {
+
+  /** One data source per param, in order; each param's index is forwarded to the cache factory. */
+  def getDataSources(sparkSession: SparkSession,
+                     ssc: StreamingContext,
+                     dataSources: Seq[DataSourceParam]
+                    ): Seq[DataSource] = {
+    dataSources.zipWithIndex.flatMap { case (param, index) =>
+      getDataSource(sparkSession, ssc, param, index)
+    }
+  }
+
+  /** Connectors that fail to build are logged and left out; the data source is still returned. */
+  private def getDataSource(sparkSession: SparkSession,
+                            ssc: StreamingContext,
+                            dataSourceParam: DataSourceParam,
+                            index: Int
+                           ): Option[DataSource] = {
+    val name = dataSourceParam.getName
+    val connectorParams = dataSourceParam.getConnectors
+    val timestampStorage = TimestampStorage()
+    // for streaming data cache
+    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
+      sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, timestampStorage)
+    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
+      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
+        timestampStorage, streamingCacheClientOpt) match {
+          case Success(connector) => Some(connector)
+          case failure =>
+            warn(s"data source [${name}] skips a data connector that fails to build: ${failure}")
+            None
+        }
+    }
+    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
new file mode 100644
index 0000000..a305563
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource
+
+import org.apache.griffin.measure.Loggable
+
+import scala.collection.mutable.{SortedSet => MutableSortedSet}
+
+/**
+  * sorted in-memory store of timestamps (tmst)
+  * supports insert, remove and range queries, queries return immutable snapshots
+  */
+case class TimestampStorage() extends Loggable {
+
+  // every timestamp currently stored, kept in ascending order
+  private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
+
+  //-- add one or many tmsts --
+  def insert(tmst: Long) = tmstGroup.+=(tmst)
+  def insert(tmsts: Iterable[Long]) = tmstGroup.++=(tmsts)
+
+  //-- drop one or many tmsts --
+  def remove(tmst: Long) = tmstGroup.-=(tmst)
+  def remove(tmsts: Iterable[Long]) = tmstGroup.--=(tmsts)
+
+  //-- snapshots of a part of the stored tmsts --
+  // [from, until)
+  def fromUntil(from: Long, until: Long) = {
+    val window = tmstGroup.range(from, until)
+    window.toSet
+  }
+  // (after, til], expressed as [after + 1, til + 1)
+  def afterTil(after: Long, til: Long) = {
+    val window = tmstGroup.range(after + 1, til + 1)
+    window.toSet
+  }
+  // all tmsts strictly below until
+  def until(until: Long) = {
+    val window = tmstGroup.until(until)
+    window.toSet
+  }
+  // all tmsts greater than or equal to from
+  def from(from: Long) = {
+    val window = tmstGroup.from(from)
+    window.toSet
+  }
+  // every stored tmst
+  def all = tmstGroup.toSet
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
new file mode 100644
index 0000000..10c443e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.spark.sql._
+
+import scala.util.Random
+
+/**
+  * data source cache in streaming mode
+  * save data frame into hdfs in dump phase
+  * read data frame from hdfs in calculate phase
+  * with update and clean actions for the cache data
+  */
+trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
+
+  val sqlContext: SQLContext    // supplies the DataFrameReader in readData and the default parallelism in updateData
+  val param: Map[String, Any]    // cache config, read through the keys declared below
+  val dsName: String    // data source name, part of the default file path and of log messages
+  val index: Int    // data source index, default value of the info path
+
+  val timestampStorage: TimestampStorage    // tmsts tracked for this data source
+  protected def fromUntilRangeTmsts(from: Long, until: Long) = timestampStorage.fromUntil(from, until)    // tmsts in [from, until)
+  protected def clearTmst(t: Long) = timestampStorage.remove(t)
+  protected def clearTmstsUntil(until: Long) = {    // removes the tmsts returned by timestampStorage.until(until)
+    val outDateTmsts = timestampStorage.until(until)
+    timestampStorage.remove(outDateTmsts)
+  }
+  protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)    // (after, til] expressed as [after + 1, til + 1)
+  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)    // same as clearTmstsUntil, with til included
+
+  val _FilePath = "file.path"    // config key of filePath
+  val _InfoPath = "info.path"    // config key of cacheInfoPath
+  val _ReadyTimeInterval = "ready.time.interval"    // config key of readyTimeInterval
+  val _ReadyTimeDelay = "ready.time.delay"    // config key of readyTimeDelay
+  val _TimeRange = "time.range"    // config key of deltaTimeRange
+
+  val rdmStr = Random.alphanumeric.take(10).mkString    // random 10 character suffix of the default file path
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"    // file path used when "file.path" is not configured
+  val defInfoPath = s"${index}"    // info path used when "info.path" is not configured
+
+  val filePath: String = param.getString(_FilePath, defFilePath)    // parent of the new and old cache directories
+  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)    // used in the lock names and in selfCacheInfoPath
+  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)    // in ms, 60000 when parsing gives no value
+  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)    // in ms, 60000 when parsing gives no value
+  // (begin, end) offsets in ms added to the processing time range in readData,
+  // configured as a seq of time strings under "time.range"
+  // positive values are replaced by 0, a missing begin or end offset is 0 as well
+  val deltaTimeRange: (Long, Long) = {
+    def negative(n: Long): Long = if (n <= 0) n else 0
+    param.get(_TimeRange) match {
+      // note: the element type is erased at runtime, only the Seq part of this pattern is checked
+      case Some(seq: Seq[String]) => {
+        // time strings that can not be parsed are dropped
+        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
+        val ns = negative(nseq.headOption.getOrElse(0))
+        // drop(1) instead of tail: tail throws on an empty seq, which made the whole client fail to construct
+        val ne = negative(nseq.drop(1).headOption.getOrElse(0))
+        (ns, ne)
+      }
+      case _ => (0, 0)
+    }
+  }
+
+  val _ReadOnly = "read.only"    // config key of readOnly
+  val readOnly = param.getBoolean(_ReadOnly, false)    // when true, saveData, updateData and the hdfs cleaning of cleanOutTimeData do nothing
+
+  val _Updatable = "updatable"    // config key of updatable
+  val updatable = param.getBoolean(_Updatable, false)    // when true, the old cache is read, updated and cleaned as well
+
+  val newCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.new")    // held while writing or cleaning newFilePath
+  val oldCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.old")    // held while writing or cleaning oldFilePath
+
+  val newFilePath = s"${filePath}/new"    // data appended by saveData
+  val oldFilePath = s"${filePath}/old"    // one sub directory per old cache index, written by updateData
+
+  val defOldCacheIndex = 0L    // index assumed by updateData when no old cache index can be read
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit    // format specific write, implemented by the concrete clients
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame    // format specific read, implemented by the concrete clients
+
+  /**
+    * save data frame in dump phase
+    * nothing is done in read only mode
+    * @param dfOpt    data frame to be saved, a missing data frame is only logged
+    * @param ms       timestamp of this data frame
+    */
+  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+    if (!readOnly) {
+      dfOpt match {
+        case Some(df) => {
+          // cache df, it is evaluated twice below: count and write
+          df.cache
+
+          try {
+            // count df, empty data is not written
+            val cnt = df.count
+            info(s"save ${dsName} data count: ${cnt}")
+
+            if (cnt > 0) {
+              // lock makes it safer when writing new cache data
+              val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+              if (newCacheLocked) {
+                try {
+                  val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
+                  writeDataFrame(dfw, newFilePath)
+                } catch {
+                  case e: Throwable => error(s"save data error: ${e.getMessage}")
+                } finally {
+                  newCacheLock.unlock()
+                }
+              }
+            }
+          } finally {
+            // uncache df, also when counting fails, so a failed batch does not leave df cached
+            df.unpersist
+          }
+        }
+        case _ => {
+          info(s"no data frame to save")
+        }
+      }
+
+      // submit cache time and ready time, once fanIncrement reports this timestamp as complete
+      if (fanIncrement(ms)) {
+        info(s"save data [${ms}] finish")
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      }
+
+    }
+  }
+
+  /**
+    * read cached data in calculation phase
+    * the processing time range is shifted by deltaTimeRange, then the new cache
+    * and, in updatable mode, the current old cache are read with that range as filter
+    * @return   unioned data frame to calculate, with the revised time range and the tmsts inside it
+    */
+  def readData(): (Option[DataFrame], TimeRange) = {
+    // time range: (a, b]
+    val timeRange = OffsetCacheClient.getTimeRange
+    val begin = timeRange._1 + deltaTimeRange._1
+    val end = timeRange._2 + deltaTimeRange._2
+    val reviseTimeRange = (begin, end)
+
+    // filter on the tmst partition column
+    val tmstCol = s"`${ConstantColumns.tmst}`"
+    val filterStr = if (begin == end) {
+      info(s"read time range: [${begin}]")
+      s"${tmstCol} = ${begin}"
+    } else {
+      info(s"read time range: (${begin}, ${end}]")
+      s"${tmstCol} > ${begin} AND ${tmstCol} <= ${end}"
+    }
+
+    // read one cache directory with the filter applied, a failure is logged as warning and gives None
+    def readFiltered(path: String, warnPrefix: String): Option[DataFrame] = {
+      try {
+        Some(readDataFrame(sqlContext.read, path).filter(filterStr))
+      } catch {
+        case e: Throwable => {
+          warn(s"${warnPrefix}: ${e.getMessage}")
+          None
+        }
+      }
+    }
+
+    // new cache data
+    val newDfOpt = readFiltered(newFilePath, "read data source cache warn")
+
+    // old cache data, only in updatable mode and only if an old cache index is known
+    val oldDfOpt = if (updatable) {
+      readOldCacheIndex.flatMap { idx =>
+        readFiltered(s"${oldFilePath}/${idx}", "read old data source cache warn")
+      }
+    } else None
+
+    // whole cache data
+    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
+
+    // tmsts stored for (begin, end]
+    val tmstSet = afterTilRangeTmsts(begin, end)
+
+    (cacheDfOpt, TimeRange(reviseTimeRange, tmstSet))
+  }
+
+  // delete every partition directory under path that is selected by func(partition value, outTime)
+  private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
+                                     func: (Long, Long) => Boolean
+                                    ): Unit = {
+    for (outTimePath <- listPartitionsByFunc(path, outTime, partitionOpt, func)) {
+      info(s"delete hdfs path: ${outTimePath}")
+      HdfsUtil.deleteHdfsPath(outTimePath)
+    }
+  }
+  // list the sub directories of path whose numeric value satisfies func(value, bound)
+  // with a partition name the directories look like "<partition>=<digits>", otherwise like "<digits>"
+  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
+                                        func: (Long, Long) => Boolean
+                                       ): Iterable[String] = {
+    val dirPattern = partitionOpt match {
+      case Some(partition) => s"^${partition}=(\\d+)$$".r
+      case _ => "^(\\d+)$".r
+    }
+    // a directory is selected when its name matches the pattern and its value passes func
+    def selected(name: String): Boolean = name match {
+      case dirPattern(value) => str2Long(value).exists(func(_, bound))
+      case _ => false
+    }
+    val names = HdfsUtil.listSubPathsByType(path, "dir")
+    names.filter(selected).map(name => s"${path}/${name}")
+  }
+  // parse a long, None when the text is not a valid long
+  // Try only catches non fatal errors, fatal ones are no longer swallowed here
+  private def str2Long(str: String): Option[Long] = {
+    scala.util.Try(str.toLong).toOption
+  }
+
+  /**
+    * clean out-time data: stored tmsts first, then, unless read only, the cached data on hdfs
+    */
+  def cleanOutTimeData(): Unit = {
+    // clean tmst up to and including the clean time
+    readCleanTime().foreach(clearTmstsTil(_))
+
+    if (!readOnly) {
+      // new cache data: in updatable mode everything already processed can go,
+      // otherwise only data up to the clean time
+      val newCacheCleanTime = if (updatable) readLastProcTime() else readCleanTime()
+      newCacheCleanTime.foreach { nct =>
+        // clean calculated new cache data
+        if (newCacheLock.lock(-1, TimeUnit.SECONDS)) {
+          try {
+            cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
+              (tmst: Long, bound: Long) => tmst <= bound)
+          } catch {
+            case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
+          } finally {
+            newCacheLock.unlock()
+          }
+        }
+      }
+
+      // old cache data: only exists in updatable mode, cleaned once a clean time is known
+      val oldCacheCleanTime = if (updatable) readCleanTime() else None
+      if (oldCacheCleanTime.nonEmpty) {
+        readOldCacheIndex().foreach { idx =>
+          if (oldCacheLock.lock(-1, TimeUnit.SECONDS)) {
+            try {
+              // clean calculated old cache data: every directory with an index below the current one
+              // (out time data inside the current index directory is not cleaned here)
+              cleanOutTimePartitions(oldFilePath, idx, None,
+                (cacheIdx: Long, bound: Long) => cacheIdx < bound)
+            } catch {
+              case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
+            } finally {
+              oldCacheLock.unlock()
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+    * write a new generation of the old cache from the given data frame
+    * only active when the cache is updatable and not read only
+    * @param dfOpt    data frame to update old cached data, a missing data frame is only logged
+    */
+  def updateData(dfOpt: Option[DataFrame]): Unit = {
+    if (!readOnly && updatable) {
+      dfOpt match {
+        case Some(df) => {
+          // old cache lock
+          if (oldCacheLock.lock(-1, TimeUnit.SECONDS)) {
+            try {
+              // each update goes into a fresh directory named by the next index
+              val nextOldCacheIndex = readOldCacheIndex().getOrElse(defOldCacheIndex) + 1
+              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
+
+              // only rows newer than the next clean time are kept
+              val filterStr = s"`${ConstantColumns.tmst}` > ${getNextCleanTime()}"
+
+              // repartition to the default parallelism, then overwrite the target directory
+              val dfw = df.filter(filterStr)
+                .repartition(sqlContext.sparkContext.defaultParallelism)
+                .write.mode(SaveMode.Overwrite)
+              writeDataFrame(dfw, oldDfPath)
+
+              // publish the new index only after the write succeeded
+              submitOldCacheIndex(nextOldCacheIndex)
+            } catch {
+              case e: Throwable => error(s"update data error: ${e.getMessage}")
+            } finally {
+              oldCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          info(s"no data frame to update")
+        }
+      }
+    }
+  }
+
+  /**
+    * called each time calculation phase finishes,
+    * submits the last proc time and the next clean time of this data source cache
+    */
+  def processFinish(): Unit = {
+    // end of the processing time range that has just been calculated
+    val procEnd = OffsetCacheClient.getTimeRange._2
+
+    // next last proc time
+    submitLastProcTime(procEnd)
+
+    // next clean time: proc end shifted by the begin offset of the delta time range
+    submitCleanTime(procEnd + deltaTimeRange._1)
+  }
+
+  // next clean time: end of the current processing time range shifted by the begin offset of deltaTimeRange
+  private def getNextCleanTime(): Long = {
+    OffsetCacheClient.getTimeRange._2 + deltaTimeRange._1
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
new file mode 100644
index 0000000..d2e7771
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql.SQLContext
+
+/**
+  * factory of streaming cache clients, the cache format is chosen by the "type" config
+  */
+object StreamingCacheClientFactory extends Loggable {
+
+  // case insensitive patterns of the supported cache types
+  private object DataSourceCacheType {
+    val ParquetRegex = "^(?i)parq(uet)?$".r
+    val JsonRegex = "^(?i)json$".r
+    val OrcRegex = "^(?i)orc$".r
+  }
+  import DataSourceCacheType._
+
+  val _type = "type"
+
+  /**
+    * create streaming cache client
+    * @param sqlContext   sqlContext in spark environment
+    * @param cacheOpt     data source cache config option, without it no client is created
+    * @param name         data source name
+    * @param index        data source index
+    * @param tmstCache    the same tmstCache instance inside a data source
+    * @return             streaming cache client option, None without cache config or when creation fails
+    */
+  def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
+                   name: String, index: Int, tmstCache: TimestampStorage
+                  ): Option[StreamingCacheClient] = {
+    cacheOpt.flatMap { param =>
+      try {
+        val tp = param.getString(_type, "")
+        val dsCache = tp match {
+          case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+          case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+          case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+          // a missing or unknown type falls back to parquet
+          case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+        }
+        Some(dsCache)
+      } catch {
+        case e: Throwable => {
+          // report which data source failed and why, the cause used to be dropped
+          error(s"generate data source cache [${name}] fails: ${e.getMessage}")
+          None
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
new file mode 100644
index 0000000..a12ef87
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql._
+
+/**
+  * streaming cache client that stores the cached data frames as json
+  */
+case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
+                                    dsName: String, index: Int, timestampStorage: TimestampStorage
+                              ) extends StreamingCacheClient {
+
+  // log the target path, then write through the given writer in json format
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    val target = path
+    info(s"write path: ${target}")
+    dfw.json(target)
+  }
+
+  // read the json data under path through the given reader
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame =
+    dfr.json(path)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
new file mode 100644
index 0000000..63e7104
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql._
+
+/**
+  * streaming cache client that stores the cached data frames as orc
+  */
+case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
+                                   dsName: String, index: Int, timestampStorage: TimestampStorage
+                             ) extends StreamingCacheClient {
+
+  // log the target path, then write through the given writer in orc format
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    val target = path
+    info(s"write path: ${target}")
+    dfw.orc(target)
+  }
+
+  // read the orc data under path through the given reader
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame =
+    dfr.orc(path)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
new file mode 100644
index 0000000..c275227
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql._
+
+/**
+  * streaming cache client that stores the cached data frames as parquet
+  */
+case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
+                                       dsName: String, index: Int, timestampStorage: TimestampStorage
+                                 ) extends StreamingCacheClient {
+
+  // set on construction: turns off "parquet.enable.summary-metadata" in the hadoop configuration
+  sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
+
+  // log the target path, then write through the given writer in parquet format
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    val target = path
+    info(s"write path: ${target}")
+    dfw.parquet(target)
+  }
+
+  // read the parquet data under path through the given reader
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame =
+    dfr.parquet(path)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
new file mode 100644
index 0000000..3195d24
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+
+/**
+  * timestamp offset of streaming data source cache
+  * cache time, ready time, last proc time, clean time and old cache index are
+  * submitted to and read from OffsetCacheClient, under keys derived from cacheInfoPath
+  */
+trait StreamingOffsetCacheable extends Loggable with Serializable {
+
+  val cacheInfoPath: String
+  val readyTimeInterval: Long
+  val readyTimeDelay: Long
+
+  // info path of this data source below the info path of OffsetCacheClient
+  def selfCacheInfoPath = s"${OffsetCacheClient.infoPath}/${cacheInfoPath}"
+
+  // keys of the single values of this data source
+  def selfCacheTime = OffsetCacheClient.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime = OffsetCacheClient.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime = OffsetCacheClient.readyTime(selfCacheInfoPath)
+  def selfCleanTime = OffsetCacheClient.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex = OffsetCacheClient.oldCacheIndex(selfCacheInfoPath)
+
+  protected def submitCacheTime(ms: Long): Unit = {
+    submitSelfInfo(selfCacheTime, ms)
+  }
+
+  // ready time is the given time minus the ready delay,
+  // it is only submitted when it falls on a multiple of the ready interval
+  protected def submitReadyTime(ms: Long): Unit = {
+    val curReadyTime = ms - readyTimeDelay
+    if (curReadyTime % readyTimeInterval == 0) {
+      submitSelfInfo(selfReadyTime, curReadyTime)
+    }
+  }
+
+  protected def submitLastProcTime(ms: Long): Unit = {
+    submitSelfInfo(selfLastProcTime, ms)
+  }
+
+  protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
+
+  protected def submitCleanTime(ms: Long): Unit = {
+    val cleanTime = genCleanTime(ms)
+    submitSelfInfo(selfCleanTime, cleanTime)
+  }
+
+  // hook to derive the clean time from the given time, identity here
+  protected def genCleanTime(ms: Long): Long = ms
+
+  protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
+
+  protected def submitOldCacheIndex(index: Long): Unit = {
+    submitSelfInfo(selfOldCacheIndex, index)
+  }
+
+  def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
+
+  // submit one long value under the given key
+  private def submitSelfInfo(key: String, value: Long): Unit = {
+    val map = Map[String, String]((key -> value.toString))
+    OffsetCacheClient.cache(map)
+  }
+
+  // read one long value, None when the key is missing or its value is not a valid long
+  // Try only catches non fatal errors, the former bare catch-all swallowed every Throwable
+  private def readSelfInfo(key: String): Option[Long] = {
+    OffsetCacheClient.read(key :: Nil).get(key).flatMap { v =>
+      scala.util.Try(v.toLong).toOption
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
new file mode 100644
index 0000000..413b5a2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+/**
+  * fan in trait, for multiple input and one output
+  * to support multiple parallel data connectors in one data source
+  */
+trait WithFanIn[T] {
+
+  // total input number
+  val totalNum: AtomicInteger = new AtomicInteger(0)
+  // concurrent map of fan in count for each key
+  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
+
+  /**
+    * register one more input
+    * @return   total input number after this registration
+    */
+  def registerFanIn(): Int = {
+    totalNum.incrementAndGet()
+  }
+
+  /**
+    * increment for a key, to test if all parallel inputs finished
+    * the decision is taken on the count produced by this very increment, not on a second
+    * read of the map, so two concurrent callers can not both see the final count
+    * @param key    key to count an input for
+    * @return       true if this increment reached the total input number, the key is removed then
+    */
+  def fanIncrement(key: T): Boolean = {
+    val count = fanInc(key)
+    if (count >= totalNum.get) {
+      fanInCountMap.remove(key)
+      true
+    } else false
+  }
+
+  // atomically add one to the count of the key, retried until the update wins
+  // returns the count written by this call
+  @scala.annotation.tailrec
+  private def fanInc(key: T): Int = {
+    fanInCountMap.get(key) match {
+      case Some(n) => {
+        if (fanInCountMap.replace(key, n, n + 1)) n + 1 else fanInc(key)
+      }
+      case _ => {
+        if (fanInCountMap.putIfAbsent(key, 1).isEmpty) 1 else fanInc(key)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
new file mode 100644
index 0000000..1abbd30
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType}
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+/**
+  * base trait of data connectors: a connector provides a data frame for a timestamp
+  * and runs its configured pre-process rules on that data frame
+  */
+trait DataConnector extends Loggable with Serializable {
+
+  @transient val sparkSession: SparkSession
+
+  val dcParam: DataConnectorParam
+
+  // id of this connector, taken from the shared id generator
+  val id: String = DataConnectorIdGenerator.genId
+  // name of the table holding the input data of this connector during pre-process
+  protected def thisName(suffix: String): String = s"this_${suffix}"
+
+  val timestampStorage: TimestampStorage
+  protected def saveTmst(t: Long) = timestampStorage.insert(t)
+  protected def readTmst(t: Long) = timestampStorage.fromUntil(t, t + 1)
+
+  def init(): Unit
+
+  // get data frame in batch mode
+  def data(ms: Long): (Option[DataFrame], TimeRange)
+
+  // a fresh batch context per pre-process run, tagged with this connector's id
+  private def createContext(t: Long): DQContext = {
+    DQContext(ContextId(t, id), id, Nil, Nil, BatchProcessType)(sparkSession)
+  }
+
+  /**
+    * run the pre-process rules of this connector on the input data frame
+    * @param dfOpt    input data frame, if any
+    * @param ms       timestamp used to create the context of this run
+    * @return         pre-processed data frame with the tmst column added;
+    *                 None if there is no input or a non-fatal error occurs
+    */
+  def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
+    // new context
+    val context = createContext(ms)
+
+    val timestamp = context.contextId.timestamp
+    val suffix = context.contextId.id
+    val thisTable = thisName(suffix)
+
+    try {
+      saveTmst(timestamp)    // save timestamp
+
+      dfOpt.flatMap { df =>
+        try {
+          val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
+
+          // init data
+          context.compileTableRegister.registerTable(thisTable)
+          context.runTimeTableRegister.registerTable(thisTable, df)
+
+          // build job
+          val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
+
+          // job execute
+          preprocJob.execute(context)
+
+          // out data
+          val outDf = context.sparkSession.table(s"`${thisTable}`")
+
+          // add tmst column
+          val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
+
+          Some(withTmstDf)
+        } finally {
+          // clean context, also when a rule fails after the tables were registered
+          // NOTE(review): assumes clean() is safe after a partially executed job - confirm
+          context.clean()
+        }
+      }
+
+    } catch {
+      // only non-fatal errors are turned into an empty result;
+      // fatal ones (out of memory, interruption, ...) must propagate
+      case scala.util.control.NonFatal(e) => {
+        error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
+        None
+      }
+    }
+  }
+}
+
+/**
+  * generator of data connector ids: a fixed prefix followed by an increasing sequence number
+  */
+object DataConnectorIdGenerator {
+  private val prefix: String = "dc"
+  private val sequence: AtomicLong = new AtomicLong(0L)
+
+  // sequence number for the next id, starting from 1
+  private def nextSeq: Long = sequence.incrementAndGet()
+
+  // new id on every call
+  def genId: String = s"${prefix}${nextSeq}"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..4edd4d2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.batch._
+import org.apache.griffin.measure.datasource.connector.streaming._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+/**
+  * factory of data connectors, chosen by the (case-insensitive) type of the connector param
+  */
+object DataConnectorFactory extends Loggable {
+
+  // batch connector types
+  val HiveRegex = """^(?i)hive$""".r
+  val AvroRegex = """^(?i)avro$""".r
+  val TextDirRegex = """^(?i)text-dir$""".r
+
+  // streaming connector types
+  val KafkaRegex = """^(?i)kafka$""".r
+
+  /**
+    * create data connector
+    * @param sparkSession     spark env
+    * @param ssc              spark streaming env
+    * @param dcParam          data connector param
+    * @param tmstCache        same tmst cache in one data source
+    * @param streamingCacheClientOpt   for streaming cache
+    * @return   data connector, or a failure if the type is unknown or the creation fails
+    */
+  def getDataConnector(sparkSession: SparkSession,
+                       ssc: StreamingContext,
+                       dcParam: DataConnectorParam,
+                       tmstCache: TimestampStorage,
+                       streamingCacheClientOpt: Option[StreamingCacheClient]
+                      ): Try[DataConnector] = {
+    val conType = dcParam.getType
+    Try {
+      conType match {
+        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case KafkaRegex() => {
+          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+        }
+        case _ => throw new Exception("connector creation error!")
+      }
+    }
+  }
+
+  /**
+    * create streaming data connector
+    * throws an exception if the streaming context is null or the type is not a streaming one
+    */
+  private def getStreamingDataConnector(sparkSession: SparkSession,
+                                        ssc: StreamingContext,
+                                        dcParam: DataConnectorParam,
+                                        tmstCache: TimestampStorage,
+                                        streamingCacheClientOpt: Option[StreamingCacheClient]
+                                       ): StreamingDataConnector = {
+    if (ssc == null) throw new Exception("streaming context is null!")
+    val conType = dcParam.getType
+    conType match {
+      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+      case _ => throw new Exception("streaming connector creation error!")
+    }
+  }
+
+  /**
+    * create kafka data connector according to the configured key and value types,
+    * both default to java.lang.String; only string key with string value is supported
+    */
+  private def getKafkaDataConnector(sparkSession: SparkSession,
+                                    ssc: StreamingContext,
+                                    dcParam: DataConnectorParam,
+                                    tmstCache: TimestampStorage,
+                                    streamingCacheClientOpt: Option[StreamingCacheClient]
+                                   ): KafkaStreamingDataConnector  = {
+    val KeyType = "key.type"
+    val ValueType = "value.type"
+    val config = dcParam.config
+    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+    // compare the runtime classes explicitly: a type pattern like `k: Class[String]`
+    // is erased and would accept any class, so unsupported types were never rejected
+    val keyClass = getClassTag(keyType).runtimeClass
+    val valueClass = getClassTag(valueType).runtimeClass
+    if (keyClass == classOf[String] && valueClass == classOf[String]) {
+      KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+    } else {
+      throw new Exception("not supported type kafka data connector")
+    }
+  }
+
+  // class tag of the class with the given name, any class loading error is propagated
+  private def getClassTag(tp: String): ClassTag[_] = {
+    val clazz = Class.forName(tp)
+    ClassTag(clazz)
+  }
+
+//  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
+//    connectors.flatMap { dc =>
+//      dc match {
+//        case mdc: T => Some(mdc)
+//        case _ => None
+//      }
+//    }
+//  }
+
+}