You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/06/13 07:28:11 UTC

incubator-griffin git commit: add validation of parameters

Repository: incubator-griffin
Updated Branches:
  refs/heads/master 77aad4955 -> 060fc28b8


add validation of parameters

Author: Lionel Liu <bh...@163.com>
Author: dodobel <12...@qq.com>

Closes #297 from bhlx3lyx7/spark2.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/060fc28b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/060fc28b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/060fc28b

Branch: refs/heads/master
Commit: 060fc28b81320ebb44604e967f7f47076384b2e0
Parents: 77aad49
Author: Lionel Liu <bh...@163.com>
Authored: Wed Jun 13 15:28:06 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Wed Jun 13 15:28:06 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/Application.scala    |  23 +-
 .../configuration/enums/NormalizeType.scala     |   3 +-
 .../measure/configuration/params/AllParam.scala |  36 ---
 .../measure/configuration/params/DQConfig.scala | 217 +++++++++++++++++++
 .../measure/configuration/params/DQParam.scala  | 183 ----------------
 .../configuration/params/EnvConfig.scala        | 112 ++++++++++
 .../measure/configuration/params/EnvParam.scala |  97 ---------
 .../configuration/params/GriffinConfig.scala    |  42 ++++
 .../measure/configuration/params/Param.scala    |   3 +-
 .../params/reader/ParamFileReader.scala         |   2 +-
 .../params/reader/ParamJsonReader.scala         |   2 +-
 .../params/reader/ParamReader.scala             |  10 +
 .../validator/ParamValidator.scala              |  38 ----
 .../context/datasource/DataSourceFactory.scala  |   8 +-
 .../cache/StreamingCacheClientFactory.scala     |   8 +-
 .../datasource/connector/DataConnector.scala    |   4 +-
 .../connector/DataConnectorFactory.scala        |   8 +-
 .../batch/AvroBatchDataConnector.scala          |   2 +-
 .../batch/HiveBatchDataConnector.scala          |   2 +-
 .../batch/TextDirBatchDataConnector.scala       |   2 +-
 .../streaming/KafkaStreamingDataConnector.scala |   2 +-
 .../streaming/info/InfoCacheFactory.scala       |   4 +-
 .../measure/context/writer/PersistFactory.scala |   4 +-
 .../measure/job/builder/DQJobBuilder.scala      |   8 +-
 .../apache/griffin/measure/launch/DQApp.scala   |   6 +-
 .../measure/launch/batch/BatchDQApp.scala       |  16 +-
 .../launch/streaming/StreamingDQApp.scala       |  26 +--
 .../measure/step/builder/DQStepBuilder.scala    |   4 +-
 .../builder/DataSourceParamStepBuilder.scala    |   2 +-
 .../step/builder/RuleParamStepBuilder.scala     |  12 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |  16 +-
 .../transform/CompletenessExpr2DQSteps.scala    |   8 +-
 .../transform/DistinctnessExpr2DQSteps.scala    |   4 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala   |   5 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala  |   7 +-
 .../dsl/transform/UniquenessExpr2DQSteps.scala  |   2 +-
 .../preproc/PreProcRuleParamGenerator.scala     |   6 +-
 37 files changed, 475 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index dbbe970..566e8df 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -20,9 +20,7 @@ package org.apache.griffin.measure
 
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.params.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.params.{AllParam, DQParam, EnvParam, Param}
-import org.apache.griffin.measure.configuration.validator.ParamValidator
-import org.apache.griffin.measure.context.writer.PersistTaskRunner
+import org.apache.griffin.measure.configuration.params.{GriffinConfig, DQConfig, EnvConfig, Param}
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -48,35 +46,24 @@ object Application extends Loggable {
     info(dqParamFile)
 
     // read param files
-    val envParam = readParamFile[EnvParam](envParamFile) match {
+    val envParam = readParamFile[EnvConfig](envParamFile) match {
       case Success(p) => p
       case Failure(ex) => {
         error(ex.getMessage)
         sys.exit(-2)
       }
     }
-    val dqParam = readParamFile[DQParam](dqParamFile) match {
+    val dqParam = readParamFile[DQConfig](dqParamFile) match {
       case Success(p) => p
       case Failure(ex) => {
         error(ex.getMessage)
         sys.exit(-2)
       }
     }
-    val allParam: AllParam = AllParam(envParam, dqParam)
-
-    // validate param files
-    ParamValidator.validate(allParam) match {
-      case Failure(ex) => {
-        error(ex.getMessage)
-        sys.exit(-3)
-      }
-      case _ => {
-        info("params validation pass")
-      }
-    }
+    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
 
     // choose process
-    val procType = ProcessType(allParam.dqParam.procType)
+    val procType = ProcessType(allParam.getDqConfig.procType)
     val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
index b353cbc..61bf27c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
@@ -30,11 +30,12 @@ sealed trait NormalizeType {
 
 object NormalizeType {
   private val normalizeTypes: List[NormalizeType] = List(DefaultNormalizeType, EntriesNormalizeType, ArrayNormalizeType, MapNormalizeType)
+  val default = DefaultNormalizeType
   def apply(ptn: String): NormalizeType = {
     normalizeTypes.find(tp => ptn match {
       case tp.idPattern() => true
       case _ => false
-    }).getOrElse(DefaultNormalizeType)
+    }).getOrElse(default)
   }
   def unapply(pt: NormalizeType): Option[String] = Some(pt.desc)
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
deleted file mode 100644
index 4ba1a15..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.params
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-
-/**
-  * full set of griffin configuration
-  * @param envParam   environment configuration (must)
-  * @param dqParam    dq measurement configuration (must)
-  */
-@JsonInclude(Include.NON_NULL)
-case class AllParam( @JsonProperty("env") envParam: EnvParam,
-                     @JsonProperty("dq") dqParam: DQParam
-                   ) extends Param {
-  override def validate(): Boolean = {
-    envParam != null && dqParam != null
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
new file mode 100644
index 0000000..d07ab51
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
@@ -0,0 +1,217 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.configuration.enums._
+
+/**
+  * dq param
+  * @param name         name of dq measurement (must)
+  * @param timestamp    default timestamp of measure in batch mode (optional)
+  * @param procType     batch mode or streaming mode (must)
+  * @param dataSources   data sources (must)
+  * @param evaluateRule  dq measurement (must)
+  */
+@JsonInclude(Include.NON_NULL)
+case class DQConfig(@JsonProperty("name") name: String,
+                    @JsonProperty("timestamp") timestamp: Long,
+                    @JsonProperty("process.type") procType: String,
+                    @JsonProperty("data.sources") dataSources: List[DataSourceParam],
+                    @JsonProperty("evaluate.rule") evaluateRule: EvaluateRuleParam
+                  ) extends Param {
+  def getName: String = name
+  def getTimestamp: Long = timestamp
+  def getProcType: String = procType
+  def getDataSources: Seq[DataSourceParam] = {
+    dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
+      val (seq, names) = ret
+      if (!names.contains(ds.getName)){
+        (seq :+ ds, names + ds.getName)
+      } else ret
+    }._1
+  }
+  def getEvaluateRule: EvaluateRuleParam = evaluateRule
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(name), "dq config name should not be blank")
+    assert(StringUtils.isNotBlank(procType), "process.type should not be blank")
+    assert((dataSources != null), "data.sources should not be null")
+    assert((evaluateRule != null), "evaluate.rule should not be null")
+    getDataSources.foreach(_.validate)
+    evaluateRule.validate
+  }
+}
+
+/**
+  * data source param
+  * @param name         data source name (must)
+  * @param connectors   data connectors (optional)
+  * @param cache        data source cache configuration (must in streaming mode with streaming connectors)
+  */
+@JsonInclude(Include.NON_NULL)
+case class DataSourceParam( @JsonProperty("name") name: String,
+                            @JsonProperty("connectors") connectors: List[DataConnectorParam],
+                            @JsonProperty("cache") cache: Map[String, Any]
+                          ) extends Param {
+  def getName: String = name
+  def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
+  def getCacheOpt: Option[Map[String, Any]] = if (cache != null) Some(cache) else None
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(name), "data source name should not be empty")
+    getConnectors.foreach(_.validate)
+  }
+}
+
+/**
+  * data connector param
+  * @param conType    data connector type, e.g.: hive, avro, kafka (must)
+  * @param version    data connector type version (optional)
+  * @param config     detail configuration of data connector (must)
+  * @param preProc    pre-process rules after load data (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class DataConnectorParam( @JsonProperty("type") conType: String,
+                               @JsonProperty("version") version: String,
+                               @JsonProperty("config") config: Map[String, Any],
+                               @JsonProperty("pre.proc") preProc: List[RuleParam]
+                             ) extends Param {
+  def getType: String = conType
+  def getVersion: String = version
+  def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+  def getPreProcRules: Seq[RuleParam] = if (preProc != null) preProc else Nil
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(conType), "data connector type should not be empty")
+    getPreProcRules.foreach(_.validate)
+  }
+}
+
+/**
+  * evaluate rule param
+  * @param rules      rules to define dq measurement (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
+                            ) extends Param {
+  def getRules: Seq[RuleParam] = if (rules != null) rules else Nil
+
+  def validate(): Unit = {
+    getRules.foreach(_.validate)
+  }
+}
+
+/**
+  * rule param
+  * @param dslType    dsl type of this rule (must)
+  * @param dqType     dq type of this rule (must if dsl type is "griffin-dsl")
+  * @param name       name of result calculated by this rule (must if for later usage)
+  * @param rule       rule to define dq step calculation (must)
+  * @param details    detail config of rule (optional)
+  * @param cache      cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode)
+  * @param metric     config for metric output (optional)
+  * @param record     config for record output (optional)
+  * @param dsCacheUpdate    config for data source cache update output (optional, valid in streaming mode)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleParam( @JsonProperty("dsl.type") dslType: String,
+                      @JsonProperty("dq.type") dqType: String,
+                      @JsonProperty("name") name: String,
+                      @JsonProperty("rule") rule: String,
+                      @JsonProperty("details") details: Map[String, Any],
+                      @JsonProperty("cache") cache: Boolean,
+                      @JsonProperty("metric") metric: RuleMetricParam,
+                      @JsonProperty("record") record: RuleRecordParam,
+                      @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam
+                    ) extends Param {
+  def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
+  def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
+  def getCache: Boolean = if (cache != null) cache else false
+
+  def getName: String = if (name != null) name else ""
+  def getRule: String = if (rule != null) rule else ""
+  def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]()
+
+  def getMetricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None
+  def getRecordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None
+  def getDsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None
+
+  def replaceName(newName: String): RuleParam = {
+    if (StringUtils.equals(newName, name)) this
+    else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate)
+  }
+  def replaceRule(newRule: String): RuleParam = {
+    if (StringUtils.equals(newRule, rule)) this
+    else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, dsCacheUpdate)
+  }
+  def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
+    RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate)
+  }
+
+  def validate(): Unit = {
+    assert(!(getDslType.equals(GriffinDslType) && getDqType.equals(UnknownType)),
+      "unknown dq type for griffin dsl")
+
+    getMetricOpt.foreach(_.validate)
+    getRecordOpt.foreach(_.validate)
+    getDsCacheUpdateOpt.foreach(_.validate)
+  }
+}
+
+/**
+  * metric param of rule
+  * @param name         name of metric to output (optional)
+  * @param collectType  the normalize strategy to collect metric  (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleMetricParam( @JsonProperty("name") name: String,
+                            @JsonProperty("collect.type") collectType: String
+                          ) extends Param {
+  def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
+  def getCollectType: NormalizeType = if (StringUtils.isNotBlank(collectType)) NormalizeType(collectType) else NormalizeType("")
+
+  def validate(): Unit = {}
+}
+
+/**
+  * record param of rule
+  * @param name   name of record to output (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleRecordParam( @JsonProperty("name") name: String
+                          ) extends Param {
+  def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
+
+  def validate(): Unit = {}
+}
+
+/**
+  * data source cache update param of rule
+  * @param dsName   name of data source to be updated by thie rule result (must)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
+                                 ) extends Param {
+  def getDsNameOpt: Option[String] = if (StringUtils.isNotBlank(dsName)) Some(dsName) else None
+
+  def validate(): Unit = {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
deleted file mode 100644
index 8d3c354..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.params
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{DqType, DslType}
-
-/**
-  * dq param
-  * @param name         name of dq measurement (must)
-  * @param timestamp    default timestamp of measure in batch mode (optional)
-  * @param procType     batch mode or streaming mode (must)
-  * @param dataSourceParams   data sources (optional)
-  * @param evaluateRuleParam  dq measurement (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class DQParam( @JsonProperty("name") name: String,
-                    @JsonProperty("timestamp") timestamp: Long,
-                    @JsonProperty("process.type") procType: String,
-                    @JsonProperty("data.sources") dataSourceParams: List[DataSourceParam],
-                    @JsonProperty("evaluate.rule") evaluateRuleParam: EvaluateRuleParam
-                  ) extends Param {
-  val dataSources = {
-    val (validDsParams, _) = dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) =>
-      val (seq, names) = ret
-      if (dsParam.hasName && !names.contains(dsParam.name)) {
-        (seq :+ dsParam, names + dsParam.name)
-      } else ret
-    }
-    validDsParams
-  }
-  val evaluateRule: EvaluateRuleParam = {
-    if (evaluateRuleParam != null) evaluateRuleParam
-    else EvaluateRuleParam("", Nil)
-  }
-
-  override def validate(): Boolean = {
-    dataSources.nonEmpty
-  }
-}
-
-/**
-  * data source param
-  * @param name         data source name (must)
-  * @param connectors   data connectors (optional)
-  * @param cache        data source cache configuration (must in streaming mode with streaming connectors)
-  */
-@JsonInclude(Include.NON_NULL)
-case class DataSourceParam( @JsonProperty("name") name: String,
-                            @JsonProperty("connectors") connectors: List[DataConnectorParam],
-                            @JsonProperty("cache") cache: Map[String, Any]
-                          ) extends Param {
-  def hasName: Boolean = StringUtils.isNotBlank(name)
-  def getConnectors: List[DataConnectorParam] = if (connectors != null) connectors else Nil
-  def hasCache: Boolean = (cache != null)
-
-  override def validate(): Boolean = hasName
-}
-
-/**
-  * data connector param
-  * @param conType    data connector type, e.g.: hive, avro, kafka (must)
-  * @param version    data connector type version (optional)
-  * @param config     detail configuration of data connector (must)
-  * @param preProc    pre-process rules after load data (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class DataConnectorParam( @JsonProperty("type") conType: String,
-                               @JsonProperty("version") version: String,
-                               @JsonProperty("config") config: Map[String, Any],
-                               @JsonProperty("pre.proc") preProc: List[RuleParam]
-                             ) extends Param {
-  override def validate(): Boolean = {
-    StringUtils.isNotBlank(conType)
-  }
-}
-
-/**
-  * evaluate rule param
-  * @param dslType    default dsl type for all rules (optional)
-  * @param rules      rules to define dq measurement (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String,
-                              @JsonProperty("rules") rules: List[RuleParam]
-                            ) extends Param {
-  def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
-  def getRules: List[RuleParam] = if (rules != null) rules else Nil
-}
-
-/**
-  * rule param
-  * @param dslType    dsl type of this rule (must if default dsl type not set)
-  * @param dqType     dq type of this rule (valid for "griffin-dsl")
-  * @param name       name of result calculated by this rule (must if for later usage)
-  * @param rule       rule to define dq step calculation (must)
-  * @param details    detail config of rule (optional)
-  * @param cache      cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode)
-  * @param metric     config for metric output (optional)
-  * @param record     config for record output (optional)
-  * @param dsCacheUpdate    config for data source cache update output (optional, valid in streaming mode)
-  */
-@JsonInclude(Include.NON_NULL)
-case class RuleParam( @JsonProperty("dsl.type") dslType: String,
-                      @JsonProperty("dq.type") dqType: String,
-                      @JsonProperty("name") name: String,
-                      @JsonProperty("rule") rule: String,
-                      @JsonProperty("details") details: Map[String, Any],
-                      @JsonProperty("cache") cache: Boolean,
-                      @JsonProperty("metric") metric: RuleMetricParam,
-                      @JsonProperty("record") record: RuleRecordParam,
-                      @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam
-                    ) extends Param {
-  def getDslType(defaultDslType: DslType): DslType = if (dslType != null) DslType(dslType) else defaultDslType
-  def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
-  def getName: String = if (name != null) name else ""
-  def getRule: String = if (rule != null) rule else ""
-  def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]()
-  def getCache: Boolean = if (cache != null) cache else false
-
-  def metricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None
-  def recordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None
-  def dsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None
-
-  def replaceName(newName: String): RuleParam = {
-    if (StringUtils.equals(newName, name)) this
-    else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate)
-  }
-  def replaceRule(newRule: String): RuleParam = {
-    if (StringUtils.equals(newRule, rule)) this
-    else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, dsCacheUpdate)
-  }
-  def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
-    RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate)
-  }
-}
-
-/**
-  * metric param of rule
-  * @param name         name of metric to output (optional)
-  * @param collectType  the normalize strategy to collect metric  (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class RuleMetricParam( @JsonProperty("name") name: String,
-                            @JsonProperty("collect.type") collectType: String
-                          ) extends Param {
-}
-
-/**
-  * record param of rule
-  * @param name   name of record to output (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class RuleRecordParam( @JsonProperty("name") name: String
-                          ) extends Param {
-}
-
-/**
-  * data source cache update param of rule
-  * @param dsName   name of data source to be updated by thie rule result (must)
-  */
-@JsonInclude(Include.NON_NULL)
-case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
-                                 ) extends Param {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
new file mode 100644
index 0000000..bc6d50f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.utils.TimeUtil
+
+/**
+  * environment param
+  * @param sparkParam       config of spark environment (must)
+  * @param persistParams    config of persist ways (optional)
+  * @param infoCacheParams  config of information cache ways (required in streaming mode)
+  */
+@JsonInclude(Include.NON_NULL)
+case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
+                     @JsonProperty("persist") persistParams: List[PersistParam],
+                     @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam]
+                   ) extends Param {
+  def getSparkParam: SparkParam = sparkParam
+  def getPersistParams: Seq[PersistParam] = if (persistParams != null) persistParams else Nil
+  def getInfoCacheParams: Seq[InfoCacheParam] = if (infoCacheParams != null) infoCacheParams else Nil
+
+  def validate(): Unit = {
+    assert((sparkParam != null), "spark param should not be null")
+    sparkParam.validate
+    getPersistParams.foreach(_.validate)
+    getInfoCacheParams.foreach(_.validate)
+  }
+}
+
+/**
+  * spark param
+  * @param logLevel         log level of spark application (optional)
+  * @param cpDir            checkpoint directory for spark streaming (required in streaming mode)
+  * @param batchInterval    batch interval for spark streaming (required in streaming mode)
+  * @param processInterval  process interval for streaming dq calculation (required in streaming mode)
+  * @param config           extra config for spark environment (optional)
+  * @param initClear        clear checkpoint directory or not when initial (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class SparkParam( @JsonProperty("log.level") logLevel: String,
+                       @JsonProperty("checkpoint.dir") cpDir: String,
+                       @JsonProperty("batch.interval") batchInterval: String,
+                       @JsonProperty("process.interval") processInterval: String,
+                       @JsonProperty("config") config: Map[String, String],
+                       @JsonProperty("init.clear") initClear: Boolean
+                     ) extends Param {
+  def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
+  def getCpDir: String = if (cpDir != null) cpDir else ""
+  def getBatchInterval: String = if (batchInterval != null) batchInterval else ""
+  def getProcessInterval: String = if (processInterval != null) processInterval else ""
+  def getConfig: Map[String, String] = if (config != null) config else Map[String, String]()
+  def needInitClear: Boolean = if (initClear != null) initClear else false
+
+  def validate(): Unit = {
+//    assert(StringUtils.isNotBlank(cpDir), "checkpoint.dir should not be empty")
+//    assert(TimeUtil.milliseconds(getBatchInterval).nonEmpty, "batch.interval should be valid time string")
+//    assert(TimeUtil.milliseconds(getProcessInterval).nonEmpty, "process.interval should be valid time string")
+  }
+}
+
+/**
+  * persist param
+  * @param persistType    persist type, e.g.: log, hdfs, http, mongo (must)
+  * @param config         config of persist way (must)
+  */
+@JsonInclude(Include.NON_NULL)
+case class PersistParam( @JsonProperty("type") persistType: String,
+                         @JsonProperty("config") config: Map[String, Any]
+                       ) extends Param {
+  def getType: String = persistType
+  def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(persistType), "persist type should not be empty")
+  }
+}
+
+/**
+  * info cache param
+  * @param cacheType    information cache type, e.g.: zookeeper (must)
+  * @param config       config of cache way
+  */
+@JsonInclude(Include.NON_NULL)
+case class InfoCacheParam( @JsonProperty("type") cacheType: String,
+                           @JsonProperty("config") config: Map[String, Any]
+                         ) extends Param {
+  def getType: String = cacheType
+  def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(cacheType), "info cache type should not be empty")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
deleted file mode 100644
index 5ee0610..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.params
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import org.apache.commons.lang.StringUtils
-
-/**
-  * environment param
-  * @param sparkParam       config of spark environment (must)
-  * @param persistParams    config of persist ways (optional)
-  * @param infoCacheParams  config of information cache ways (must in streaming mode)
-  * @param cleanerParam     config of cleaner (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam,
-                     @JsonProperty("persist") persistParams: List[PersistParam],
-                     @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam],
-                     @JsonProperty("cleaner") cleanerParam: CleanerParam
-                   ) extends Param {
-}
-
-/**
-  * spark param
-  * @param logLevel         log level of spark application (optional)
-  * @param cpDir            checkpoint directory for spark streaming (must in streaming mode)
-  * @param batchInterval    batch interval for spark streaming (must in streaming mode)
-  * @param processInterval  process interval for streaming dq calculation (must in streaming mode)
-  * @param config           extra config for spark environment (optional)
-  * @param initClear        clear checkpoint directory or not when initial (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class SparkParam( @JsonProperty("log.level") logLevel: String,
-                       @JsonProperty("checkpoint.dir") cpDir: String,
-                       @JsonProperty("batch.interval") batchInterval: String,
-                       @JsonProperty("process.interval") processInterval: String,
-                       @JsonProperty("config") config: Map[String, String],
-                       @JsonProperty("init.clear") initClear: Boolean
-                     ) extends Param {
-  def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
-  def needInitClear: Boolean = if (initClear != null) initClear else false
-}
-
-/**
-  * persist param
-  * @param persistType    persist type, e.g.: log, hdfs, http, mongo (must)
-  * @param config         config of persist way (must)
-  */
-@JsonInclude(Include.NON_NULL)
-case class PersistParam( @JsonProperty("type") persistType: String,
-                         @JsonProperty("config") config: Map[String, Any]
-                       ) extends Param {
-  override def validate(): Boolean = {
-    StringUtils.isNotBlank(persistType)
-  }
-}
-
-/**
-  * info cache param
-  * @param cacheType    information cache type, e.g.: zookeeper (must)
-  * @param config       config of cache way
-  */
-@JsonInclude(Include.NON_NULL)
-case class InfoCacheParam( @JsonProperty("type") cacheType: String,
-                           @JsonProperty("config") config: Map[String, Any]
-                         ) extends Param {
-  override def validate(): Boolean = {
-    StringUtils.isNotBlank(cacheType)
-  }
-}
-
-/**
-  * cleaner param, invalid at current
-  * @param cleanInterval    clean interval (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String
-                       ) extends Param {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala
new file mode 100644
index 0000000..8debe48
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+/**
+  * full set of griffin configuration
+  * @param envConfig   environment configuration (must)
+  * @param dqConfig    dq measurement configuration (must)
+  */
+@JsonInclude(Include.NON_NULL)
+case class GriffinConfig(@JsonProperty("env") envConfig: EnvConfig,
+                         @JsonProperty("dq") dqConfig: DQConfig
+                   ) extends Param {
+  def getEnvConfig: EnvConfig = envConfig
+  def getDqConfig: DQConfig = dqConfig
+
+  def validate(): Unit = {
+    assert((envConfig != null), "environment config should not be null")
+    assert((dqConfig != null), "dq config should not be null")
+    envConfig.validate
+    dqConfig.validate
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
index 87ad246..6116cdf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
@@ -22,8 +22,7 @@ trait Param extends Serializable {
 
   /**
     * validate param internally
-    * @return
     */
-  def validate(): Boolean = true
+  def validate(): Unit
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
index 0ff5b06..a528127 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
@@ -36,7 +36,7 @@ case class ParamFileReader(filePath: String) extends ParamReader {
       val source = HdfsUtil.openFile(filePath)
       val param = JsonUtil.fromJson[T](source)
       source.close
-      param
+      validate(param)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
index 7a7eaf6..91ffa9a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
@@ -32,7 +32,7 @@ case class ParamJsonReader(jsonString: String) extends ParamReader {
   def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
     Try {
       val param = JsonUtil.fromJson[T](jsonString)
-      param
+      validate(param)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
index 77134ea..21ebccd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
@@ -32,4 +32,14 @@ trait ParamReader extends Loggable with Serializable {
     */
   def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
 
+  /**
+    * validate config param
+    * @param param  param to be validated
+    * @return       param itself
+    */
+  protected def validate[T <: Param](param: T): T = {
+    param.validate()
+    param
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
deleted file mode 100644
index 4c9a4d6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.validator
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params._
-
-import scala.util.Try
-
-object ParamValidator extends Loggable with Serializable {
-
-  /**
-    * validate param
-    * @param param    param to be validated
-    * @tparam T       type of param
-    * @return         param valid or not
-    */
-  def validate[T <: Param](param: T): Try[Boolean] = Try {
-    param.validate
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
index 95c8de7..edd88b6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
@@ -32,9 +32,9 @@ object DataSourceFactory extends Loggable {
 
   def getDataSources(sparkSession: SparkSession,
                      ssc: StreamingContext,
-                     dataSourceParams: Seq[DataSourceParam]
+                     dataSources: Seq[DataSourceParam]
                     ): Seq[DataSource] = {
-    dataSourceParams.zipWithIndex.flatMap { pair =>
+    dataSources.zipWithIndex.flatMap { pair =>
       val (param, index) = pair
       getDataSource(sparkSession, ssc, param, index)
     }
@@ -45,13 +45,13 @@ object DataSourceFactory extends Loggable {
                             dataSourceParam: DataSourceParam,
                             index: Int
                            ): Option[DataSource] = {
-    val name = dataSourceParam.name
+    val name = dataSourceParam.getName
     val connectorParams = dataSourceParam.getConnectors
     val tmstCache = TmstCache()
 
     // for streaming data cache
     val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
-      sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache)
+      sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, tmstCache)
 
     val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
       DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
index 529b07a..fd9d231 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
@@ -37,16 +37,16 @@ object StreamingCacheClientFactory extends Loggable {
   /**
     * create streaming cache client
     * @param sqlContext   sqlContext in spark environment
-    * @param param        data source cache config
+    * @param cacheOpt     data source cache config option
     * @param name         data source name
     * @param index        data source index
     * @param tmstCache    the same tmstCache instance inside a data source
     * @return             streaming cache client option
     */
-  def getClientOpt(sqlContext: SQLContext, param: Map[String, Any],
+  def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
                    name: String, index: Int, tmstCache: TmstCache
                   ): Option[StreamingCacheClient] = {
-    if (param != null) {
+    cacheOpt.flatMap { param =>
       try {
         val tp = param.getString(_type, "")
         val dsCache = tp match {
@@ -62,7 +62,7 @@ object StreamingCacheClientFactory extends Loggable {
           None
         }
       }
-    } else None
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
index 6dc17d1..a4c1995 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
@@ -65,14 +65,14 @@ trait DataConnector extends Loggable with Serializable {
       saveTmst(timestamp)    // save timestamp
 
       dfOpt.flatMap { df =>
-        val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.preProc, suffix)
+        val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
 
         // init data
         context.compileTableRegister.registerTable(thisTable)
         context.runTimeTableRegister.registerTable(thisTable, df)
 
         // build job
-        val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules, SparkSqlType)
+        val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
 
         // job execute
         preprocJob.execute(context)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
index 30352a9..4538fbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
@@ -53,8 +53,8 @@ object DataConnectorFactory extends Loggable {
                        tmstCache: TmstCache,
                        streamingCacheClientOpt: Option[StreamingCacheClient]
                       ): Try[DataConnector] = {
-    val conType = dcParam.conType
-    val version = dcParam.version
+    val conType = dcParam.getType
+    val version = dcParam.getVersion
     Try {
       conType match {
         case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
@@ -75,8 +75,8 @@ object DataConnectorFactory extends Loggable {
                                         streamingCacheClientOpt: Option[StreamingCacheClient]
                                        ): StreamingDataConnector = {
     if (ssc == null) throw new Exception("streaming context is null!")
-    val conType = dcParam.conType
-    val version = dcParam.version
+    val conType = dcParam.getType
+    val version = dcParam.getVersion
     conType match {
       case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       case _ => throw new Exception("streaming connector creation error!")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
index 1ee6e78..a906246 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -33,7 +33,7 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
                                   tmstCache: TmstCache
                                  ) extends BatchDataConnector {
 
-  val config = dcParam.config
+  val config = dcParam.getConfig
 
   val FilePath = "file.path"
   val FileName = "file.name"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
index 85cd120..331f469 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -32,7 +32,7 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
                                   tmstCache: TmstCache
                                  ) extends BatchDataConnector {
 
-  val config = dcParam.config
+  val config = dcParam.getConfig
 
   val Database = "database"
   val TableName = "table.name"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
index ca5b7b5..bc76f9d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -33,7 +33,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
                                      tmstCache: TmstCache
                                     ) extends BatchDataConnector {
 
-  val config = dcParam.config
+  val config = dcParam.getConfig
 
   val DirPath = "dir.path"
   val DataDirDepth = "data.dir.depth"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index de2822b..0f30b7f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -33,7 +33,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
   type VD <: Decoder[V]
   type OUT = (K, V)
 
-  val config = dcParam.config
+  val config = dcParam.getConfig
 
   val KafkaConfig = "kafka.config"
   val Topics = "topics"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
index 85106b4..28ade3b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
@@ -27,8 +27,8 @@ case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricNam
   val ZK_REGEX = """^(?i)zk|zookeeper$""".r
 
   def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
-    val config = infoCacheParam.config
-    val infoCacheTry = infoCacheParam.cacheType match {
+    val config = infoCacheParam.getConfig
+    val infoCacheTry = infoCacheParam.getType match {
       case ZK_REGEX() => Try(ZKInfoCache(config, metricName))
       case _ => throw new Exception("not supported info cache type")
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
index 9314876..0a2649e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
@@ -41,8 +41,8 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str
   }
 
   private def getPersist(timeStamp: Long, persistParam: PersistParam, block: Boolean): Option[Persist] = {
-    val config = persistParam.config
-    val persistTry = persistParam.persistType match {
+    val config = persistParam.getConfig
+    val persistTry = persistParam.getType match {
       case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
       case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
       case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp, block))

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
index a8e5b26..074a74e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -38,26 +38,24 @@ object DQJobBuilder {
     * @return       dq job
     */
   def buildDQJob(context: DQContext, evaluateRuleParam: EvaluateRuleParam): DQJob = {
-    val defaultDslType = evaluateRuleParam.getDslType
     val ruleParams = evaluateRuleParam.getRules
-    buildDQJob(context, ruleParams, defaultDslType)
+    buildDQJob(context, ruleParams)
   }
 
   /**
     * build dq job with rules in evaluate rule param or pre-proc param
     * @param context          dq context
     * @param ruleParams       rule params
-    * @param defaultDslType   default dsl type in evaluate rule param
     * @return       dq job
     */
-  def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam], defaultDslType: DslType): DQJob = {
+  def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = {
     // build steps by datasources
     val dsSteps = context.dataSources.flatMap { dataSource =>
       DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
     }
     // build steps by rules
     val ruleSteps = ruleParams.flatMap { ruleParam =>
-      DQStepBuilder.buildStepOptByRuleParam(context, ruleParam, defaultDslType)
+      DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
     }
     // metric flush step
     val metricFlushStep = MetricFlushStep()

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 9ec1641..79cef33 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -19,7 +19,7 @@ under the License.
 package org.apache.griffin.measure.launch
 
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.{DQParam, EnvParam}
+import org.apache.griffin.measure.configuration.params.{DQConfig, EnvConfig}
 
 import scala.util.Try
 
@@ -28,8 +28,8 @@ import scala.util.Try
   */
 trait DQApp extends Loggable with Serializable {
 
-  val envParam: EnvParam
-  val dqParam: DQParam
+  val envParam: EnvConfig
+  val dqParam: DQConfig
 
   def init: Try[_]
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 1aa5039..06892a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -32,15 +32,15 @@ import org.apache.spark.sql.{SQLContext, SparkSession}
 
 import scala.util.Try
 
-case class BatchDQApp(allParam: AllParam) extends DQApp {
+case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
 
-  val envParam: EnvParam = allParam.envParam
-  val dqParam: DQParam = allParam.dqParam
+  val envParam: EnvConfig = allParam.getEnvConfig
+  val dqParam: DQConfig = allParam.getDqConfig
 
   val sparkParam = envParam.sparkParam
   val metricName = dqParam.name
-  val dataSourceParams = dqParam.dataSources
-  val dataSourceNames = dataSourceParams.map(_.name)
+//  val dataSourceParams = dqParam.dataSources
+//  val dataSourceNames = dataSourceParams.map(_.name)
   val persistParams = envParam.persistParams
 
   var sqlContext: SQLContext = _
@@ -52,10 +52,10 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
   def init: Try[_] = Try {
     // build spark 2.0+ application context
     val conf = new SparkConf().setAppName(metricName)
-    conf.setAll(sparkParam.config)
+    conf.setAll(sparkParam.getConfig)
     conf.set("spark.sql.crossJoin.enabled", "true")
     sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
     sqlContext = sparkSession.sqlContext
 
     // register udf
@@ -70,7 +70,7 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
     val contextId = ContextId(measureTime)
 
     // get data sources
-    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources)
+    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
     dataSources.foreach(_.init)
 
     // create dq context

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index d89b7e8..f990c8e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -38,15 +38,15 @@ import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
 import scala.util.Try
 
-case class StreamingDQApp(allParam: AllParam) extends DQApp {
+case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
-  val envParam: EnvParam = allParam.envParam
-  val dqParam: DQParam = allParam.dqParam
+  val envParam: EnvConfig = allParam.getEnvConfig
+  val dqParam: DQConfig = allParam.getDqConfig
 
   val sparkParam = envParam.sparkParam
   val metricName = dqParam.name
-  val dataSourceParams = dqParam.dataSources
-  val dataSourceNames = dataSourceParams.map(_.name)
+//  val dataSourceParams = dqParam.dataSources
+//  val dataSourceNames = dataSourceParams.map(_.name)
   val persistParams = envParam.persistParams
 
   var sqlContext: SQLContext = _
@@ -58,10 +58,10 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
   def init: Try[_] = Try {
     // build spark 2.0+ application context
     val conf = new SparkConf().setAppName(metricName)
-    conf.setAll(sparkParam.config)
+    conf.setAll(sparkParam.getConfig)
     conf.set("spark.sql.crossJoin.enabled", "true")
     sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
     sqlContext = sparkSession.sqlContext
 
     // clear checkpoint directory
@@ -78,7 +78,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
   def run: Try[_] = Try {
 
     // streaming context
-    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
+    val ssc = StreamingContext.getOrCreate(sparkParam.getCpDir, () => {
       try {
         createStreamingContext
       } catch {
@@ -94,7 +94,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
     val contextId = ContextId(measureTime)
 
     // generate data sources
-    val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
+    val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.getDataSources)
     dataSources.foreach(_.init)
 
     // create dq context
@@ -109,7 +109,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
     // process thread
     val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
 
-    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
+    val processInterval = TimeUtil.milliseconds(sparkParam.getProcessInterval) match {
       case Some(interval) => interval
       case _ => throw new Exception("invalid batch interval")
     }
@@ -135,19 +135,19 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
 
 
   def createStreamingContext: StreamingContext = {
-    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+    val batchInterval = TimeUtil.milliseconds(sparkParam.getBatchInterval) match {
       case Some(interval) => Milliseconds(interval)
       case _ => throw new Exception("invalid batch interval")
     }
     val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
-    ssc.checkpoint(sparkParam.cpDir)
+    ssc.checkpoint(sparkParam.getCpDir)
 
     ssc
   }
 
   private def clearCpDir: Unit = {
     if (sparkParam.needInitClear) {
-      val cpDir = sparkParam.cpDir
+      val cpDir = sparkParam.getCpDir
       info(s"clear checkpoint directory ${cpDir}")
       HdfsUtil.deleteHdfsPath(cpDir)
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index f5b69ad..2ee308d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -57,9 +57,9 @@ object DQStepBuilder {
     }
   }
 
-  def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam, defaultDslType: DslType
+  def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam
                              ): Option[DQStep] = {
-    val dslType = ruleParam.getDslType(defaultDslType)
+    val dslType = ruleParam.getDslType
     val dsNames = context.dataSourceNames
     val funcNames = context.functionNames
     val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
index b941211..333615d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
@@ -31,7 +31,7 @@ trait DataSourceParamStepBuilder extends DQStepBuilder {
   type ParamType = DataSourceParam
 
   def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
-    val name = getStepName(param.name)
+    val name = getStepName(param.getName)
     val steps = param.getConnectors.flatMap { dc =>
       buildReadSteps(context, dc)
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index fa9e38b..2a43b34 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -43,16 +43,16 @@ trait RuleParamStepBuilder extends DQStepBuilder {
   protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = {
     val name = getStepName(ruleParam.getName)
     // metric writer
-    val metricSteps = ruleParam.metricOpt.map { metric =>
-      MetricWriteStep(metric.name, name, NormalizeType(metric.collectType))
+    val metricSteps = ruleParam.getMetricOpt.map { metric =>
+      MetricWriteStep(metric.getNameOpt.getOrElse(name), name, NormalizeType(metric.collectType))
     }.toSeq
     // record writer
-    val recordSteps = ruleParam.recordOpt.map { record =>
-      RecordWriteStep(record.name, name)
+    val recordSteps = ruleParam.getRecordOpt.map { record =>
+      RecordWriteStep(record.getNameOpt.getOrElse(name), name)
     }.toSeq
     // update writer
-    val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.map { dsCacheUpdate =>
-      DataSourceUpdateWriteStep(dsCacheUpdate.dsName, name)
+    val dsCacheUpdateSteps = ruleParam.getDsCacheUpdateOpt.map { dsCacheUpdate =>
+      DataSourceUpdateWriteStep(dsCacheUpdate.getDsNameOpt.getOrElse(""), name)
     }.toSeq
 
     metricSteps ++ recordSteps ++ dsCacheUpdateSteps

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 9c14325..0416dbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -82,7 +82,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
       val missRecordsWriteSteps = procType match {
         case BatchProcessType => {
-          val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
+          val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
           RecordWriteStep(rwName, missRecordsTableName) :: Nil
         }
         case StreamingProcessType => Nil
@@ -90,7 +90,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val missRecordsUpdateWriteSteps = procType match {
         case BatchProcessType => Nil
         case StreamingProcessType => {
-          val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName)
+          val dsName = ruleParam.getDsCacheUpdateOpt.flatMap(_.getDsNameOpt).getOrElse(sourceName)
           DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
         }
       }
@@ -139,8 +139,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
       val accuracyMetricWriteSteps = procType match {
         case BatchProcessType => {
-          val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-          val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+          val metricOpt = ruleParam.getMetricOpt
+          val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+          val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
           MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil
         }
         case StreamingProcessType => Nil
@@ -166,8 +167,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
             accuracyMetricRule, accuracyMetricDetails)
           val accuracyMetricWriteStep = {
-            val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-            val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+            val metricOpt = ruleParam.getMetricOpt
+            val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+            val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
             MetricWriteStep(mwName, accuracyMetricTableName, collectType)
           }
 
@@ -182,7 +184,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           val accuracyRecordTransStep = SparkSqlTransformStep(
             accuracyRecordTableName, accuracyRecordSql, emptyMap)
           val accuracyRecordWriteStep = {
-            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
+            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
             RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
           }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index fcb576c..bd03c7a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -89,7 +89,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
       val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
       val incompleteRecordWriteStep = {
-        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(incompleteRecordsTableName)
+        val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
         RecordWriteStep(rwName, incompleteRecordsTableName)
       }
 
@@ -136,8 +136,10 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       }
       val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
       val completeWriteStep = {
-        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(completeTableName)
-        MetricWriteStep(mwName, completeTableName, DefaultNormalizeType)
+        val metricOpt = ruleParam.getMetricOpt
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
+        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
+        MetricWriteStep(mwName, completeTableName, collectType)
       }
 
       val transSteps = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 1cf94e0..cf886e3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -271,7 +271,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
           val dupItemsWriteStep = {
-            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupItemsTableName)
+            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
             RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
           }
 
@@ -317,7 +317,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
           val dupRecordWriteStep = {
-            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
+            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
             RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
           }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index b4da7eb..bc7f620 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -94,8 +94,9 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val profilingName = ruleParam.name
       val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
       val profilingMetricWriteStep = {
-        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+        val metricOpt = ruleParam.getMetricOpt
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
         MetricWriteStep(mwName, profilingName, collectType)
       }
       profilingTransStep :: profilingMetricWriteStep :: Nil

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index a56937c..9fa58f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -131,8 +131,9 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       }
       val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
       val metricWriteStep = {
-        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+        val metricOpt = ruleParam.getMetricOpt
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
         MetricWriteStep(mwName, metricTableName, collectType)
       }
 
@@ -149,7 +150,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
           }
           val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
           val recordWriteStep = {
-            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(recordTableName)
+            val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(recordTableName)
             RecordWriteStep(rwName, recordTableName, None)
           }
           (recordTransStep :: Nil, recordWriteStep :: Nil)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 9fecb6d..77a79d4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -166,7 +166,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         }
         val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
         val dupRecordWriteStep = {
-          val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
+          val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
           RecordWriteStep(rwName, dupRecordTableName)
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
index f1543be..07b13ea 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
@@ -28,10 +28,8 @@ object PreProcRuleParamGenerator {
   val _name = "name"
 
   def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = {
-    if (rules == null) Nil else {
-      rules.map { rule =>
-        getNewPreProcRule(rule, suffix)
-      }
+    rules.map { rule =>
+      getNewPreProcRule(rule, suffix)
     }
   }