You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/06/11 07:58:45 UTC

incubator-griffin git commit: refactor main process of batch and streaming measure

Repository: incubator-griffin
Updated Branches:
  refs/heads/master ff3098ffd -> ed5ac8730


refactor main process of batch and streaming measure

Author: Lionel Liu <bh...@163.com>
Author: dodobel <12...@qq.com>

Closes #294 from bhlx3lyx7/spark2.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ed5ac873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ed5ac873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ed5ac873

Branch: refs/heads/master
Commit: ed5ac8730e685b54c38494c26bf07c0603e8fa67
Parents: ff3098f
Author: Lionel Liu <bh...@163.com>
Authored: Mon Jun 11 15:58:39 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Jun 11 15:58:39 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/Application.scala    |  16 +-
 .../griffin/measure/context/DQContext.scala     |   2 +
 .../measure/context/datasource/DataSource.scala |  14 +-
 .../context/datasource/DataSourceFactory.scala  |   9 +-
 .../datasource/cache/DataSourceCache.scala      | 366 -------------------
 .../cache/DataSourceCacheFactory.scala          |  68 ----
 .../datasource/cache/JsonDataSourceCache.scala  |  40 --
 .../datasource/cache/OrcDataSourceCache.scala   |  40 --
 .../cache/ParquetDataSourceCache.scala          |  42 ---
 .../datasource/cache/StreamingCacheClient.scala | 366 +++++++++++++++++++
 .../cache/StreamingCacheClientFactory.scala     |  68 ++++
 .../cache/StreamingCacheJsonClient.scala        |  40 ++
 .../cache/StreamingCacheOrcClient.scala         |  40 ++
 .../cache/StreamingCacheParquetClient.scala     |  42 +++
 .../connector/DataConnectorFactory.scala        |  16 +-
 .../streaming/KafkaStreamingDataConnector.scala |   4 +-
 .../KafkaStreamingStringDataConnector.scala     |   4 +-
 .../streaming/StreamingDataConnector.scala      |   4 +-
 .../apache/griffin/measure/launch/DQApp.scala   |   5 +-
 .../measure/launch/batch/BatchDQApp.scala       |   6 +-
 .../launch/streaming/StreamingDQApp.scala       | 104 +++++-
 .../launch/streaming/StreamingDQApp2.scala      | 104 ------
 .../step/builder/RuleParamStepBuilder.scala     |   4 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |   4 +-
 .../transform/DistinctnessExpr2DQSteps.scala    |   4 +-
 .../step/write/DataSourceUpdateWriteStep.scala  |  61 ++++
 .../step/write/DsCacheUpdateWriteStep.scala     |  61 ----
 measure/src/test/resources/env-batch.json       |   9 -
 28 files changed, 763 insertions(+), 780 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 893ba2c..6580ffc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -77,7 +77,7 @@ object Application extends Loggable {
 
     // choose process
     val procType = ProcessType(allParam.dqParam.procType)
-    val proc: DQApp = procType match {
+    val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)
       case _ => {
@@ -88,8 +88,8 @@ object Application extends Loggable {
 
     startup
 
-    // process init
-    proc.init match {
+    // dq app init
+    dqApp.init match {
       case Success(_) => {
         info("process init success")
       }
@@ -100,15 +100,15 @@ object Application extends Loggable {
       }
     }
 
-    // process run
-    proc.run match {
+    // dq app run
+    dqApp.run match {
       case Success(_) => {
         info("process run success")
       }
       case Failure(ex) => {
         error(s"process run error: ${ex.getMessage}")
 
-        if (proc.retryable) {
+        if (dqApp.retryable) {
           throw ex
         } else {
           shutdown
@@ -117,8 +117,8 @@ object Application extends Loggable {
       }
     }
 
-    // process end
-    proc.close match {
+    // dq app end
+    dqApp.close match {
       case Success(_) => {
         info("process end success")
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 43b61aa..1d7dc62 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
 
 /**
   * dq context: the context of each calculation
+  * unique context id in each calculation
+  * access the same spark session this app created
   */
 case class DQContext(contextId: ContextId,
                      name: String,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
index 09ab9ea..0ca61aa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.context.datasource.connector.DataConnector
 import org.apache.griffin.measure.context.datasource.info.TmstCache
@@ -32,12 +32,12 @@ import org.apache.spark.sql._
   * @param name     name of data source
   * @param dsParam  param of this data source
   * @param dataConnectors       list of data connectors
-  * @param dataSourceCacheOpt   data source cache option in streaming mode
+  * @param streamingCacheClientOpt   streaming data cache client option
   */
 case class DataSource(name: String,
                       dsParam: DataSourceParam,
                       dataConnectors: Seq[DataConnector],
-                      dataSourceCacheOpt: Option[DataSourceCache]
+                      streamingCacheClientOpt: Option[StreamingCacheClient]
                      ) extends Loggable with Serializable {
 
   def init(): Unit = {
@@ -67,7 +67,7 @@ case class DataSource(name: String,
         case _ => None
       }
     }
-    val caches = dataSourceCacheOpt match {
+    val caches = streamingCacheClientOpt match {
       case Some(dsc) => dsc.readData() :: Nil
       case _ => Nil
     }
@@ -83,15 +83,15 @@ case class DataSource(name: String,
   }
 
   def updateData(df: DataFrame): Unit = {
-    dataSourceCacheOpt.foreach(_.updateData(Some(df)))
+    streamingCacheClientOpt.foreach(_.updateData(Some(df)))
   }
 
   def cleanOldData(): Unit = {
-    dataSourceCacheOpt.foreach(_.cleanOutTimeData)
+    streamingCacheClientOpt.foreach(_.cleanOutTimeData)
   }
 
   def processFinish(): Unit = {
-    dataSourceCacheOpt.foreach(_.processFinish)
+    streamingCacheClientOpt.foreach(_.processFinish)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
index a22b856..95c8de7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCacheFactory
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClientFactory
 import org.apache.griffin.measure.context.datasource.connector.{DataConnector, DataConnectorFactory}
 import org.apache.griffin.measure.context.datasource.info.TmstCache
 import org.apache.spark.sql.SparkSession
@@ -49,18 +49,19 @@ object DataSourceFactory extends Loggable {
     val connectorParams = dataSourceParam.getConnectors
     val tmstCache = TmstCache()
 
-    val dataSourceCacheOpt = DataSourceCacheFactory.getDataSourceCacheOpt(
+    // for streaming data cache
+    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
       sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache)
 
     val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
       DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
-        tmstCache, dataSourceCacheOpt) match {
+        tmstCache, streamingCacheClientOpt) match {
           case Success(connector) => Some(connector)
           case _ => None
         }
     }
 
-    Some(DataSource(name, dataSourceParam, dataConnectors, dataSourceCacheOpt))
+    Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
deleted file mode 100644
index c70fd20..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache}
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
-
-import scala.util.Random
-
-/**
-  * data source cache in streaming mode
-  * save data frame into hdfs in dump phase
-  * read data frame from hdfs in calculate phase
-  * with update and clean actions for the cache data
-  */
-trait DataSourceCache extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable {
-
-  val sqlContext: SQLContext
-  val param: Map[String, Any]
-  val dsName: String
-  val index: Int
-
-  val tmstCache: TmstCache
-  protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until)
-  protected def clearTmst(t: Long) = tmstCache.remove(t)
-  protected def clearTmstsUntil(until: Long) = {
-    val outDateTmsts = tmstCache.until(until)
-    tmstCache.remove(outDateTmsts)
-  }
-  protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
-  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
-
-  val _FilePath = "file.path"
-  val _InfoPath = "info.path"
-  val _ReadyTimeInterval = "ready.time.interval"
-  val _ReadyTimeDelay = "ready.time.delay"
-  val _TimeRange = "time.range"
-
-  val rdmStr = Random.alphanumeric.take(10).mkString
-  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
-  val defInfoPath = s"${index}"
-
-  val filePath: String = param.getString(_FilePath, defFilePath)
-  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
-  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
-  val deltaTimeRange: (Long, Long) = {
-    def negative(n: Long): Long = if (n <= 0) n else 0
-    param.get(_TimeRange) match {
-      case Some(seq: Seq[String]) => {
-        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
-        val ns = negative(nseq.headOption.getOrElse(0))
-        val ne = negative(nseq.tail.headOption.getOrElse(0))
-        (ns, ne)
-      }
-      case _ => (0, 0)
-    }
-  }
-
-  val _ReadOnly = "read.only"
-  val readOnly = param.getBoolean(_ReadOnly, false)
-
-  val _Updatable = "updatable"
-  val updatable = param.getBoolean(_Updatable, false)
-
-  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
-  val newFilePath = s"${filePath}/new"
-  val oldFilePath = s"${filePath}/old"
-
-  val defOldCacheIndex = 0L
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
-
-  /**
-    * save data frame in dump phase
-    * @param dfOpt    data frame to be saved
-    * @param ms       timestamp of this data frame
-    */
-  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
-    if (!readOnly) {
-      dfOpt match {
-        case Some(df) => {
-          // cache df
-          df.cache
-
-          // cache df
-          val cnt = df.count
-          info(s"save ${dsName} data count: ${cnt}")
-
-          if (cnt > 0) {
-            // lock makes it safer when writing new cache data
-            val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-            if (newCacheLocked) {
-              try {
-                val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
-                writeDataFrame(dfw, newFilePath)
-              } catch {
-                case e: Throwable => error(s"save data error: ${e.getMessage}")
-              } finally {
-                newCacheLock.unlock()
-              }
-            }
-          }
-
-          // uncache df
-          df.unpersist
-        }
-        case _ => {
-          info(s"no data frame to save")
-        }
-      }
-
-      // submit cache time and ready time
-      if (fanIncrement(ms)) {
-        info(s"save data [${ms}] finish")
-        submitCacheTime(ms)
-        submitReadyTime(ms)
-      }
-
-    }
-  }
-
-  /**
-    * read data frame in calculation phase
-    * @return   data frame to calculate, with the time range of data
-    */
-  def readData(): (Option[DataFrame], TimeRange) = {
-    // time range: (a, b]
-    val timeRange = TimeInfoCache.getTimeRange
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-
-    // read partition info
-    val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
-      info(s"read time range: [${reviseTimeRange._1}]")
-      s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
-    } else {
-      info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
-      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
-    }
-
-    // new cache data
-    val newDfOpt = try {
-      val dfr = sqlContext.read
-      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
-    } catch {
-      case e: Throwable => {
-        warn(s"read data source cache warn: ${e.getMessage}")
-        None
-      }
-    }
-
-    // old cache data
-    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
-    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
-      val oldDfPath = s"${oldFilePath}/${idx}"
-      try {
-        val dfr = sqlContext.read
-        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
-      } catch {
-        case e: Throwable => {
-          warn(s"read old data source cache warn: ${e.getMessage}")
-          None
-        }
-      }
-    }
-
-    // whole cache data
-    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
-
-    // from until tmst range
-    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
-    val tmstSet = afterTilRangeTmsts(from, until)
-
-    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
-    (cacheDfOpt, retTimeRange)
-  }
-
-  private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
-                                     func: (Long, Long) => Boolean
-                                    ): Unit = {
-    val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func)
-    // delete out time data path
-    earlierOrEqPaths.foreach { path =>
-      info(s"delete hdfs path: ${path}")
-      HdfsUtil.deleteHdfsPath(path)
-    }
-  }
-  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
-                                        func: (Long, Long) => Boolean
-                                       ): Iterable[String] = {
-    val names = HdfsUtil.listSubPathsByType(path, "dir")
-    val regex = partitionOpt match {
-      case Some(partition) => s"^${partition}=(\\d+)$$".r
-      case _ => "^(\\d+)$".r
-    }
-    names.filter { name =>
-      name match {
-        case regex(value) => {
-          str2Long(value) match {
-            case Some(t) => func(t, bound)
-            case _ => false
-          }
-        }
-        case _ => false
-      }
-    }.map(name => s"${path}/${name}")
-  }
-  private def str2Long(str: String): Option[Long] = {
-    try {
-      Some(str.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-
-  /**
-    * clean out-time cached data on hdfs
-    */
-  def cleanOutTimeData(): Unit = {
-    // clean tmst
-    val cleanTime = readCleanTime
-    cleanTime.foreach(clearTmstsTil(_))
-
-    if (!readOnly) {
-      // new cache data
-      val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
-      newCacheCleanTime match {
-        case Some(nct) => {
-          // clean calculated new cache data
-          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (newCacheLocked) {
-            try {
-              cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
-                (a: Long, b: Long) => (a <= b))
-            } catch {
-              case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
-            } finally {
-              newCacheLock.unlock()
-            }
-          }
-        }
-        case _ => {
-          // do nothing
-        }
-      }
-
-      // old cache data
-      val oldCacheCleanTime = if (updatable) readCleanTime else None
-      oldCacheCleanTime match {
-        case Some(oct) => {
-          val oldCacheIndexOpt = readOldCacheIndex
-          oldCacheIndexOpt.foreach { idx =>
-            val oldDfPath = s"${oldFilePath}/${idx}"
-            val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-            if (oldCacheLocked) {
-              try {
-                // clean calculated old cache data
-                cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
-                // clean out time old cache data not calculated
-//                cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
-              } catch {
-                case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
-              } finally {
-                oldCacheLock.unlock()
-              }
-            }
-          }
-        }
-        case _ => {
-          // do nothing
-        }
-      }
-    }
-  }
-
-  /**
-    * update old cached data by new data frame
-    * @param dfOpt    data frame to update old cached data
-    */
-  def updateData(dfOpt: Option[DataFrame]): Unit = {
-    if (!readOnly && updatable) {
-      dfOpt match {
-        case Some(df) => {
-          // old cache lock
-          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (oldCacheLocked) {
-            try {
-              val oldCacheIndexOpt = readOldCacheIndex
-              val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
-
-              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
-              val cleanTime = getNextCleanTime
-              val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
-              val updateDf = df.filter(filterStr)
-
-              val prlCount = sqlContext.sparkContext.defaultParallelism
-              // repartition
-              val repartitionedDf = updateDf.repartition(prlCount)
-              val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
-              writeDataFrame(dfw, oldDfPath)
-
-              submitOldCacheIndex(nextOldCacheIndex)
-            } catch {
-              case e: Throwable => error(s"update data error: ${e.getMessage}")
-            } finally {
-              oldCacheLock.unlock()
-            }
-          }
-        }
-        case _ => {
-          info(s"no data frame to update")
-        }
-      }
-    }
-  }
-
-  /**
-    * each time calculation phase finishes,
-    * data source cache needs to submit some cache information
-    */
-  def processFinish(): Unit = {
-    // next last proc time
-    val timeRange = TimeInfoCache.getTimeRange
-    submitLastProcTime(timeRange._2)
-
-    // next clean time
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1
-    submitCleanTime(nextCleanTime)
-  }
-
-  // read next clean time
-  private def getNextCleanTime(): Long = {
-    val timeRange = TimeInfoCache.getTimeRange
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1
-    nextCleanTime
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
deleted file mode 100644
index ca882e0..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
-
-object DataSourceCacheFactory extends Loggable {
-
-  private object DataSourceCacheType {
-    val ParquetRegex = "^(?i)parq(uet)?$".r
-    val JsonRegex = "^(?i)json$".r
-    val OrcRegex = "^(?i)orc$".r
-  }
-  import DataSourceCacheType._
-
-  val _type = "type"
-
-  /**
-    * create data source cache
-    * @param sqlContext   sqlContext in spark environment
-    * @param param        data source cache config
-    * @param name         data source name
-    * @param index        data source index
-    * @param tmstCache    the same tmstCache instance inside a data source
-    * @return             data source option
-    */
-  def getDataSourceCacheOpt(sqlContext: SQLContext, param: Map[String, Any],
-                            name: String, index: Int, tmstCache: TmstCache
-                           ): Option[DataSourceCache] = {
-    if (param != null) {
-      try {
-        val tp = param.getString(_type, "")
-        val dsCache = tp match {
-          case ParquetRegex() => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache)
-          case JsonRegex() => JsonDataSourceCache(sqlContext, param, name, index, tmstCache)
-          case OrcRegex() => OrcDataSourceCache(sqlContext, param, name, index, tmstCache)
-          case _ => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache)
-        }
-        Some(dsCache)
-      } catch {
-        case e: Throwable => {
-          error(s"generate data source cache fails")
-          None
-        }
-      }
-    } else None
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
deleted file mode 100644
index cb01274..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
-  * data source cache in json format
-  */
-case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                               dsName: String, index: Int, tmstCache: TmstCache
-                              ) extends DataSourceCache {
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
-    info(s"write path: ${path}")
-    dfw.json(path)
-  }
-
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
-    dfr.json(path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
deleted file mode 100644
index daba15f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
-  * data source cache in orc format
-  */
-case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                              dsName: String, index: Int, tmstCache: TmstCache
-                             ) extends DataSourceCache {
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
-    info(s"write path: ${path}")
-    dfw.orc(path)
-  }
-
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
-    dfr.orc(path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
deleted file mode 100644
index f00c6a3..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
-  * data source cache in parquet format
-  */
-case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                                  dsName: String, index: Int, tmstCache: TmstCache
-                                 ) extends DataSourceCache {
-
-  sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
-    info(s"write path: ${path}")
-    dfw.parquet(path)
-  }
-
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
-    dfr.parquet(path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
new file mode 100644
index 0000000..1c3ca60
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache}
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.spark.sql._
+
+import scala.util.Random
+
+/**
+  * data source cache in streaming mode
+  * save data frame into hdfs in dump phase
+  * read data frame from hdfs in calculate phase
+  * with update and clean actions for the cache data
+  */
+trait StreamingCacheClient extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable {
+
+  val sqlContext: SQLContext
+  val param: Map[String, Any]
+  val dsName: String
+  val index: Int
+
+  val tmstCache: TmstCache
+  protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until)
+  protected def clearTmst(t: Long) = tmstCache.remove(t)
+  protected def clearTmstsUntil(until: Long) = {
+    val outDateTmsts = tmstCache.until(until)
+    tmstCache.remove(outDateTmsts)
+  }
+  protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
+  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
+
+  val _FilePath = "file.path"
+  val _InfoPath = "info.path"
+  val _ReadyTimeInterval = "ready.time.interval"
+  val _ReadyTimeDelay = "ready.time.delay"
+  val _TimeRange = "time.range"
+
+  val rdmStr = Random.alphanumeric.take(10).mkString
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
+  val defInfoPath = s"${index}"
+
+  val filePath: String = param.getString(_FilePath, defFilePath)
+  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
+  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  val deltaTimeRange: (Long, Long) = { // (start, end) offsets in ms added to the read time range; both are <= 0
+    def negative(n: Long): Long = if (n <= 0) n else 0 // positive offsets are clamped to 0
+    param.get(_TimeRange) match {
+      case Some(seq: Seq[String]) => {
+        val nseq = seq.flatMap(TimeUtil.milliseconds(_)) // entries that fail to parse are dropped
+        val ns = negative(nseq.headOption.getOrElse(0))
+        val ne = negative(nseq.drop(1).headOption.getOrElse(0)) // drop(1), not tail: tail throws on an empty seq
+        (ns, ne)
+      }
+      case _ => (0, 0)
+    }
+  }
+
+  val _ReadOnly = "read.only"
+  val readOnly = param.getBoolean(_ReadOnly, false)
+
+  val _Updatable = "updatable"
+  val updatable = param.getBoolean(_Updatable, false)
+
+  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+  val newFilePath = s"${filePath}/new"
+  val oldFilePath = s"${filePath}/old"
+
+  val defOldCacheIndex = 0L
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
+
+  /**
+    * save data frame in dump phase
+    * @param dfOpt    data frame to be saved
+    * @param ms       timestamp of this data frame
+    */
+  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+    if (!readOnly) { // a read-only client neither writes cache data nor submits times
+      dfOpt match {
+        case Some(df) => {
+          // cache df, since it is used twice below (count, then write)
+          df.cache
+
+          // count rows of df
+          val cnt = df.count
+          info(s"save ${dsName} data count: ${cnt}")
+
+          if (cnt > 0) { // an empty data frame is not written
+            // lock makes it safer when writing new cache data
+            val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+            if (newCacheLocked) {
+              try {
+                val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
+                writeDataFrame(dfw, newFilePath)
+              } catch {
+                case e: Throwable => error(s"save data error: ${e.getMessage}") // logged, not rethrown
+              } finally {
+                newCacheLock.unlock()
+              }
+            }
+          }
+
+          // uncache df
+          df.unpersist
+        }
+        case _ => {
+          info(s"no data frame to save")
+        }
+      }
+
+      // submit cache time and ready time, only when fanIncrement(ms) returns true
+      if (fanIncrement(ms)) {
+        info(s"save data [${ms}] finish")
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      }
+
+    }
+  }
+
+  /**
+    * read data frame in calculation phase
+    * @return   data frame to calculate, with the time range of data
+    */
+  def readData(): (Option[DataFrame], TimeRange) = {
+    // time range: (a, b]
+    val timeRange = TimeInfoCache.getTimeRange
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+
+    // read partition info
+    val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
+      info(s"read time range: [${reviseTimeRange._1}]")
+      s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
+    } else {
+      info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
+      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+    }
+
+    // new cache data
+    val newDfOpt = try {
+      val dfr = sqlContext.read
+      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
+    } catch {
+      case e: Throwable => {
+        warn(s"read data source cache warn: ${e.getMessage}")
+        None
+      }
+    }
+
+    // old cache data
+    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
+    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
+      val oldDfPath = s"${oldFilePath}/${idx}"
+      try {
+        val dfr = sqlContext.read
+        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
+      } catch {
+        case e: Throwable => {
+          warn(s"read old data source cache warn: ${e.getMessage}")
+          None
+        }
+      }
+    }
+
+    // whole cache data
+    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
+
+    // from until tmst range
+    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
+    val tmstSet = afterTilRangeTmsts(from, until)
+
+    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+    (cacheDfOpt, retTimeRange)
+  }
+
+  private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
+                                     func: (Long, Long) => Boolean
+                                    ): Unit = {
+    val outTimePaths = listPartitionsByFunc(path, outTime, partitionOpt, func)
+    // delete every sub path selected by func(partitionValue, outTime)
+    outTimePaths.foreach { outTimePath =>
+      info(s"delete hdfs path: ${outTimePath}")
+      HdfsUtil.deleteHdfsPath(outTimePath)
+    }
+  }
+  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
+                                        func: (Long, Long) => Boolean
+                                       ): Iterable[String] = {
+    val names = HdfsUtil.listSubPathsByType(path, "dir")
+    val regex = partitionOpt match {
+      case Some(partition) => s"^${partition}=(\\d+)$$".r
+      case _ => "^(\\d+)$".r
+    }
+    names.filter { name =>
+      name match {
+        case regex(value) => {
+          str2Long(value) match {
+            case Some(t) => func(t, bound)
+            case _ => false
+          }
+        }
+        case _ => false
+      }
+    }.map(name => s"${path}/${name}")
+  }
+  private def str2Long(str: String): Option[Long] = { // parse str as a Long, None when it is not one
+    try {
+      Some(str.toLong)
+    } catch {
+      case _: NumberFormatException => None // only the parse failure is expected; fatal errors propagate
+    }
+  }
+
+  /**
+    * clean out-time cached data on hdfs
+    */
+  def cleanOutTimeData(): Unit = {
+    // clean tmst
+    val cleanTime = readCleanTime
+    cleanTime.foreach(clearTmstsTil(_))
+
+    if (!readOnly) {
+      // new cache data
+      val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
+      newCacheCleanTime match {
+        case Some(nct) => {
+          // clean calculated new cache data
+          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (newCacheLocked) {
+            try {
+              cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
+                (a: Long, b: Long) => (a <= b))
+            } catch {
+              case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
+            } finally {
+              newCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+
+      // old cache data
+      val oldCacheCleanTime = if (updatable) readCleanTime else None
+      oldCacheCleanTime match {
+        case Some(oct) => {
+          val oldCacheIndexOpt = readOldCacheIndex
+          oldCacheIndexOpt.foreach { idx =>
+            val oldDfPath = s"${oldFilePath}/${idx}"
+            val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+            if (oldCacheLocked) {
+              try {
+                // clean calculated old cache data
+                cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
+                // clean out time old cache data not calculated
+//                cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
+              } catch {
+                case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
+              } finally {
+                oldCacheLock.unlock()
+              }
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+    }
+  }
+
+  /**
+    * update old cached data by new data frame
+    * @param dfOpt    data frame to update old cached data
+    */
+  def updateData(dfOpt: Option[DataFrame]): Unit = {
+    if (!readOnly && updatable) {
+      dfOpt match {
+        case Some(df) => {
+          // old cache lock
+          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (oldCacheLocked) {
+            try {
+              val oldCacheIndexOpt = readOldCacheIndex
+              val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
+
+              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
+              val cleanTime = getNextCleanTime
+              val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
+              val updateDf = df.filter(filterStr)
+
+              val prlCount = sqlContext.sparkContext.defaultParallelism
+              // repartition
+              val repartitionedDf = updateDf.repartition(prlCount)
+              val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
+              writeDataFrame(dfw, oldDfPath)
+
+              submitOldCacheIndex(nextOldCacheIndex)
+            } catch {
+              case e: Throwable => error(s"update data error: ${e.getMessage}")
+            } finally {
+              oldCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          info(s"no data frame to update")
+        }
+      }
+    }
+  }
+
+  /**
+    * each time calculation phase finishes,
+    * data source cache needs to submit some cache information
+    */
+  def processFinish(): Unit = {
+    // next last proc time
+    val timeRange = TimeInfoCache.getTimeRange
+    submitLastProcTime(timeRange._2)
+
+    // next clean time
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    submitCleanTime(nextCleanTime)
+  }
+
+  // read next clean time
+  private def getNextCleanTime(): Long = {
+    val timeRange = TimeInfoCache.getTimeRange
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    nextCleanTime
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
new file mode 100644
index 0000000..529b07a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql.SQLContext
+
+object StreamingCacheClientFactory extends Loggable {
+
+  private object DataSourceCacheType {
+    val ParquetRegex = "^(?i)parq(?:uet)?$".r
+    val JsonRegex = "^(?i)json$".r
+    val OrcRegex = "^(?i)orc$".r
+  }
+  import DataSourceCacheType._
+
+  val _type = "type"
+
+  /**
+    * create streaming cache client
+    * @param sqlContext   sqlContext in spark environment
+    * @param param        data source cache config
+    * @param name         data source name
+    * @param index        data source index
+    * @param tmstCache    the same tmstCache instance inside a data source
+    * @return             streaming cache client option
+    */
+  def getClientOpt(sqlContext: SQLContext, param: Map[String, Any],
+                   name: String, index: Int, tmstCache: TmstCache
+                  ): Option[StreamingCacheClient] = {
+    if (param != null) {
+      try {
+        val tp = param.getString(_type, "")
+        val dsCache = tp match { // the extractor patterns below need regexes without capturing groups
+          case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+          case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+          case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+          case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache) // default: parquet
+        }
+        Some(dsCache)
+      } catch {
+        case e: Throwable => {
+          error(s"generate data source cache fails: ${e.getMessage}")
+          None
+        }
+      }
+    } else None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
new file mode 100644
index 0000000..494db3e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+  * data source cache in json format
+  */
+case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
+                                    dsName: String, index: Int, tmstCache: TmstCache
+                              ) extends StreamingCacheClient {
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    info(s"write path: ${path}")
+    dfw.json(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.json(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
new file mode 100644
index 0000000..6e0f142
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+  * data source cache in orc format
+  */
+case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
+                                   dsName: String, index: Int, tmstCache: TmstCache
+                             ) extends StreamingCacheClient {
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    info(s"write path: ${path}")
+    dfw.orc(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.orc(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
new file mode 100644
index 0000000..d99bc58
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+  * data source cache in parquet format
+  */
+case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
+                                       dsName: String, index: Int, tmstCache: TmstCache
+                                 ) extends StreamingCacheClient {
+
+  sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    info(s"write path: ${path}")
+    dfw.parquet(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.parquet(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
index ea22309..30352a9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource.connector
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.context.datasource.connector.batch._
 import org.apache.griffin.measure.context.datasource.connector.streaming._
 import org.apache.griffin.measure.context.datasource.info.TmstCache
@@ -44,14 +44,14 @@ object DataConnectorFactory extends Loggable {
     * @param ssc              spark streaming env
     * @param dcParam          data connector param
     * @param tmstCache        same tmst cache in one data source
-    * @param dataSourceCacheOpt   for streaming data connector
+    * @param streamingCacheClientOpt   for streaming cache
     * @return   data connector
     */
   def getDataConnector(sparkSession: SparkSession,
                        ssc: StreamingContext,
                        dcParam: DataConnectorParam,
                        tmstCache: TmstCache,
-                       dataSourceCacheOpt: Option[DataSourceCache]
+                       streamingCacheClientOpt: Option[StreamingCacheClient]
                       ): Try[DataConnector] = {
     val conType = dcParam.conType
     val version = dcParam.version
@@ -61,7 +61,7 @@ object DataConnectorFactory extends Loggable {
         case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
         case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
         case KafkaRegex() => {
-          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
         }
         case _ => throw new Exception("connector creation error!")
       }
@@ -72,13 +72,13 @@ object DataConnectorFactory extends Loggable {
                                         ssc: StreamingContext,
                                         dcParam: DataConnectorParam,
                                         tmstCache: TmstCache,
-                                        dataSourceCacheOpt: Option[DataSourceCache]
+                                        streamingCacheClientOpt: Option[StreamingCacheClient]
                                        ): StreamingDataConnector = {
     if (ssc == null) throw new Exception("streaming context is null!")
     val conType = dcParam.conType
     val version = dcParam.version
     conType match {
-      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       case _ => throw new Exception("streaming connector creation error!")
     }
   }
@@ -87,7 +87,7 @@ object DataConnectorFactory extends Loggable {
                                     ssc: StreamingContext,
                                     dcParam: DataConnectorParam,
                                     tmstCache: TmstCache,
-                                    dataSourceCacheOpt: Option[DataSourceCache]
+                                    streamingCacheClientOpt: Option[StreamingCacheClient]
                                    ): KafkaStreamingDataConnector  = {
     val KeyType = "key.type"
     val ValueType = "value.type"
@@ -96,7 +96,7 @@ object DataConnectorFactory extends Loggable {
     val valueType = config.getOrElse(ValueType, "java.lang.String").toString
     (getClassTag(keyType), getClassTag(valueType)) match {
       case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
-        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       }
       case _ => {
         throw new Exception("not supported type kafka data connector")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 9fe4876..de2822b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -43,7 +43,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
 
   def init(): Unit = {
     // register fan in
-    dataSourceCacheOpt.foreach(_.registerFanIn)
+    streamingCacheClientOpt.foreach(_.registerFanIn)
 
     val ds = stream match {
       case Success(dstream) => dstream
@@ -71,7 +71,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
       }
 
       // save data frame
-      dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms))
+      streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
     })
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index b483933..3083ca6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource.connector.streaming
 
 import kafka.serializer.StringDecoder
 import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.context.datasource.info.TmstCache
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -36,7 +36,7 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi
                                              @transient ssc: StreamingContext,
                                              dcParam: DataConnectorParam,
                                              tmstCache: TmstCache,
-                                             dataSourceCacheOpt: Option[DataSourceCache]
+                                             streamingCacheClientOpt: Option[StreamingCacheClient]
                                             ) extends KafkaStreamingDataConnector {
 
   type K = String

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
index 3b2c355..737bc21 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
@@ -19,7 +19,7 @@ under the License.
 package org.apache.griffin.measure.context.datasource.connector.streaming
 
 import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.context.datasource.connector.DataConnector
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -41,6 +41,6 @@ trait StreamingDataConnector extends DataConnector {
   // streaming data connector cannot directly read data frame
   def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
 
-  val dataSourceCacheOpt: Option[DataSourceCache]
+  val streamingCacheClientOpt: Option[StreamingCacheClient]
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 42364a8..9ec1641 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -44,7 +44,10 @@ trait DQApp extends Loggable with Serializable {
     */
   def retryable: Boolean
 
-  protected def getAppTime: Long = {
+  /**
+    * timestamp used as the key for metrics: dqParam.timestamp when set (> 0), else current time
+    */
+  protected def getMeasureTime: Long = {
     if (dqParam.timestamp != null && dqParam.timestamp > 0) { dqParam.timestamp }
     else { System.currentTimeMillis }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index fec27b1..1aa5039 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -66,10 +66,10 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
     // start time
     val startTime = new Date().getTime
 
-    val appTime = getAppTime
-    val contextId = ContextId(appTime)
+    val measureTime = getMeasureTime
+    val contextId = ContextId(measureTime)
 
-    // generate data sources
+    // get data sources
     val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources)
     dataSources.foreach(_.init)
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index e4a8108..d89b7e8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -18,14 +18,17 @@ under the License.
 */
 package org.apache.griffin.measure.launch.streaming
 
-import java.util.{Timer, TimerTask}
+import java.util.{Date, Timer, TimerTask}
 import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
 
+import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.params._
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.context.datasource.DataSourceFactory
-import org.apache.griffin.measure.context.streaming.info.InfoCacheInstance
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
@@ -87,8 +90,8 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
     })
 
     // start time
-    val appTime = getAppTime
-    val contextId = ContextId(appTime)
+    val measureTime = getMeasureTime
+    val contextId = ContextId(measureTime)
 
     // generate data sources
     val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
@@ -104,13 +107,13 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
     globalContext.getPersist().start(applicationId)
 
     // process thread
-    val dqThread = StreamingDQApp2(globalContext, dqParam.evaluateRule)
+    val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
 
     val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
       case Some(interval) => interval
       case _ => throw new Exception("invalid batch interval")
     }
-    val process = TimingProcess(processInterval, dqThread)
+    val process = Scheduler(processInterval, dqCalculator)
     process.startup()
 
     ssc.start()
@@ -150,7 +153,94 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
     }
   }
 
-  case class TimingProcess(interval: Long, runnable: Runnable) {
+
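+  /**
+    * Runnable for one round of streaming DQ calculation, guarded by the "process" lock.
+    * @param globalContext     shared context, cloned with a fresh ContextId on every run
+    * @param evaluateRuleParam evaluate rule param passed to DQJobBuilder.buildDQJob
+    */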
+  case class StreamingDQCalculator(globalContext: DQContext,
+                                   evaluateRuleParam: EvaluateRuleParam
+                                  ) extends Runnable with Loggable {
+
+    val lock = InfoCacheInstance.genLock("process")
+    val appPersist = globalContext.getPersist()
+
+    def run(): Unit = {
+      val updateTimeDate = new Date()
+      val updateTime = updateTimeDate.getTime
+      println(s"===== [${updateTimeDate}] process begins =====")
+      val locked = lock.lock(5, TimeUnit.SECONDS)
+      if (locked) {
+        try {
+
+          TimeInfoCache.startTimeInfoCache
+
+          val startTime = new Date().getTime
+          appPersist.log(startTime, s"starting process ...")
+          val contextId = ContextId(startTime)
+
+          // create dq context
+          val dqContext: DQContext = globalContext.cloneDQContext(contextId)
+
+          // build job
+          val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
+
+          // dq job execute
+          dqJob.execute(dqContext)
+
+          // finish calculation
+          finishCalculation(dqContext)
+
+          // end time
+          val endTime = new Date().getTime
+          appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
+
+          TimeInfoCache.endTimeInfoCache
+
+          // clean old data
+          cleanData(dqContext)
+
+        } catch {
+          case e: Throwable => error(s"process error: ${e.getMessage}")
+        } finally {
+          lock.unlock()
+        }
+      } else {
+        println(s"===== [${updateTimeDate}] process ignores =====")
+      }
+      val endTime = new Date().getTime
+      println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
+    }
+
+    // finish calculation for this round
+    private def finishCalculation(context: DQContext): Unit = {
+      context.dataSources.foreach(_.processFinish)
+    }
+
+    // clean old data and old result cache
+    private def cleanData(context: DQContext): Unit = {
+      try {
+        context.dataSources.foreach(_.cleanOldData)
+
+        context.clean()
+
+        val cleanTime = TimeInfoCache.getCleanTime
+        CacheResults.refresh(cleanTime)
+      } catch {
+        case e: Throwable => error(s"clean data error: ${e.getMessage}")
+      }
+    }
+
+  }
+
+
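+  /**
+    * Scheduler that drives the streaming process task; started via startup().
+    * @param interval process interval; the caller passes a value in milliseconds
+    * @param runnable task handed to the scheduler (a StreamingDQCalculator at the call site)
+    */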
+  case class Scheduler(interval: Long, runnable: Runnable) {
 
     val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
deleted file mode 100644
index 97ce980..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.launch.streaming
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params._
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.context.streaming.metric.CacheResults
-import org.apache.griffin.measure.context.{ContextId, DQContext}
-import org.apache.griffin.measure.job.builder.DQJobBuilder
-
-case class StreamingDQApp2(globalContext: DQContext,
-                           evaluateRuleParam: EvaluateRuleParam
-                          ) extends Runnable with Loggable {
-
-  val lock = InfoCacheInstance.genLock("process")
-  val appPersist = globalContext.getPersist()
-
-  def run(): Unit = {
-    val updateTimeDate = new Date()
-    val updateTime = updateTimeDate.getTime
-    println(s"===== [${updateTimeDate}] process begins =====")
-    val locked = lock.lock(5, TimeUnit.SECONDS)
-    if (locked) {
-      try {
-
-        TimeInfoCache.startTimeInfoCache
-
-        val startTime = new Date().getTime
-        appPersist.log(startTime, s"starting process ...")
-        val contextId = ContextId(startTime)
-
-        // create dq context
-        val dqContext: DQContext = globalContext.cloneDQContext(contextId)
-
-        // build job
-        val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
-
-        // dq job execute
-        dqJob.execute(dqContext)
-
-        // finish calculation
-        finishCalculation(dqContext)
-
-        // end time
-        val endTime = new Date().getTime
-        appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
-
-        TimeInfoCache.endTimeInfoCache
-
-        // clean old data
-        cleanData(dqContext)
-
-      } catch {
-        case e: Throwable => error(s"process error: ${e.getMessage}")
-      } finally {
-        lock.unlock()
-      }
-    } else {
-      println(s"===== [${updateTimeDate}] process ignores =====")
-    }
-    val endTime = new Date().getTime
-    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
-  }
-
-  // finish calculation for this round
-  private def finishCalculation(context: DQContext): Unit = {
-    context.dataSources.foreach(_.processFinish)
-  }
-
-  // clean old data and old result cache
-  private def cleanData(context: DQContext): Unit = {
-    try {
-      context.dataSources.foreach(_.cleanOldData)
-
-      context.clean()
-
-      val cleanTime = TimeInfoCache.getCleanTime
-      CacheResults.refresh(cleanTime)
-    } catch {
-      case e: Throwable => error(s"clean data error: ${e.getMessage}")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 9a2c7b5..fa9e38b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.step.builder
 import org.apache.griffin.measure.configuration.enums.NormalizeType
 import org.apache.griffin.measure.configuration.params.RuleParam
 import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
 
 /**
@@ -52,7 +52,7 @@ trait RuleParamStepBuilder extends DQStepBuilder {
     }.toSeq
     // update writer
     val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.map { dsCacheUpdate =>
-      DsCacheUpdateWriteStep(dsCacheUpdate.dsName, name)
+      DataSourceUpdateWriteStep(dsCacheUpdate.dsName, name)
     }.toSeq
 
     metricSteps ++ recordSteps ++ dsCacheUpdateSteps

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index a0e5ca3..9c14325 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -27,7 +27,7 @@ import org.apache.griffin.measure.step.builder.dsl.expr._
 import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
 import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
 import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
-import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -91,7 +91,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
         case BatchProcessType => Nil
         case StreamingProcessType => {
           val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName)
-          DsCacheUpdateWriteStep(dsName, missRecordsTableName) :: Nil
+          DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 3482955..1cf94e0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.step.builder.dsl.expr.{DistinctnessClause, _}
 import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.DistinctnessAnalyzer
 import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
-import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -133,7 +133,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
         case StreamingProcessType if (withOlderTable) => {
           // 4.0 update old data
-          val targetDsUpdateWriteStep = DsCacheUpdateWriteStep(targetName, targetName)
+          val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)
 
           // 4. older alias
           val olderAliasTableName = "__older"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
new file mode 100644
index 0000000..0472416
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.context.DQContext
+import org.apache.spark.sql.DataFrame
+
+/**
+  * write step that updates the data source named dsName with the table registered as inputName
+  */
+case class DataSourceUpdateWriteStep(dsName: String,
+                                     inputName: String
+                                    ) extends WriteStep {
+
+  val name: String = ""
+  val writeTimestampOpt: Option[Long] = None
+
+  def execute(context: DQContext): Boolean = {
+    collectDsCacheUpdateDf(context) match {
+      case Some(df) => {
+        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
+      }
+      case _ => {
+        warn(s"update ${dsName} from ${inputName} fails")
+      }
+    }
+    true
+  }
+
+  private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
+    try {
+      val df = context.sqlContext.table(s"`${name}`")
+      Some(df)
+    } catch {
+      case e: Throwable => {
+        error(s"get data frame ${name} fails")
+        None
+      }
+    }
+  }
+
+  private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
deleted file mode 100644
index 27dbb3c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.step.write
-
-import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.context.DQContext
-import org.apache.spark.sql.DataFrame
-
-/**
-  * update streaming data source cache
-  */
-case class DsCacheUpdateWriteStep(dsName: String,
-                                  inputName: String
-                                 ) extends WriteStep {
-
-  val name: String = ""
-  val writeTimestampOpt: Option[Long] = None
-
-  def execute(context: DQContext): Boolean = {
-    collectDsCacheUpdateDf(context) match {
-      case Some(df) => {
-        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
-      }
-      case _ => {
-        warn(s"update ${dsName} from ${inputName} fails")
-      }
-    }
-    true
-  }
-
-  private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
-    try {
-      val df = context.sqlContext.table(s"`${name}`")
-      Some(df)
-    } catch {
-      case e: Throwable => {
-        error(s"get data frame ${name} fails")
-        None
-      }
-    }
-  }
-
-  private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/test/resources/env-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json
index 3e8aa80..0d6ea8a 100644
--- a/measure/src/test/resources/env-batch.json
+++ b/measure/src/test/resources/env-batch.json
@@ -20,15 +20,6 @@
         "max.persist.lines": 10000,
         "max.lines.per.file": 10000
       }
-    },
-    {
-      "type": "http",
-      "config": {
-        "method": "post",
-        "api": "http://10.148.181.248:39200/griffin/accuracy",
-        "over.time": "1m",
-        "retry": 10
-      }
     }
   ],