Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/27 02:05:49 UTC

incubator-griffin git commit: set the fields in configuration classes private

Repository: incubator-griffin
Updated Branches:
  refs/heads/master 5b2c7959a -> 75b426fab


set the fields in configuration classes private

In the configuration case classes, get*** functions are defined to access the fields, so for consistency, the fields themselves are made private.

Author: grant-xuexu <gr...@gmail.com>

Closes #396 from grant-xuexu/configuration.
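
For context, the convention this change enforces looks like the following
sketch (ExampleParam is illustrative, not a class in the Griffin codebase):
Jackson still populates the private constructor fields, while callers go
through the null-safe get* accessors.

    import com.fasterxml.jackson.annotation.JsonInclude.Include
    import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}

    @JsonInclude(Include.NON_NULL)
    case class ExampleParam(@JsonProperty("name") private val name: String,
                            @JsonProperty("config") private val config: Map[String, Any]) {
      // accessors normalize nulls left behind by deserialization
      def getName: String = if (name != null) name else ""
      def getConfig: Map[String, Any] = if (config != null) config else Map.empty
    }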


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/75b426fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/75b426fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/75b426fa

Branch: refs/heads/master
Commit: 75b426fab82d404f552fbdfb602958ebb47850d9
Parents: 5b2c795
Author: grant-xuexu <gr...@gmail.com>
Authored: Mon Aug 27 10:05:44 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Aug 27 10:05:44 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/Application.scala    |  2 +-
 .../configuration/dqdefinition/DQConfig.scala   | 56 ++++++++++----------
 .../configuration/dqdefinition/EnvConfig.scala  | 26 ++++-----
 .../dqdefinition/GriffinConfig.scala            |  4 +-
 .../datasource/connector/DataConnector.scala    |  2 +-
 .../connector/DataConnectorFactory.scala        | 16 ++----
 .../apache/griffin/measure/launch/DQApp.scala   |  2 +-
 .../measure/launch/batch/BatchDQApp.scala       |  6 +--
 .../launch/streaming/StreamingDQApp.scala       |  8 +--
 .../step/builder/RuleParamStepBuilder.scala     |  2 +-
 .../step/builder/dsl/expr/TreeNode.scala        | 25 ++++++---
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |  6 +--
 .../transform/CompletenessExpr2DQSteps.scala    |  2 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala   |  4 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala  |  4 +-
 .../reader/ParamFileReaderSpec.scala            |  4 +-
 .../reader/ParamJsonReaderSpec.scala            |  4 +-
 17 files changed, 89 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 278214d..e7df806 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -64,7 +64,7 @@ object Application extends Loggable {
     val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
 
     // choose process
-    val procType = ProcessType(allParam.getDqConfig.procType)
+    val procType = ProcessType(allParam.getDqConfig.getProcType)
     val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 79a43c1..4943329 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -33,12 +33,12 @@ import org.apache.griffin.measure.configuration.enums._
   * @param sinks          sink types (optional, by default will be elasticsearch)
   */
 @JsonInclude(Include.NON_NULL)
-case class DQConfig(@JsonProperty("name") name: String,
-                    @JsonProperty("timestamp") timestamp: Long,
-                    @JsonProperty("process.type") procType: String,
-                    @JsonProperty("data.sources") dataSources: List[DataSourceParam],
-                    @JsonProperty("evaluate.rule") evaluateRule: EvaluateRuleParam,
-                    @JsonProperty("sinks") sinks: List[String]
+case class DQConfig(@JsonProperty("name") private val name: String,
+                    @JsonProperty("timestamp") private val timestamp: Long,
+                    @JsonProperty("process.type") private val procType: String,
+                    @JsonProperty("data.sources") private val dataSources: List[DataSourceParam],
+                    @JsonProperty("evaluate.rule") private val evaluateRule: EvaluateRuleParam,
+                    @JsonProperty("sinks") private val sinks: List[String]
                   ) extends Param {
   def getName: String = name
   def getTimestampOpt: Option[Long] = if (timestamp != 0) Some(timestamp) else None
@@ -72,13 +72,13 @@ case class DQConfig(@JsonProperty("name") name: String,
   * @param checkpoint   data source checkpoint configuration (must in streaming mode with streaming connectors)
   */
 @JsonInclude(Include.NON_NULL)
-case class DataSourceParam( @JsonProperty("name") name: String,
-                            @JsonProperty("baseline") baseline: Boolean,
-                            @JsonProperty("connectors") connectors: List[DataConnectorParam],
-                            @JsonProperty("checkpoint") checkpoint: Map[String, Any]
+case class DataSourceParam( @JsonProperty("name") private val name: String,
+                            @JsonProperty("baseline") private val baseline: Boolean,
+                            @JsonProperty("connectors") private val connectors: List[DataConnectorParam],
+                            @JsonProperty("checkpoint") private val checkpoint: Map[String, Any]
                           ) extends Param {
   def getName: String = name
-  def isBaseline: Boolean = if (baseline != null) baseline else false
+  def isBaseline: Boolean = baseline // Boolean is a primitive here and can never be null
   def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
   def getCheckpointOpt: Option[Map[String, Any]] = if (checkpoint != null) Some(checkpoint) else None
 
@@ -97,11 +97,11 @@ case class DataSourceParam( @JsonProperty("name") name: String,
   * @param preProc    pre-process rules after load data (optional)
   */
 @JsonInclude(Include.NON_NULL)
-case class DataConnectorParam( @JsonProperty("type") conType: String,
-                               @JsonProperty("version") version: String,
-                               @JsonProperty("dataframe.name") dataFrameName: String,
-                               @JsonProperty("config") config: Map[String, Any],
-                               @JsonProperty("pre.proc") preProc: List[RuleParam]
+case class DataConnectorParam( @JsonProperty("type") private val conType: String,
+                               @JsonProperty("version") private val version: String,
+                               @JsonProperty("dataframe.name") private val dataFrameName: String,
+                               @JsonProperty("config") private val config: Map[String, Any],
+                               @JsonProperty("pre.proc") private val preProc: List[RuleParam]
                              ) extends Param {
   def getType: String = conType
   def getVersion: String = if (version != null) version else ""
@@ -120,7 +120,7 @@ case class DataConnectorParam( @JsonProperty("type") conType: String,
   * @param rules      rules to define dq measurement (optional)
   */
 @JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
+case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[RuleParam]
                             ) extends Param {
   def getRules: Seq[RuleParam] = if (rules != null) rules else Nil
 
@@ -144,14 +144,14 @@ case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
 //  * @param dsCacheUpdate    config for data source cache update output (optional, valid in streaming mode)
   */
 @JsonInclude(Include.NON_NULL)
-case class RuleParam(@JsonProperty("dsl.type") dslType: String,
-                     @JsonProperty("dq.type") dqType: String,
-                     @JsonProperty("in.dataframe.name") inDfName: String,
-                     @JsonProperty("out.dataframe.name") outDfName: String,
-                     @JsonProperty("rule") rule: String,
-                     @JsonProperty("details") details: Map[String, Any],
-                     @JsonProperty("cache") cache: Boolean,
-                     @JsonProperty("out") outputs: List[RuleOutputParam]
+case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
+                     @JsonProperty("dq.type") private val dqType: String,
+                     @JsonProperty("in.dataframe.name") private val inDfName: String,
+                     @JsonProperty("out.dataframe.name") private val outDfName: String,
+                     @JsonProperty("rule") private val rule: String,
+                     @JsonProperty("details") private val details: Map[String, Any],
+                     @JsonProperty("cache") private val cache: Boolean,
+                     @JsonProperty("out") private val outputs: List[RuleOutputParam]
                     ) extends Param {
   def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
   def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
@@ -197,9 +197,9 @@ case class RuleParam(@JsonProperty("dsl.type") dslType: String,
   * @param flatten        flatten type of output metric (optional, available in output metric type)
   */
 @JsonInclude(Include.NON_NULL)
-case class RuleOutputParam( @JsonProperty("type") outputType: String,
-                            @JsonProperty("name") name: String,
-                            @JsonProperty("flatten") flatten: String
+case class RuleOutputParam( @JsonProperty("type") private val outputType: String,
+                            @JsonProperty("name") private val name: String,
+                            @JsonProperty("flatten") private val flatten: String
                           ) extends Param {
   def getOutputType: OutputType = if (outputType != null) OutputType(outputType) else OutputType("")
   def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None

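As a usage sketch of the pattern above (the mapper setup here is
illustrative; Griffin's own readers live in the dqdefinition.reader
package), a config deserialized by Jackson is consumed only through the
getters, which normalize absent fields:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import org.apache.griffin.measure.configuration.dqdefinition.DQConfig

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = """{"name": "accu_batch", "process.type": "batch"}"""
    val conf = mapper.readValue(json, classOf[DQConfig])
    conf.getName          // "accu_batch"
    conf.getProcType      // "batch"
    conf.getTimestampOpt  // None -- 0 (absent) is treated as unset
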
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index 2ad2837..bf77d13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -30,9 +30,9 @@ import org.apache.griffin.measure.configuration.enums._
   * @param checkpointParams   config of checkpoint locations (required in streaming mode)
   */
 @JsonInclude(Include.NON_NULL)
-case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
-                     @JsonProperty("sinks") sinkParams: List[SinkParam],
-                     @JsonProperty("griffin.checkpoint") checkpointParams: List[CheckpointParam]
+case class EnvConfig(@JsonProperty("spark") private val sparkParam: SparkParam,
+                     @JsonProperty("sinks") private val sinkParams: List[SinkParam],
+                     @JsonProperty("griffin.checkpoint") private val checkpointParams: List[CheckpointParam]
                    ) extends Param {
   def getSparkParam: SparkParam = sparkParam
   def getSinkParams: Seq[SinkParam] = if (sinkParams != null) sinkParams else Nil
@@ -56,12 +56,12 @@ case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
   * @param initClear        clear checkpoint directory or not when initial (optional)
   */
 @JsonInclude(Include.NON_NULL)
-case class SparkParam( @JsonProperty("log.level") logLevel: String,
-                       @JsonProperty("checkpoint.dir") cpDir: String,
-                       @JsonProperty("batch.interval") batchInterval: String,
-                       @JsonProperty("process.interval") processInterval: String,
-                       @JsonProperty("config") config: Map[String, String],
-                       @JsonProperty("init.clear") initClear: Boolean
+case class SparkParam( @JsonProperty("log.level") private val logLevel: String,
+                       @JsonProperty("checkpoint.dir") private val cpDir: String,
+                       @JsonProperty("batch.interval") private val batchInterval: String,
+                       @JsonProperty("process.interval") private val processInterval: String,
+                       @JsonProperty("config") private val config: Map[String, String],
+                       @JsonProperty("init.clear") private val initClear: Boolean
                      ) extends Param {
   def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
   def getCpDir: String = if (cpDir != null) cpDir else ""
@@ -83,8 +83,8 @@ case class SparkParam( @JsonProperty("log.level") logLevel: String,
   * @param config         config of sink way (must)
   */
 @JsonInclude(Include.NON_NULL)
-case class SinkParam(@JsonProperty("type") sinkType: String,
-                     @JsonProperty("config") config: Map[String, Any]
+case class SinkParam(@JsonProperty("type") private val sinkType: String,
+                     @JsonProperty("config") private val config: Map[String, Any]
                     ) extends Param {
   def getType: SinkType = SinkType(sinkType)
   def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
@@ -100,8 +100,8 @@ case class SinkParam(@JsonProperty("type") sinkType: String,
   * @param config       config of checkpoint location
   */
 @JsonInclude(Include.NON_NULL)
-case class CheckpointParam(@JsonProperty("type") cpType: String,
-                           @JsonProperty("config") config: Map[String, Any]
+case class CheckpointParam(@JsonProperty("type") private val cpType: String,
+                           @JsonProperty("config") private val config: Map[String, Any]
                           ) extends Param {
   def getType: String = cpType
   def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
index 4527d4b..98a694f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
@@ -27,8 +27,8 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
   * @param dqConfig    dq measurement configuration (must)
   */
 @JsonInclude(Include.NON_NULL)
-case class GriffinConfig(@JsonProperty("env") envConfig: EnvConfig,
-                         @JsonProperty("dq") dqConfig: DQConfig
+case class GriffinConfig(@JsonProperty("env") private val envConfig: EnvConfig,
+                         @JsonProperty("dq") private val dqConfig: DQConfig
                    ) extends Param {
   def getEnvConfig: EnvConfig = envConfig
   def getDqConfig: DQConfig = dqConfig

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index caea078..05d3c75 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.functions._
 
 trait DataConnector extends Loggable with Serializable {
 
-  @transient val sparkSession: SparkSession
+  val sparkSession: SparkSession
 
   val dcParam: DataConnectorParam
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 104b05c..f4911fc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -91,11 +91,12 @@ object DataConnectorFactory extends Loggable {
                                    ): KafkaStreamingDataConnector  = {
     val KeyType = "key.type"
     val ValueType = "value.type"
-    val config = dcParam.config
+    val config = dcParam.getConfig
     val keyType = config.getOrElse(KeyType, "java.lang.String").toString
     val valueType = config.getOrElse(ValueType, "java.lang.String").toString
-    (getClassTag(keyType), getClassTag(valueType)) match {
-      case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
+
+    (keyType, valueType) match {
+      case ("java.lang.String", "java.lang.String") => {
         KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       }
       case _ => {
@@ -104,15 +105,6 @@ object DataConnectorFactory extends Loggable {
     }
   }
 
-  private def getClassTag(tp: String): ClassTag[_] = {
-    try {
-      val clazz = Class.forName(tp)
-      ClassTag(clazz)
-    } catch {
-      case e: Throwable => throw e
-    }
-  }
-
 //  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
 //    connectors.flatMap { dc =>
 //      dc match {

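The removed ClassTag-based match was unreliable because of type erasure:
the Class[String] type test in the pattern is unchecked and matches any
class, so the factory now compares the configured type names as plain
strings instead. A minimal standalone sketch of the failure mode (not
Griffin code):

    // compiles with an "unchecked" warning: Class[String] erases to Class[_]
    def matchesString(clazz: Class[_]): Boolean = clazz match {
      case _: Class[String] => true
      case _                => false  // unreachable in practice
    }

    matchesString(classOf[Int])  // true -- the erased test accepts anything
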
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 3b19892..b6cca98 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -49,7 +49,7 @@ trait DQApp extends Loggable with Serializable {
     */
   protected def getMeasureTime: Long = {
     dqParam.getTimestampOpt match {
-      case Some(t) if t > 0 => dqParam.timestamp
+      case Some(t) if t > 0 => t
       case _ => System.currentTimeMillis
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 8733789..ba1f389 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -37,8 +37,8 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
   val envParam: EnvConfig = allParam.getEnvConfig
   val dqParam: DQConfig = allParam.getDqConfig
 
-  val sparkParam = envParam.sparkParam
-  val metricName = dqParam.name
+  val sparkParam = envParam.getSparkParam
+  val metricName = dqParam.getName
 //  val dataSourceParams = dqParam.dataSources
 //  val dataSourceNames = dataSourceParams.map(_.name)
   val sinkParams = getSinkParams
@@ -83,7 +83,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
     dqContext.getSink().start(applicationId)
 
     // build job
-    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.evaluateRule)
+    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
 
     // dq job execute
     dqJob.execute(dqContext)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index 1768ae2..ceecb78 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -43,8 +43,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
   val envParam: EnvConfig = allParam.getEnvConfig
   val dqParam: DQConfig = allParam.getDqConfig
 
-  val sparkParam = envParam.sparkParam
-  val metricName = dqParam.name
+  val sparkParam = envParam.getSparkParam
+  val metricName = dqParam.getName
 //  val dataSourceParams = dqParam.dataSources
 //  val dataSourceNames = dataSourceParams.map(_.name)
   val sinkParams = getSinkParams
@@ -68,7 +68,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     clearCpDir
 
     // init info cache instance
-    OffsetCheckpointClient.initClient(envParam.checkpointParams, metricName)
+    OffsetCheckpointClient.initClient(envParam.getCheckpointParams, metricName)
     OffsetCheckpointClient.init
 
     // register udf
@@ -107,7 +107,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     globalContext.getSink().start(applicationId)
 
     // process thread
-    val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
+    val dqCalculator = StreamingDQCalculator(globalContext, dqParam.getEvaluateRule)
 
     val processInterval = TimeUtil.milliseconds(sparkParam.getProcessInterval) match {
       case Some(interval) => interval

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 176aed6..5a04c11 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -44,7 +44,7 @@ trait RuleParamStepBuilder extends DQStepBuilder {
     val name = getStepName(ruleParam.getOutDfName())
     // metric writer
     val metricSteps = ruleParam.getOutputOpt(MetricOutputType).map { metric =>
-      MetricWriteStep(metric.getNameOpt.getOrElse(name), name, FlattenType(metric.flatten))
+      MetricWriteStep(metric.getNameOpt.getOrElse(name), name, metric.getFlatten)
     }.toSeq
     // record writer
     val recordSteps = ruleParam.getOutputOpt(RecordOutputType).map { record =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
index 32330cd..4ce0fe6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
@@ -18,6 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.expr
 
+import scala.reflect.ClassTag
+
 trait TreeNode extends Serializable {
 
   var children = Seq[TreeNode]()
@@ -25,21 +27,32 @@ trait TreeNode extends Serializable {
   def addChild(expr: TreeNode) = { children :+= expr }
   def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }
 
-  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
-    if (this.isInstanceOf[A]) {
+  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+
+    val clazz = tag.runtimeClass
+    if(clazz.isAssignableFrom(this.getClass)){
       val tv = seqOp(this.asInstanceOf[A], z)
       children.foldLeft(combOp(z, tv)) { (ov, tn) =>
         combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
       }
-    } else z
+    }
+    else {
+      z
+    }
+
   }
-  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
-    if (this.isInstanceOf[A]) {
+  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+
+    val clazz = tag.runtimeClass
+    if(clazz.isAssignableFrom(this.getClass)){
       val cv = children.foldLeft(z) { (ov, tn) =>
         combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
       }
       combOp(z, seqOp(this.asInstanceOf[A], cv))
-    } else z
+    }
+    else{
+      z
+    }
   }
 
 }

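The effect of threading the ClassTag through: isInstanceOf[A] on an erased
type parameter always succeeds, whereas tag.runtimeClass gives a real
runtime test. A minimal sketch under illustrative names (Leaf and Branch
are not Griffin classes), using the same isAssignableFrom check as the
change above:

    import scala.reflect.ClassTag

    sealed trait Node
    final class Leaf extends Node
    final class Branch extends Node

    // without the ClassTag, n.isInstanceOf[A] would erase to
    // n.isInstanceOf[Node] and every node would "match"
    def countOf[A <: Node](nodes: Seq[Node])(implicit tag: ClassTag[A]): Int =
      nodes.count(n => tag.runtimeClass.isAssignableFrom(n.getClass))

    val nodes = Seq(new Leaf, new Branch, new Leaf)
    countOf[Leaf](nodes)    // 2
    countOf[Branch](nodes)  // 1
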
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 7c84d38..b97a1a0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -114,7 +114,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
       // 4. accuracy metric
-      val accuracyTableName = ruleParam.outDfName
+      val accuracyTableName = ruleParam.getOutDfName()
       val matchedColName = details.getStringOrKey(_matched)
       val accuracyMetricSql = procType match {
         case BatchProcessType => {
@@ -140,7 +140,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyMetricWriteSteps = procType match {
         case BatchProcessType => {
           val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
-          val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+          val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
           val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
           MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
         }
@@ -167,7 +167,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
             accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
           val accuracyMetricWriteStep = {
             val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
-            val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+            val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
             val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
             MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
           }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 347fabd..4852a5b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -112,7 +112,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
       // 5. complete metric
-      val completeTableName = ruleParam.outDfName
+      val completeTableName = ruleParam.getOutDfName()
       val completeColName = details.getStringOrKey(_complete)
       val completeMetricSql = procType match {
         case BatchProcessType => {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index 33d44c5..ecc115c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -92,11 +92,11 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val profilingSql = {
         s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
       }
-      val profilingName = ruleParam.outDfName
+      val profilingName = ruleParam.getOutDfName()
       val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
       val profilingMetricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
-        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
         val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
         MetricWriteStep(mwName, profilingName, flattenType)
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 71e9f4b..8a3924b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -108,7 +108,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
 
       // 3. timeliness metric
-      val metricTableName = ruleParam.outDfName
+      val metricTableName = ruleParam.getOutDfName()
       val totalColName = details.getStringOrKey(_total)
       val avgColName = details.getStringOrKey(_avg)
       val metricSql = procType match {
@@ -132,7 +132,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
       val metricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
-        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
         val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
         MetricWriteStep(mwName, metricTableName, flattenType)
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index bb75cec..dfa2598 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -32,8 +32,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(v) =>
-        v.evaluateRule.getRules(0).dslType should === ("griffin-dsl")
-        v.evaluateRule.getRules(0).outDfName should === ("accu")
+        v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+        v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
       case Failure(_) =>
         fail("it should not happen")
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 1e9f3b0..f4b31fa 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -36,8 +36,8 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(v) =>
-        v.evaluateRule.getRules(0).dslType should === ("griffin-dsl")
-        v.evaluateRule.getRules(0).outDfName should === ("accu")
+        v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+        v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
       case Failure(_) =>
         fail("it should not happen")
     }