Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/10 10:16:39 UTC

[2/3] incubator-griffin git commit: Modify measure module to support updated env and config json files format

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
deleted file mode 100644
index 9e020c2..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.curator.framework.imps.CuratorFrameworkState
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.lock.CacheLockInZK
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConverters._
-
-/**
-  * leverage zookeeper for info cache
-  * @param config
-  * @param metricName
-  */
-case class OffsetCacheInZK(config: Map[String, Any], metricName: String) extends OffsetCache with OffsetOps {
-
-  val Hosts = "hosts"
-  val Namespace = "namespace"
-  val Mode = "mode"
-  val InitClear = "init.clear"
-  val CloseClear = "close.clear"
-  val LockPath = "lock.path"
-
-  val PersistRegex = """^(?i)persist$""".r
-  val EphemeralRegex = """^(?i)ephemeral$""".r
-
-  final val separator = ZKPaths.PATH_SEPARATOR
-
-  val hosts = config.getOrElse(Hosts, "").toString
-  val namespace = config.getOrElse(Namespace, "").toString
-  val mode: CreateMode = config.get(Mode) match {
-    case Some(s: String) => s match {
-      case PersistRegex() => CreateMode.PERSISTENT
-      case EphemeralRegex() => CreateMode.EPHEMERAL
-      case _ => CreateMode.PERSISTENT
-    }
-    case _ => CreateMode.PERSISTENT
-  }
-  val initClear = config.get(InitClear) match {
-    case Some(b: Boolean) => b
-    case _ => true
-  }
-  val closeClear = config.get(CloseClear) match {
-    case Some(b: Boolean) => b
-    case _ => false
-  }
-  val lockPath = config.getOrElse(LockPath, "lock").toString
-
-  private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
-  private val builder = CuratorFrameworkFactory.builder()
-    .connectString(hosts)
-    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
-    .namespace(cacheNamespace)
-  private val client: CuratorFramework = builder.build
-
-  def init(): Unit = {
-    client.start()
-    info("start zk info cache")
-    client.usingNamespace(cacheNamespace)
-    info(s"init with namespace: ${cacheNamespace}")
-    delete(lockPath :: Nil)
-    if (initClear) {
-      clear
-    }
-  }
-
-  def available(): Boolean = {
-    client.getState match {
-      case CuratorFrameworkState.STARTED => true
-      case _ => false
-    }
-  }
-
-  def close(): Unit = {
-    if (closeClear) {
-      clear
-    }
-    info("close zk info cache")
-    client.close()
-  }
-
-  def cache(kvs: Map[String, String]): Unit = {
-    kvs.foreach(kv => createOrUpdate(path(kv._1), kv._2))
-  }
-
-  def read(keys: Iterable[String]): Map[String, String] = {
-    keys.flatMap { key =>
-      read(path(key)) match {
-        case Some(v) => Some((key, v))
-        case _ => None
-      }
-    }.toMap
-  }
-
-  def delete(keys: Iterable[String]): Unit = {
-    keys.foreach { key => delete(path(key)) }
-  }
-
-  def clear(): Unit = {
-//    delete("/")
-    delete(finalCacheInfoPath :: Nil)
-    delete(infoPath :: Nil)
-    info("clear info")
-  }
-
-  def listKeys(p: String): List[String] = {
-    children(path(p))
-  }
-
-  def genLock(s: String): CacheLockInZK = {
-    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
-    CacheLockInZK(new InterProcessMutex(client, lpt))
-  }
-
-  private def path(k: String): String = {
-    if (k.startsWith(separator)) k else separator + k
-  }
-
-  private def children(path: String): List[String] = {
-    try {
-      client.getChildren().forPath(path).asScala.toList
-    } catch {
-      case e: Throwable => {
-        warn(s"list ${path} warn: ${e.getMessage}")
-        Nil
-      }
-    }
-  }
-
-  private def createOrUpdate(path: String, content: String): Boolean = {
-    if (checkExists(path)) {
-      update(path, content)
-    } else {
-      create(path, content)
-    }
-  }
-
-  private def create(path: String, content: String): Boolean = {
-    try {
-      client.create().creatingParentsIfNeeded().withMode(mode)
-        .forPath(path, content.getBytes("utf-8"))
-      true
-    } catch {
-      case e: Throwable => {
-        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-  private def update(path: String, content: String): Boolean = {
-    try {
-      client.setData().forPath(path, content.getBytes("utf-8"))
-      true
-    } catch {
-      case e: Throwable => {
-        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-  private def read(path: String): Option[String] = {
-    try {
-      Some(new String(client.getData().forPath(path), "utf-8"))
-    } catch {
-      case e: Throwable => {
-        warn(s"read ${path} warn: ${e.getMessage}")
-        None
-      }
-    }
-  }
-
-  private def delete(path: String): Unit = {
-    try {
-      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
-    } catch {
-      case e: Throwable => error(s"delete ${path} error: ${e.getMessage}")
-    }
-  }
-
-  private def checkExists(path: String): Boolean = {
-    try {
-      client.checkExists().forPath(path) != null
-    } catch {
-      case e: Throwable => {
-        warn(s"check exists ${path} warn: ${e.getMessage}")
-        false
-      }
-    }
-  }
-
-}
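
Note: the create-or-update logic removed above boils down to a small amount of Curator API usage (the same builder, retry policy and checkExists/create/setData calls appear in the deleted file). A minimal, self-contained sketch, with an assumed local ZooKeeper and an illustrative path that are not part of this commit:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

object ZkCheckpointSketch {
  def main(args: Array[String]): Unit = {
    // connect under a dedicated namespace, retrying with exponential backoff,
    // as the deleted builder above did
    val client = CuratorFrameworkFactory.builder()
      .connectString("localhost:2181")          // assumed local ZK, not from the commit
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("griffin_checkpoint")          // illustrative namespace
      .build()
    client.start()

    val path  = "/info/source1/ready.time"      // illustrative path
    val bytes = System.currentTimeMillis.toString.getBytes("utf-8")

    // create-or-update: the same branch the removed createOrUpdate helper took
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT).forPath(path, bytes)
    } else {
      client.setData().forPath(path, bytes)
    }

    println(new String(client.getData().forPath(path), "utf-8"))
    client.close()
  }
}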

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
deleted file mode 100644
index c8763ec..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-trait OffsetOps extends Serializable { this: OffsetCache =>
-
-  val CacheTime = "cache.time"
-  val LastProcTime = "last.proc.time"
-  val ReadyTime = "ready.time"
-  val CleanTime = "clean.time"
-  val OldCacheIndex = "old.cache.index"
-
-  def cacheTime(path: String): String = s"${path}/${CacheTime}"
-  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
-  def readyTime(path: String): String = s"${path}/${ReadyTime}"
-  def cleanTime(path: String): String = s"${path}/${CleanTime}"
-  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
-
-  val infoPath = "info"
-
-  val finalCacheInfoPath = "info.final"
-  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
-  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
-  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
-
-  def startOffsetCache(): Unit = {
-    genFinalReadyTime
-  }
-
-  def getTimeRange(): (Long, Long) = {
-    readTimeRange
-  }
-
-  def getCleanTime(): Long = {
-    readCleanTime
-  }
-
-  def endOffsetCache: Unit = {
-    genFinalLastProcTime
-    genFinalCleanTime
-  }
-
-  private def genFinalReadyTime(): Unit = {
-    val subPath = listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
-    val result = read(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalReadyTime -> time.toString))
-      cache(map)
-    }
-  }
-
-  private def genFinalLastProcTime(): Unit = {
-    val subPath = listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
-    val result = read(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalLastProcTime -> time.toString))
-      cache(map)
-    }
-  }
-
-  private def genFinalCleanTime(): Unit = {
-    val subPath = listKeys(infoPath)
-    val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
-    val result = read(keys)
-    val times = keys.flatMap { k =>
-      getLongOpt(result, k)
-    }
-    if (times.nonEmpty) {
-      val time = times.min
-      val map = Map[String, String]((finalCleanTime -> time.toString))
-      cache(map)
-    }
-  }
-
-  private def readTimeRange(): (Long, Long) = {
-    val map = read(List(finalLastProcTime, finalReadyTime))
-    val lastProcTime = getLong(map, finalLastProcTime)
-    val curReadyTime = getLong(map, finalReadyTime)
-    (lastProcTime, curReadyTime)
-  }
-
-  private def readCleanTime(): Long = {
-    val map = read(List(finalCleanTime))
-    val cleanTime = getLong(map, finalCleanTime)
-    cleanTime
-  }
-
-  private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
-    try {
-      map.get(key).map(_.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-  private def getLong(map: Map[String, String], key: String) = {
-    getLongOpt(map, key).getOrElse(-1L)
-  }
-
-}
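
Note: the aggregation in the removed genFinal* methods is simple: collect one timestamp per cached data source, drop unparsable values, and keep the minimum. A standalone sketch of that step, with made-up values:

object FinalReadyTimeSketch {
  // parse safely, as the removed getLongOpt helper did
  private def toLongOpt(s: String): Option[Long] =
    try Some(s.toLong) catch { case _: NumberFormatException => None }

  def main(args: Array[String]): Unit = {
    // one ready.time value per cached data source (made-up values)
    val readyTimes = Map(
      "info/source1/ready.time" -> "1533895200000",
      "info/source2/ready.time" -> "1533895140000",
      "info/source3/ready.time" -> "oops"          // unparsable values are dropped
    )
    val times = readyTimes.values.flatMap(toLongOpt)
    // the global ready time follows the slowest source, i.e. the minimum
    if (times.nonEmpty) println(s"final ready time: ${times.min}")
  }
}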

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 8bb0735..6bf6373 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -39,6 +39,8 @@ case class DataSource(name: String,
                       streamingCacheClientOpt: Option[StreamingCacheClient]
                      ) extends Loggable with Serializable {
 
+  val isBaseline: Boolean = dsParam.isBaseline
+
   def init(): Unit = {
     dataConnectors.foreach(_.init)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index f48185e..7807dfd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -50,7 +50,7 @@ object DataSourceFactory extends Loggable {
 
     // for streaming data cache
     val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
-      sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, timestampStorage)
+      sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
 
     val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
       DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index cef5c53..0ebe6ba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.utils.DataFrameUtil._
@@ -89,8 +89,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   val _Updatable = "updatable"
   val updatable = param.getBoolean(_Updatable, false)
 
-  val newCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.new")
-  val oldCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.old")
+  val newCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.old")
 
   val newFilePath = s"${filePath}/new"
   val oldFilePath = s"${filePath}/old"
@@ -155,7 +155,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
     */
   def readData(): (Option[DataFrame], TimeRange) = {
     // time range: (a, b]
-    val timeRange = OffsetCacheClient.getTimeRange
+    val timeRange = OffsetCheckpointClient.getTimeRange
     val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
 
     // read partition info
@@ -349,7 +349,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
     */
   def processFinish(): Unit = {
     // next last proc time
-    val timeRange = OffsetCacheClient.getTimeRange
+    val timeRange = OffsetCheckpointClient.getTimeRange
     submitLastProcTime(timeRange._2)
 
     // next clean time
@@ -359,7 +359,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
 
   // read next clean time
   private def getNextCleanTime(): Long = {
-    val timeRange = OffsetCacheClient.getTimeRange
+    val timeRange = OffsetCheckpointClient.getTimeRange
     val nextCleanTime = timeRange._2 + deltaTimeRange._1
     nextCleanTime
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index 17b15e9..f991e2d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -36,17 +36,17 @@ object StreamingCacheClientFactory extends Loggable {
 
   /**
     * create streaming cache client
-    * @param sqlContext   sqlContext in spark environment
-    * @param cacheOpt     data source cache config option
-    * @param name         data source name
-    * @param index        data source index
-    * @param tmstCache    the same tmstCache instance inside a data source
-    * @return             streaming cache client option
+    * @param sqlContext     sqlContext in spark environment
+    * @param checkpointOpt  data source checkpoint/cache config option
+    * @param name           data source name
+    * @param index          data source index
+    * @param tmstCache      the same tmstCache instance inside a data source
+    * @return               streaming cache client option
     */
-  def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
+  def getClientOpt(sqlContext: SQLContext, checkpointOpt: Option[Map[String, Any]],
                    name: String, index: Int, tmstCache: TimestampStorage
                   ): Option[StreamingCacheClient] = {
-    cacheOpt.flatMap { param =>
+    checkpointOpt.flatMap { param =>
       try {
         val tp = param.getString(_type, "")
         val dsCache = tp match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
index 52c1650..7b7f506 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -19,7 +19,7 @@ under the License.
 package org.apache.griffin.measure.datasource.cache
 
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 
 /**
   * timestamp offset of streaming data source cache
@@ -30,30 +30,30 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
   val readyTimeInterval: Long
   val readyTimeDelay: Long
 
-  def selfCacheInfoPath = s"${OffsetCacheClient.infoPath}/${cacheInfoPath}"
+  def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
 
-  def selfCacheTime = OffsetCacheClient.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = OffsetCacheClient.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = OffsetCacheClient.readyTime(selfCacheInfoPath)
-  def selfCleanTime = OffsetCacheClient.cleanTime(selfCacheInfoPath)
-  def selfOldCacheIndex = OffsetCacheClient.oldCacheIndex(selfCacheInfoPath)
+  def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
+  def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
 
   protected def submitCacheTime(ms: Long): Unit = {
     val map = Map[String, String]((selfCacheTime -> ms.toString))
-    OffsetCacheClient.cache(map)
+    OffsetCheckpointClient.cache(map)
   }
 
   protected def submitReadyTime(ms: Long): Unit = {
     val curReadyTime = ms - readyTimeDelay
     if (curReadyTime % readyTimeInterval == 0) {
       val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
-      OffsetCacheClient.cache(map)
+      OffsetCheckpointClient.cache(map)
     }
   }
 
   protected def submitLastProcTime(ms: Long): Unit = {
     val map = Map[String, String]((selfLastProcTime -> ms.toString))
-    OffsetCacheClient.cache(map)
+    OffsetCheckpointClient.cache(map)
   }
 
   protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
@@ -61,7 +61,7 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
   protected def submitCleanTime(ms: Long): Unit = {
     val cleanTime = genCleanTime(ms)
     val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
-    OffsetCacheClient.cache(map)
+    OffsetCheckpointClient.cache(map)
   }
 
   protected def genCleanTime(ms: Long): Long = ms
@@ -70,13 +70,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
 
   protected def submitOldCacheIndex(index: Long): Unit = {
     val map = Map[String, String]((selfOldCacheIndex -> index.toString))
-    OffsetCacheClient.cache(map)
+    OffsetCheckpointClient.cache(map)
   }
 
   def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
 
   private def readSelfInfo(key: String): Option[Long] = {
-    OffsetCacheClient.read(key :: Nil).get(key).flatMap { v =>
+    OffsetCheckpointClient.read(key :: Nil).get(key).flatMap { v =>
       try {
         Some(v.toLong)
       } catch {

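Note: a quick numeric check of the submitReadyTime guard above, with assumed values readyTimeInterval = 60000 ms and readyTimeDelay = 5000 ms: a tick at ms = 125000 yields curReadyTime = 120000, a multiple of 60000, so it is checkpointed; a tick at ms = 126000 yields 121000 and is skipped.

object ReadyTimeGuardSketch {
  def main(args: Array[String]): Unit = {
    val readyTimeInterval = 60000L   // assumed values, for illustration only
    val readyTimeDelay    = 5000L
    // the guard used by submitReadyTime above
    def submits(ms: Long): Boolean = (ms - readyTimeDelay) % readyTimeInterval == 0
    println(submits(125000L))  // true  -> 120000 is written
    println(submits(126000L))  // false -> nothing is written
  }
}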
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index fc09661..caea078 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -21,13 +21,13 @@ package org.apache.griffin.measure.datasource.connector
 import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType}
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
+import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.functions._
 
@@ -38,7 +38,6 @@ trait DataConnector extends Loggable with Serializable {
   val dcParam: DataConnectorParam
 
   val id: String = DataConnectorIdGenerator.genId
-  protected def thisName(suffix: String): String = s"this_${suffix}"
 
   val timestampStorage: TimestampStorage
   protected def saveTmst(t: Long) = timestampStorage.insert(t)
@@ -59,13 +58,13 @@ trait DataConnector extends Loggable with Serializable {
 
     val timestamp = context.contextId.timestamp
     val suffix = context.contextId.id
-    val thisTable = thisName(suffix)
+    val dcDfName = dcParam.getDataFrameName("this")
 
     try {
       saveTmst(timestamp)    // save timestamp
 
       dfOpt.flatMap { df =>
-        val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
+        val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
 
         // init data
         context.compileTableRegister.registerTable(thisTable)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 8860408..3b19892 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -19,7 +19,7 @@ under the License.
 package org.apache.griffin.measure.launch
 
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig}
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
 
 import scala.util.Try
 
@@ -54,4 +54,11 @@ trait DQApp extends Loggable with Serializable {
     }
   }
 
+  protected def getSinkParams: Seq[SinkParam] = {
+    val validSinkTypes = dqParam.getValidSinkTypes
+    envParam.getSinkParams.flatMap { sinkParam =>
+      if (validSinkTypes.contains(sinkParam.getType)) Some(sinkParam) else None
+    }
+  }
+
 }
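
Note: the new getSinkParams helper simply intersects the sinks declared in the env config with the sink types the DQ config marks as valid. A stand-in sketch (type names are illustrative, the accessors mirrored are envParam.getSinkParams and dqParam.getValidSinkTypes):

object SinkFilterSketch {
  def main(args: Array[String]): Unit = {
    // stand-ins for the env-declared sink types and the job's valid sink types
    val envSinkTypes   = Seq("CONSOLE", "HDFS", "ELASTICSEARCH")
    val validSinkTypes = Seq("CONSOLE", "ELASTICSEARCH")
    // keep a sink only if this job declares its type as valid
    println(envSinkTypes.filter(validSinkTypes.contains))  // List(CONSOLE, ELASTICSEARCH)
  }
}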

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index a651f2e..8733789 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -41,7 +41,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
   val metricName = dqParam.name
 //  val dataSourceParams = dqParam.dataSources
 //  val dataSourceNames = dataSourceParams.map(_.name)
-  val persistParams = envParam.persistParams
+  val sinkParams = getSinkParams
 
   var sqlContext: SQLContext = _
 
@@ -75,12 +75,12 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
 
     // create dq context
     val dqContext: DQContext = DQContext(
-      contextId, metricName, dataSources, persistParams, BatchProcessType
+      contextId, metricName, dataSources, sinkParams, BatchProcessType
     )(sparkSession)
 
     // start id
     val applicationId = sparkSession.sparkContext.applicationId
-    dqContext.getPersist().start(applicationId)
+    dqContext.getSink().start(applicationId)
 
     // build job
     val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.evaluateRule)
@@ -90,13 +90,13 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
 
     // end time
     val endTime = new Date().getTime
-    dqContext.getPersist().log(endTime, s"process using time: ${endTime - startTime} ms")
+    dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")
 
     // clean context
     dqContext.clean()
 
     // finish
-    dqContext.getPersist().finish()
+    dqContext.getSink().finish()
   }
 
   def close: Try[_] = Try {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index 126bedd..1768ae2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -25,8 +25,8 @@ import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition._
 import org.apache.griffin.measure.context._
+import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 import org.apache.griffin.measure.datasource.DataSourceFactory
-import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
 import org.apache.griffin.measure.context.streaming.metric.CacheResults
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
@@ -47,7 +47,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
   val metricName = dqParam.name
 //  val dataSourceParams = dqParam.dataSources
 //  val dataSourceNames = dataSourceParams.map(_.name)
-  val persistParams = envParam.persistParams
+  val sinkParams = getSinkParams
 
   var sqlContext: SQLContext = _
 
@@ -68,8 +68,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     clearCpDir
 
     // init info cache instance
-    OffsetCacheClient.initClient(envParam.offsetCacheParams, metricName)
-    OffsetCacheClient.init
+    OffsetCheckpointClient.initClient(envParam.checkpointParams, metricName)
+    OffsetCheckpointClient.init
 
     // register udf
     GriffinUDFAgent.register(sqlContext)
@@ -99,12 +99,12 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
     // create dq context
     val globalContext: DQContext = DQContext(
-      contextId, metricName, dataSources, persistParams, StreamingProcessType
+      contextId, metricName, dataSources, sinkParams, StreamingProcessType
     )(sparkSession)
 
     // start id
     val applicationId = sparkSession.sparkContext.applicationId
-    globalContext.getPersist().start(applicationId)
+    globalContext.getSink().start(applicationId)
 
     // process thread
     val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
@@ -124,7 +124,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     globalContext.clean()
 
     // finish
-    globalContext.getPersist().finish()
+    globalContext.getSink().finish()
 
   }
 
@@ -163,8 +163,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
                                    evaluateRuleParam: EvaluateRuleParam
                                   ) extends Runnable with Loggable {
 
-    val lock = OffsetCacheClient.genLock("process")
-    val appPersist = globalContext.getPersist()
+    val lock = OffsetCheckpointClient.genLock("process")
+    val appSink = globalContext.getSink()
 
     def run(): Unit = {
       val updateTimeDate = new Date()
@@ -174,10 +174,10 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
       if (locked) {
         try {
 
-          OffsetCacheClient.startOffsetCache
+          OffsetCheckpointClient.startOffsetCheckpoint
 
           val startTime = new Date().getTime
-          appPersist.log(startTime, "starting process ...")
+          appSink.log(startTime, "starting process ...")
           val contextId = ContextId(startTime)
 
           // create dq context
@@ -194,9 +194,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
           // end time
           val endTime = new Date().getTime
-          appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
+          appSink.log(endTime, s"process using time: ${endTime - startTime} ms")
 
-          OffsetCacheClient.endOffsetCache
+          OffsetCheckpointClient.endOffsetCheckpoint
 
           // clean old data
           cleanData(dqContext)
@@ -225,7 +225,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
         context.clean()
 
-        val cleanTime = OffsetCacheClient.getCleanTime
+        val cleanTime = OffsetCheckpointClient.getCleanTime
         CacheResults.refresh(cleanTime)
       } catch {
         case e: Throwable => error(s"clean data error: ${e.getMessage}")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 9e0acd8..306befe 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -23,7 +23,7 @@ import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.spark.rdd.RDD
 
 /**
-  * persist metric and record to console, for debug
+  * sink metric and record to console, for debug
   */
 case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
 
@@ -46,7 +46,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp:
     println(s"[${timeStamp}] ${rt}: ${msg}")
   }
 
-  def persistRecords(records: RDD[String], name: String): Unit = {
+  def sinkRecords(records: RDD[String], name: String): Unit = {
 //    println(s"${metricName} [${timeStamp}] records: ")
 //    try {
 //      val recordCount = records.count
@@ -61,7 +61,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp:
 //    }
   }
 
-  def persistRecords(records: Iterable[String], name: String): Unit = {
+  def sinkRecords(records: Iterable[String], name: String): Unit = {
 //    println(s"${metricName} [${timeStamp}] records: ")
 //    try {
 //      val recordCount = records.size
@@ -74,7 +74,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp:
 //    }
   }
 
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
+  def sinkMetrics(metrics: Map[String, Any]): Unit = {
     println(s"${metricName} [${timeStamp}] metrics: ")
     val json = JsonUtil.toJson(metrics)
     println(json)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
new file mode 100644
index 0000000..e5f72d1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import org.apache.spark.rdd.RDD
+
+import scala.concurrent.Future
+
+/**
+  * sink metric and record through http request
+  */
+case class ElasticSearchSink(config: Map[String, Any], metricName: String,
+                             timeStamp: Long, block: Boolean
+                            ) extends Sink {
+
+  val Api = "api"
+  val Method = "method"
+  val ConnectionTimeout = "connection.timeout"
+  val Retry = "retry"
+
+  val api = config.getString(Api, "")
+  val method = config.getString(Method, "post")
+  val connectionTimeout = TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+  val retry = config.getInt(Retry, 10)
+
+  val _Value = "value"
+
+  def available(): Boolean = {
+    api.nonEmpty
+  }
+
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  private def httpResult(dataMap: Map[String, Any]) = {
+    try {
+      val data = JsonUtil.toJson(dataMap)
+      // http request
+      val params = Map[String, Object]()
+      val header = Map[String, Object](("Content-Type","application/json"))
+
+      def func(): (Long, Future[Boolean]) = {
+        import scala.concurrent.ExecutionContext.Implicits.global
+        (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data)))
+      }
+      if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
+      else SinkTaskRunner.addNonBlockTask(func _, retry)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+
+  }
+
+  def log(rt: Long, msg: String): Unit = {}
+
+  def sinkRecords(records: RDD[String], name: String): Unit = {}
+  def sinkRecords(records: Iterable[String], name: String): Unit = {}
+
+  def sinkMetrics(metrics: Map[String, Any]): Unit = {
+    httpResult(metrics)
+  }
+
+}
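
Note: a hedged usage sketch for the new sink; the config keys ("api", "method", "connection.timeout", "retry") come from the constants above, while the endpoint, metric name and metric values are made up:

import org.apache.griffin.measure.sink.ElasticSearchSink

object ElasticSearchSinkSketch {
  def main(args: Array[String]): Unit = {
    val config = Map[String, Any](
      "api"                -> "http://es-host:9200/griffin/accuracy",  // made-up endpoint
      "method"             -> "post",
      "connection.timeout" -> "1m",   // assumes TimeUtil accepts this duration string
      "retry"              -> 10
    )
    val sink = ElasticSearchSink(config, "accu_batch", System.currentTimeMillis, block = false)
    if (sink.available()) {
      // fires a non-blocking HTTP task via SinkTaskRunner
      sink.sinkMetrics(Map("total" -> 100, "miss" -> 4, "matched" -> 96))
    }
  }
}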

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index 5608e7d..718f1c1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -25,7 +25,7 @@ import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
 
 /**
-  * persist metric and record to hdfs
+  * sink metric and record to hdfs
   */
 case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
 
@@ -111,7 +111,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     HdfsUtil.deleteHdfsPath(path)
   }
 
-  def persistRecords(records: RDD[String], name: String): Unit = {
+  def sinkRecords(records: RDD[String], name: String): Unit = {
     val path = filePath(name)
     clearOldRecords(path)
     try {
@@ -121,7 +121,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
         val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
         if (groupCount <= 1) {
           val recs = records.take(count.toInt)
-          persistRecords2Hdfs(path, recs)
+          sinkRecords2Hdfs(path, recs)
         } else {
           val groupedRecords: RDD[(Long, Iterable[String])] =
             records.zipWithIndex.flatMap { r =>
@@ -131,7 +131,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
           groupedRecords.foreach { group =>
             val (gid, recs) = group
             val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
-            persistRecords2Hdfs(hdfsPath, recs)
+            sinkRecords2Hdfs(hdfsPath, recs)
           }
         }
       }
@@ -140,7 +140,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     }
   }
 
-  def persistRecords(records: Iterable[String], name: String): Unit = {
+  def sinkRecords(records: Iterable[String], name: String): Unit = {
     val path = filePath(name)
     clearOldRecords(path)
     try {
@@ -150,13 +150,13 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
         val groupCount = (count - 1) / maxLinesPerFile + 1
         if (groupCount <= 1) {
           val recs = records.take(count.toInt)
-          persistRecords2Hdfs(path, recs)
+          sinkRecords2Hdfs(path, recs)
         } else {
           val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex
           groupedRecords.take(groupCount).foreach { group =>
             val (recs, gid) = group
             val hdfsPath = getHdfsPath(path, gid)
-            persistRecords2Hdfs(hdfsPath, recs)
+            sinkRecords2Hdfs(hdfsPath, recs)
           }
         }
       }
@@ -165,16 +165,16 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     }
   }
 
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
+  def sinkMetrics(metrics: Map[String, Any]): Unit = {
     try {
       val json = JsonUtil.toJson(metrics)
-      persistRecords2Hdfs(MetricsFile, json :: Nil)
+      sinkRecords2Hdfs(MetricsFile, json :: Nil)
     } catch {
       case e: Throwable => error(e.getMessage)
     }
   }
 
-  private def persistRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
+  private def sinkRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
     try {
       val recStr = records.mkString("\n")
       HdfsUtil.writeContent(hdfsPath, recStr)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala
deleted file mode 100644
index 63677c5..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.sink
-
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
-import org.apache.spark.rdd.RDD
-
-import scala.concurrent.Future
-
-/**
-  * persist metric and record through http request
-  */
-case class HttpSink(config: Map[String, Any], metricName: String,
-                    timeStamp: Long, block: Boolean
-                      ) extends Sink {
-
-  val Api = "api"
-  val Method = "method"
-  val OverTime = "over.time"
-  val Retry = "retry"
-
-  val api = config.getString(Api, "")
-  val method = config.getString(Method, "post")
-  val overTime = TimeUtil.milliseconds(config.getString(OverTime, "")).getOrElse(-1L)
-  val retry = config.getInt(Retry, 10)
-
-  val _Value = "value"
-
-  def available(): Boolean = {
-    api.nonEmpty
-  }
-
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
-
-  private def httpResult(dataMap: Map[String, Any]) = {
-    try {
-      val data = JsonUtil.toJson(dataMap)
-      // http request
-      val params = Map[String, Object]()
-      val header = Map[String, Object](("Content-Type","application/json"))
-
-      def func(): (Long, Future[Boolean]) = {
-        import scala.concurrent.ExecutionContext.Implicits.global
-        (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data)))
-      }
-      if (block) SinkTaskRunner.addBlockTask(func _, retry, overTime)
-      else SinkTaskRunner.addNonBlockTask(func _, retry)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-
-  }
-
-  def log(rt: Long, msg: String): Unit = {}
-
-  def persistRecords(records: RDD[String], name: String): Unit = {}
-  def persistRecords(records: Iterable[String], name: String): Unit = {}
-
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
-    httpResult(metrics)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index 6eb55e5..206f187 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -28,7 +28,7 @@ import org.mongodb.scala.result.UpdateResult
 import scala.concurrent.Future
 
 /**
-  * persist metric and record to mongo
+  * sink metric and record to mongo
   */
 case class MongoSink(config: Map[String, Any], metricName: String,
                      timeStamp: Long, block: Boolean
@@ -53,10 +53,10 @@ case class MongoSink(config: Map[String, Any], metricName: String,
 
   def log(rt: Long, msg: String): Unit = {}
 
-  def persistRecords(records: RDD[String], name: String): Unit = {}
-  def persistRecords(records: Iterable[String], name: String): Unit = {}
+  def sinkRecords(records: RDD[String], name: String): Unit = {}
+  def sinkRecords(records: Iterable[String], name: String): Unit = {}
 
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
+  def sinkMetrics(metrics: Map[String, Any]): Unit = {
     mongoInsert(metrics)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
index ede52cc..cca1ff9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
@@ -21,13 +21,13 @@ package org.apache.griffin.measure.sink
 import org.apache.spark.rdd.RDD
 
 /**
-  * persist metric and record in multiple ways
+  * sink metric and record in multiple ways
   */
-case class MultiSinks(persists: Iterable[Sink]) extends Sink {
+case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
 
   val block: Boolean = false
 
-  val headSinkOpt: Option[Sink] = persists.headOption
+  val headSinkOpt: Option[Sink] = sinks.headOption
 
   val metricName: String = headSinkOpt.map(_.metricName).getOrElse("")
 
@@ -35,45 +35,45 @@ case class MultiSinks(persists: Iterable[Sink]) extends Sink {
 
   val config: Map[String, Any] = Map[String, Any]()
 
-  def available(): Boolean = { persists.exists(_.available()) }
+  def available(): Boolean = { sinks.exists(_.available()) }
 
-  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
-  def finish(): Unit = { persists.foreach(_.finish()) }
+  def start(msg: String): Unit = { sinks.foreach(_.start(msg)) }
+  def finish(): Unit = { sinks.foreach(_.finish()) }
 
   def log(rt: Long, msg: String): Unit = {
-    persists.foreach { persist =>
+    sinks.foreach { sink =>
       try {
-        persist.log(rt, msg)
+        sink.log(rt, msg)
       } catch {
         case e: Throwable => error(s"log error: ${e.getMessage}")
       }
     }
   }
 
-  def persistRecords(records: RDD[String], name: String): Unit = {
-    persists.foreach { persist =>
+  def sinkRecords(records: RDD[String], name: String): Unit = {
+    sinks.foreach { sink =>
       try {
-        persist.persistRecords(records, name)
+        sink.sinkRecords(records, name)
       } catch {
-        case e: Throwable => error(s"persist records error: ${e.getMessage}")
+        case e: Throwable => error(s"sink records error: ${e.getMessage}")
       }
     }
   }
-  def persistRecords(records: Iterable[String], name: String): Unit = {
-    persists.foreach { persist =>
+  def sinkRecords(records: Iterable[String], name: String): Unit = {
+    sinks.foreach { sink =>
       try {
-        persist.persistRecords(records, name)
+        sink.sinkRecords(records, name)
       } catch {
-        case e: Throwable => error(s"persist records error: ${e.getMessage}")
+        case e: Throwable => error(s"sink records error: ${e.getMessage}")
       }
     }
   }
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
-    persists.foreach { persist =>
+  def sinkMetrics(metrics: Map[String, Any]): Unit = {
+    sinks.foreach { sink =>
       try {
-        persist.persistMetrics(metrics)
+        sink.sinkMetrics(metrics)
       } catch {
-        case e: Throwable => error(s"persist metrics error: ${e.getMessage}")
+        case e: Throwable => error(s"sink metrics error: ${e.getMessage}")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
index f052ae1..0c03bd8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
@@ -22,7 +22,7 @@ import org.apache.griffin.measure.Loggable
 import org.apache.spark.rdd.RDD
 
 /**
-  * persist metric and record
+  * sink metric and record
   */
 trait Sink extends Loggable with Serializable {
   val metricName: String
@@ -39,9 +39,9 @@ trait Sink extends Loggable with Serializable {
 
   def log(rt: Long, msg: String): Unit
 
-  def persistRecords(records: RDD[String], name: String): Unit
-  def persistRecords(records: Iterable[String], name: String): Unit
+  def sinkRecords(records: RDD[String], name: String): Unit
+  def sinkRecords(records: Iterable[String], name: String): Unit
 
-  def persistMetrics(metrics: Map[String, Any]): Unit
+  def sinkMetrics(metrics: Map[String, Any]): Unit
 
 }
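
Note: based on the trait members visible in this diff (metricName, timeStamp, config, block plus the methods above), a minimal custom Sink that only prints to stdout would look roughly like this (illustrative only, not part of the commit):

import org.apache.griffin.measure.sink.Sink
import org.apache.spark.rdd.RDD

case class StdoutSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
  val block: Boolean = false

  def available(): Boolean = true

  def start(msg: String): Unit = println(s"[$metricName] start: $msg")
  def finish(): Unit = println(s"[$metricName] finish")

  def log(rt: Long, msg: String): Unit = println(s"[$rt] $msg")

  // print at most a few records so a large RDD never floods stdout
  def sinkRecords(records: RDD[String], name: String): Unit =
    records.take(10).foreach(r => println(s"$name: $r"))
  def sinkRecords(records: Iterable[String], name: String): Unit =
    records.take(10).foreach(r => println(s"$name: $r"))

  def sinkMetrics(metrics: Map[String, Any]): Unit =
    println(s"[$metricName][$timeStamp] metrics: $metrics")
}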

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 85bf1b1..26b0178 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,39 +18,36 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import org.apache.griffin.measure.configuration.dqdefinition.PersistParam
+import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
+import org.apache.griffin.measure.configuration.enums._
 
 import scala.util.{Success, Try}
 
 
-case class SinkFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
-
-  val HDFS_REGEX = """^(?i)hdfs$""".r
-  val HTTP_REGEX = """^(?i)http$""".r
-  val LOG_REGEX = """^(?i)log$""".r
-  val MONGO_REGEX = """^(?i)mongo$""".r
+case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable {
 
   /**
-    * create persist
-    * @param timeStamp    the timestamp of persist
-    * @param block        persist write metric in block or non-block way
-    * @return   persist
+    * create sink
+    * @param timeStamp    the timestamp of sink
+    * @param block        sink write metric in block or non-block way
+    * @return   sink
     */
-  def getPersists(timeStamp: Long, block: Boolean): MultiSinks = {
-    MultiSinks(persistParams.flatMap(param => getPersist(timeStamp, param, block)))
+  def getSinks(timeStamp: Long, block: Boolean): MultiSinks = {
+    MultiSinks(sinkParams.flatMap(param => getSink(timeStamp, param, block)))
   }
 
-  private def getPersist(timeStamp: Long, persistParam: PersistParam, block: Boolean): Option[Sink] = {
-    val config = persistParam.getConfig
-    val persistTry = persistParam.getType match {
-      case LOG_REGEX() => Try(ConsoleSink(config, metricName, timeStamp))
-      case HDFS_REGEX() => Try(HdfsSink(config, metricName, timeStamp))
-      case HTTP_REGEX() => Try(HttpSink(config, metricName, timeStamp, block))
-      case MONGO_REGEX() => Try(MongoSink(config, metricName, timeStamp, block))
-      case _ => throw new Exception("not supported persist type")
+  private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = {
+    val config = sinkParam.getConfig
+    val sinkType = sinkParam.getType
+    val sinkTry = sinkType match {
+      case ConsoleSinkType => Try(ConsoleSink(config, metricName, timeStamp))
+      case HdfsSinkType => Try(HdfsSink(config, metricName, timeStamp))
+      case ElasticsearchSinkType => Try(ElasticSearchSink(config, metricName, timeStamp, block))
+      case MongoSinkType => Try(MongoSink(config, metricName, timeStamp, block))
+      case _ => throw new Exception(s"sink type ${sinkType} is not supported!")
     }
-    persistTry match {
-      case Success(persist) if (persist.available) => Some(persist)
+    sinkTry match {
+      case Success(sink) if (sink.available) => Some(sink)
       case _ => None
     }
   }
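
Note: the factory now matches on enum values (ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, defined in configuration.enums and not shown in this diff) instead of regexes over raw strings. A self-contained sketch of the same dispatch pattern with stand-in types:

// stand-ins for the real values in org.apache.griffin.measure.configuration.enums
sealed trait SinkType
case object ConsoleSinkType       extends SinkType
case object HdfsSinkType          extends SinkType
case object ElasticsearchSinkType extends SinkType
case object MongoSinkType         extends SinkType

object SinkDispatchSketch {
  def describe(tp: SinkType): String = tp match {
    case ConsoleSinkType       => "console sink"
    case HdfsSinkType          => "hdfs sink"
    case ElasticsearchSinkType => "elasticsearch sink"
    case MongoSinkType         => "mongo sink"
    // no wildcard needed: the compiler can warn when a SinkType case is missing
  }

  def main(args: Array[String]): Unit =
    Seq(ConsoleSinkType, ElasticsearchSinkType).map(describe).foreach(println)
}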

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
index 79f0bb1..1cc3f3e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
@@ -28,7 +28,7 @@ import scala.concurrent.duration._
 import scala.util.{Failure, Success}
 
 /**
-  * persist task runner, to persist metrics in block or non-block mode
+  * sink task runner, to sink metrics in block or non-block mode
   */
 object SinkTaskRunner extends Loggable {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
index f945856..796c797 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
@@ -26,9 +26,10 @@ import org.apache.griffin.measure.step.transform.DataFrameOpsTransformStep
 case class DataFrameOpsDQStepBuilder() extends RuleParamStepBuilder {
 
   def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
-    val name = getStepName(ruleParam.getName)
+    val name = getStepName(ruleParam.getOutDfName())
+    val inputDfName = getStepName(ruleParam.getInDfName())
     val transformStep = DataFrameOpsTransformStep(
-      name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+      name, inputDfName, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
     transformStep +: buildDirectWriteSteps(ruleParam)
   }
 

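In the updated config format a rule names both the data frame it reads from and the one it produces, so the builder above takes the step name from the output df name and passes the input df name to the transform step explicitly. A toy illustration of threading those two names through a df-ops transform step; the case classes and the "from_json" rule name are simplified placeholders, not the real Griffin definitions.

object InOutDfNameExample extends App {
  // simplified placeholders for RuleParam and the df-ops transform step
  case class RuleParam(inDfName: String, outDfName: String, rule: String)

  case class DataFrameOpsTransformStep(
      name: String,         // df name this step registers as its output
      inputDfName: String,  // df the operation reads from
      rule: String)

  def buildTransformStep(ruleParam: RuleParam): DataFrameOpsTransformStep =
    DataFrameOpsTransformStep(
      name = ruleParam.outDfName,
      inputDfName = ruleParam.inDfName,
      rule = ruleParam.rule)

  // e.g. a pre-proc rule that parses a raw df into a typed one
  println(buildTransformStep(RuleParam("source", "source_parsed", "from_json")))
}
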
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
index 7dd3c73..d3c0e41 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
@@ -36,14 +36,14 @@ case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
   val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
 
   def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
-    val name = getStepName(ruleParam.getName)
+    val name = getStepName(ruleParam.getOutDfName())
     val rule = ruleParam.getRule
     val dqType = ruleParam.getDqType
     try {
       val result = parser.parseRule(rule, dqType)
       if (result.successful) {
         val expr = result.get
-        val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceName(name))
+        val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceOutDfName(name))
         expr2DQSteps.getDQSteps()
       } else {
         warn(s"parse rule [ ${rule} ] fails: \n${result}")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index f9f101e..176aed6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder
 
-import org.apache.griffin.measure.configuration.enums.NormalizeType
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
@@ -41,18 +41,18 @@ trait RuleParamStepBuilder extends DQStepBuilder {
   def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep]
 
   protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = {
-    val name = getStepName(ruleParam.getName)
+    val name = getStepName(ruleParam.getOutDfName())
     // metric writer
-    val metricSteps = ruleParam.getMetricOpt.map { metric =>
-      MetricWriteStep(metric.getNameOpt.getOrElse(name), name, NormalizeType(metric.collectType))
+    val metricSteps = ruleParam.getOutputOpt(MetricOutputType).map { metric =>
+      MetricWriteStep(metric.getNameOpt.getOrElse(name), name, FlattenType(metric.flatten))
     }.toSeq
     // record writer
-    val recordSteps = ruleParam.getRecordOpt.map { record =>
+    val recordSteps = ruleParam.getOutputOpt(RecordOutputType).map { record =>
       RecordWriteStep(record.getNameOpt.getOrElse(name), name)
     }.toSeq
     // update writer
-    val dsCacheUpdateSteps = ruleParam.getDsCacheUpdateOpt.map { dsCacheUpdate =>
-      DataSourceUpdateWriteStep(dsCacheUpdate.getDsNameOpt.getOrElse(""), name)
+    val dsCacheUpdateSteps = ruleParam.getOutputOpt(DscUpdateOutputType).map { dsCacheUpdate =>
+      DataSourceUpdateWriteStep(dsCacheUpdate.getNameOpt.getOrElse(""), name)
     }.toSeq
 
     metricSteps ++ recordSteps ++ dsCacheUpdateSteps

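The three dedicated accessors (getMetricOpt, getRecordOpt, getDsCacheUpdateOpt) collapse above into a single getOutputOpt keyed by an output type, so a rule's outputs become one list discriminated by type. A hedged sketch of what such an accessor could look like, with simplified stand-ins for the Griffin enum and param classes.

object OutputOptExample extends App {
  // simplified stand-ins for the output type enum and rule/output params
  sealed trait OutputType
  case object MetricOutputType extends OutputType
  case object RecordOutputType extends OutputType
  case object DscUpdateOutputType extends OutputType

  case class RuleOutputParam(outputType: OutputType, name: Option[String], flatten: String) {
    def getNameOpt: Option[String] = name.filter(_.nonEmpty)
  }

  case class RuleParam(outDfName: String, outputs: Seq[RuleOutputParam]) {
    // first output of the requested type, if any
    def getOutputOpt(tpe: OutputType): Option[RuleOutputParam] =
      outputs.find(_.outputType == tpe)
  }

  val rule = RuleParam("accu", Seq(
    RuleOutputParam(MetricOutputType, Some("accuracy_metric"), "default"),
    RuleOutputParam(RecordOutputType, None, "default")))

  // same fallback chain as in buildDirectWriteSteps above
  val metricName = rule.getOutputOpt(MetricOutputType)
    .flatMap(_.getNameOpt)
    .getOrElse(rule.outDfName)
  println(metricName) // accuracy_metric
}
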
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
index f66192c..b5dfd0c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
 case class SparkSqlDQStepBuilder() extends RuleParamStepBuilder {
 
   def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
-    val name = getStepName(ruleParam.getName)
+    val name = getStepName(ruleParam.getOutDfName())
     val transformStep = SparkSqlTransformStep(
       name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
     transformStep +: buildDirectWriteSteps(ruleParam)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 27872c2..7c84d38 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
@@ -82,7 +82,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
       val missRecordsWriteSteps = procType match {
         case BatchProcessType => {
-          val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+          val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
           RecordWriteStep(rwName, missRecordsTableName) :: Nil
         }
         case StreamingProcessType => Nil
@@ -90,7 +90,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val missRecordsUpdateWriteSteps = procType match {
         case BatchProcessType => Nil
         case StreamingProcessType => {
-          val dsName = ruleParam.getDsCacheUpdateOpt.flatMap(_.getDsNameOpt).getOrElse(sourceName)
+          val dsName = ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
           DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
         }
       }
@@ -114,7 +114,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
       // 4. accuracy metric
-      val accuracyTableName = ruleParam.name
+      val accuracyTableName = ruleParam.outDfName
       val matchedColName = details.getStringOrKey(_matched)
       val accuracyMetricSql = procType match {
         case BatchProcessType => {
@@ -139,10 +139,10 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
       val accuracyMetricWriteSteps = procType match {
         case BatchProcessType => {
-          val metricOpt = ruleParam.getMetricOpt
-          val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
-          val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
-          MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil
+          val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+          val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+          val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+          MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
         }
         case StreamingProcessType => Nil
       }
@@ -159,18 +159,17 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           val accuracyMetricTableName = "__accuracy"
           val accuracyMetricRule = DataFrameOps._accuracy
           val accuracyMetricDetails = Map[String, Any](
-            (AccuracyOprKeys._dfName -> accuracyTableName),
             (AccuracyOprKeys._miss -> missColName),
             (AccuracyOprKeys._total -> totalColName),
             (AccuracyOprKeys._matched -> matchedColName)
           )
           val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
-            accuracyMetricRule, accuracyMetricDetails)
+            accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
           val accuracyMetricWriteStep = {
-            val metricOpt = ruleParam.getMetricOpt
-            val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
-            val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
-            MetricWriteStep(mwName, accuracyMetricTableName, collectType)
+            val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+            val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+            val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+            MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
           }
 
           // 6. collect accuracy records
@@ -184,7 +183,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           val accuracyRecordTransStep = SparkSqlTransformStep(
             accuracyRecordTableName, accuracyRecordSql, emptyMap)
           val accuracyRecordWriteStep = {
-            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
             RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
           }
 

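The NormalizeType to FlattenType rename runs through all of these step builders: the metric write step now takes a flatten strategy from the metric output config and falls back to a default when none is given. The sketch below models that enum-with-default pattern; DefaultFlattenType, EntriesFlattenType and ArrayFlattenType appear in this commit, while MapFlattenType and the string parsing are assumptions made only for illustration.

// simplified model of the flatten enum; MapFlattenType and the parsing rules are guesses
sealed trait FlattenType
case object DefaultFlattenType extends FlattenType
case object EntriesFlattenType extends FlattenType
case object ArrayFlattenType extends FlattenType
case object MapFlattenType extends FlattenType

object FlattenType {
  val default: FlattenType = DefaultFlattenType

  // map a config string onto a flatten type, falling back to the default
  def apply(s: String): FlattenType = Option(s).map(_.trim.toLowerCase) match {
    case Some("entries") => EntriesFlattenType
    case Some("array")   => ArrayFlattenType
    case Some("map")     => MapFlattenType
    case _               => DefaultFlattenType
  }
}

object FlattenTypeExample extends App {
  println(FlattenType("entries")) // EntriesFlattenType
  println(FlattenType(null))      // DefaultFlattenType
  // the builders above then do: metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
}
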
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 469b16e..347fabd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -89,7 +89,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
       val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
       val incompleteRecordWriteStep = {
-        val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
+        val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
         RecordWriteStep(rwName, incompleteRecordsTableName)
       }
 
@@ -112,7 +112,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
       // 5. complete metric
-      val completeTableName = ruleParam.name
+      val completeTableName = ruleParam.outDfName
       val completeColName = details.getStringOrKey(_complete)
       val completeMetricSql = procType match {
         case BatchProcessType => {
@@ -136,10 +136,10 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       }
       val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
       val completeWriteStep = {
-        val metricOpt = ruleParam.getMetricOpt
+        val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
-        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
-        MetricWriteStep(mwName, completeTableName, collectType)
+        val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+        MetricWriteStep(mwName, completeTableName, flattenType)
       }
 
       val transSteps = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 3390263..6c56a77 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums.{ArrayNormalizeType, EntriesNormalizeType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
@@ -111,7 +111,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       }
       val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
       val totalMetricWriteStep = {
-        MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType, writeTimestampOpt)
+        MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
       }
 
       // 3. group by self
@@ -221,7 +221,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       }
       val distTransStep = SparkSqlTransformStep(distTableName, distSql, emptyMap)
       val distMetricWriteStep = {
-        MetricWriteStep(distColName, distTableName, EntriesNormalizeType, writeTimestampOpt)
+        MetricWriteStep(distColName, distTableName, EntriesFlattenType, writeTimestampOpt)
       }
 
       val transSteps3 = distTransStep :: Nil
@@ -271,7 +271,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
           val dupItemsWriteStep = {
-            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
+            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
             RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
           }
 
@@ -287,7 +287,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
           val groupDupMetricWriteStep = {
-            MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayNormalizeType, writeTimestampOpt)
+            MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayFlattenType, writeTimestampOpt)
           }
 
           val msteps = {
@@ -317,7 +317,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
           val dupRecordWriteStep = {
-            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
             RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
           }
 
@@ -332,7 +332,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
           val dupMetricWriteStep = {
-            MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType, writeTimestampOpt)
+            MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType, writeTimestampOpt)
           }
 
           val msteps = {

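Distinctness writes its counters with EntriesFlattenType and its duplication details with ArrayFlattenType. The names suggest the intended difference: entry-style metrics are merged directly into the metric map, while array-style values are collected under a single key. A purely illustrative toy version of that distinction, not the actual MetricWriteStep logic:

object FlattenModesExample extends App {
  // toy view only; not the real flattening implementation
  def flattenEntries(row: Map[String, Any]): Map[String, Any] =
    row                               // each column becomes its own metric entry

  def flattenArray(name: String, rows: Seq[Map[String, Any]]): Map[String, Any] =
    Map(name -> rows)                 // all rows collected under a single metric key

  println(flattenEntries(Map("total" -> 100, "dist" -> 96)))
  println(flattenArray("duplication.array",
    Seq(Map("dup" -> 1, "num" -> 3), Map("dup" -> 2, "num" -> 1))))
}
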
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index 28fa96a..33d44c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -19,7 +19,7 @@ under the License.
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
@@ -92,13 +92,13 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val profilingSql = {
         s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
       }
-      val profilingName = ruleParam.name
+      val profilingName = ruleParam.outDfName
       val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
       val profilingMetricWriteStep = {
-        val metricOpt = ruleParam.getMetricOpt
-        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
-        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
-        MetricWriteStep(mwName, profilingName, collectType)
+        val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+        val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+        MetricWriteStep(mwName, profilingName, flattenType)
       }
       profilingTransStep :: profilingMetricWriteStep :: Nil
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index e9eaa06..71e9f4b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -108,7 +108,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
 
       // 3. timeliness metric
-      val metricTableName = ruleParam.name
+      val metricTableName = ruleParam.outDfName
       val totalColName = details.getStringOrKey(_total)
       val avgColName = details.getStringOrKey(_avg)
       val metricSql = procType match {
@@ -131,10 +131,10 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       }
       val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
       val metricWriteStep = {
-        val metricOpt = ruleParam.getMetricOpt
-        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
-        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
-        MetricWriteStep(mwName, metricTableName, collectType)
+        val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+        val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+        MetricWriteStep(mwName, metricTableName, flattenType)
       }
 
       // current steps
@@ -150,7 +150,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
           }
           val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
           val recordWriteStep = {
-            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(recordTableName)
+            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(recordTableName)
             RecordWriteStep(rwName, recordTableName, None)
           }
           (recordTransStep :: Nil, recordWriteStep :: Nil)
@@ -191,7 +191,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
           }
           val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
           val rangeMetricWriteStep = {
-            MetricWriteStep(stepColName, rangeMetricTableName, ArrayNormalizeType)
+            MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
           }
 
           (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
@@ -216,7 +216,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
         }
         val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
         val percentileWriteStep = {
-          MetricWriteStep(percentileTableName, percentileTableName, DefaultNormalizeType)
+          MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
         }
 
         (percentileTransStep :: Nil, percentileWriteStep :: Nil)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 8827bf1..443239c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -129,7 +129,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         }
       }
       val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
-      val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType)
+      val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
 
       // 6. unique record
       val uniqueRecordTableName = "__uniqueRecord"
@@ -151,7 +151,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         }
       }
       val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
-      val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesNormalizeType)
+      val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
 
       val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
         totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
@@ -166,7 +166,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         }
         val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
         val dupRecordWriteStep = {
-          val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+          val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
           RecordWriteStep(rwName, dupRecordTableName)
         }
 
@@ -189,7 +189,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         }
         val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
         val dupMetricWriteStep = {
-          MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType)
+          MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
         }
 
         (dupRecordTransStep :: dupMetricTransStep :: Nil,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
new file mode 100644
index 0000000..eac3b2b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.preproc
+
+import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
+
+/**
+  * generate pre-proc rule params for each data source entity, based on the template defined in the pre-proc param
+  */
+object PreProcParamMaker {
+
+  case class StringAnyMap(values:Map[String,Any])
+
+  def makePreProcRules(rules: Seq[RuleParam], suffix: String, dfName: String): (Seq[RuleParam], String) = {
+    val len = rules.size
+    val (newRules, _) = rules.zipWithIndex.foldLeft((Nil: Seq[RuleParam], dfName)) { (ret, pair) =>
+      val (rls, prevOutDfName) = ret
+      val (rule, i) = pair
+      val inName = rule.getInDfName(prevOutDfName)
+      val outName = if (i == len - 1) dfName else rule.getOutDfName(genNameWithIndex(dfName, i))
+      val ruleWithNames = rule.replaceInOutDfName(inName, outName)
+      (rls :+ makeNewPreProcRule(ruleWithNames, suffix), outName)
+    }
+    (newRules, withSuffix(dfName, suffix))
+  }
+
+  private def makeNewPreProcRule(rule: RuleParam, suffix: String): RuleParam = {
+    val newInDfName = withSuffix(rule.getInDfName(), suffix)
+    val newOutDfName = withSuffix(rule.getOutDfName(), suffix)
+    val rpRule = rule.replaceInOutDfName(newInDfName, newOutDfName)
+    rule.getDslType match {
+      case DataFrameOpsType => rpRule
+      case _ => {
+        val newRule = replaceDfNameSuffix(rule.getRule, rule.getInDfName(), suffix)
+        rpRule.replaceRule(newRule)
+      }
+    }
+  }
+
+  private def genNameWithIndex(name: String, i: Int): String = s"${name}${i}"
+
+  private def replaceDfNameSuffix(str: String, dfName: String, suffix: String): String = {
+    val regexStr = s"(?i)${dfName}"
+    val replaceDfName = withSuffix(dfName, suffix)
+    str.replaceAll(regexStr, replaceDfName)
+  }
+
+  def withSuffix(str: String, suffix: String): String = s"${str}_${suffix}"
+
+}
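
PreProcParamMaker chains pre-proc rules by threading data frame names: each rule's input defaults to the previous rule's output, the last rule writes back to the original df name, and every name gets the entity suffix so different data source entities stay apart. A condensed re-implementation of that fold over simplified rule params shows how the names are threaded; the real class additionally rewrites the rule text via replaceDfNameSuffix and special-cases df-ops rules.

object PreProcChainExample extends App {
  // simplified stand-in for RuleParam: optional explicit in/out df names
  case class Rule(inDf: Option[String] = None, outDf: Option[String] = None)

  def withSuffix(s: String, suffix: String): String = s"${s}_${suffix}"

  // returns the suffixed (in, out) df names of each rule, plus the final suffixed df name
  def makePreProcRules(rules: Seq[Rule], suffix: String, dfName: String)
      : (Seq[(String, String)], String) = {
    val len = rules.size
    val (chained, _) = rules.zipWithIndex.foldLeft((Seq.empty[(String, String)], dfName)) {
      case ((acc, prevOut), (rule, i)) =>
        val inName = rule.inDf.getOrElse(prevOut)       // default input: previous rule's output
        val outName =
          if (i == len - 1) dfName                      // last rule writes the df itself
          else rule.outDf.getOrElse(s"${dfName}${i}")
        val pair = (withSuffix(inName, suffix), withSuffix(outName, suffix))
        (acc :+ pair, outName)
    }
    (chained, withSuffix(dfName, suffix))
  }

  // two anonymous pre-proc rules on data source df "source", entity suffix "0"
  println(makePreProcRules(Seq(Rule(), Rule()), "0", "source"))
  // (List((source_0,source0_0), (source0_0,source_0)),source_0)
}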

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
deleted file mode 100644
index 64b8623..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.step.builder.preproc
-
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-
-/**
-  * generate rule params by template defined in pre-proc param
-  */
-object PreProcRuleParamGenerator {
-
-  case class StringAnyMap(values:Map[String,Any])
-
-  val _name = "name"
-
-  def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = {
-    rules.map { rule =>
-      getNewPreProcRule(rule, suffix)
-    }
-  }
-
-  private def getNewPreProcRule(param: RuleParam, suffix: String): RuleParam = {
-    val newName = genNewString(param.getName, suffix)
-    val newRule = genNewString(param.getRule, suffix)
-    val newDetails = getNewMap(param.getDetails, suffix)
-    param.replaceName(newName).replaceRule(newRule).replaceDetails(newDetails)
-  }
-
-  private def getNewMap(details: Map[String, Any], suffix: String): Map[String, Any] = {
-    val keys = details.keys
-    keys.foldLeft(details) { (map, key) =>
-      map.get(key) match {
-        case Some(s: String) => map + (key -> genNewString(s, suffix))
-        case Some(subMap: StringAnyMap) => map + (key -> getNewMap(subMap.values, suffix))
-        case Some(arr: Seq[_]) => map + (key -> getNewArr(arr, suffix))
-        case _ => map
-      }
-    }
-  }
-
-  private def getNewArr(paramArr: Seq[Any], suffix: String): Seq[Any] = {
-    paramArr.foldLeft(Nil: Seq[Any]) { (res, param) =>
-      param match {
-        case s: String => res :+ genNewString(s, suffix)
-        case map: StringAnyMap => res :+ getNewMap(map.values, suffix)
-        case arr: Seq[_] => res :+ getNewArr(arr, suffix)
-        case _ => res :+ param
-      }
-    }
-  }
-
-  private def genNewString(str: String, suffix: String): String = {
-    str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}")
-  }
-
-}
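
For comparison, the deleted generator above relied on ${...} placeholders in the rule text and rewrote them with a regex instead of tracking data frame names explicitly. Its core substitution, using the same regex as the removed genNewString, boils down to:

object OldPlaceholderExample extends App {
  // the removed generator rewrote "${name}" placeholders into "name_<suffix>"
  def genNewString(str: String, suffix: String): String =
    str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}")

  println(genNewString("SELECT * FROM ${source}", "0")) // SELECT * FROM source_0
}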