You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/09/29 10:21:53 UTC

[3/3] incubator-griffin git commit: Fix case clauses

Fix case clauses

Author: William Guo <gu...@apache.org>

Closes #425 from guoyuepeng/fix_case_clauses.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18fc4cf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18fc4cf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18fc4cf4

Branch: refs/heads/master
Commit: 18fc4cf4cd706aa3e793fb0790c12ece5e5342a3
Parents: 485c5cf
Author: William Guo <gu...@apache.org>
Authored: Sat Sep 29 18:21:42 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Sat Sep 29 18:21:42 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/Application.scala    |  36 ++-
 .../configuration/dqdefinition/DQConfig.scala   |  12 +-
 .../configuration/dqdefinition/EnvConfig.scala  |   5 +-
 .../dqdefinition/reader/ParamFileReader.scala   |   6 +-
 .../dqdefinition/reader/ParamJsonReader.scala   |   5 +-
 .../dqdefinition/reader/ParamReader.scala       |   5 +-
 .../measure/configuration/enums/DqType.scala    |   8 +-
 .../configuration/enums/FlattenType.scala       |   8 +-
 .../configuration/enums/OutputType.scala        |   8 +-
 .../configuration/enums/ProcessType.scala       |   6 +-
 .../measure/configuration/enums/SinkType.scala  |  19 +-
 .../griffin/measure/context/DQContext.scala     |  11 +-
 .../measure/context/DataFrameCache.scala        |  12 +-
 .../griffin/measure/context/TableRegister.scala |   8 +-
 .../checkpoint/lock/CheckpointLockInZK.scala    |   6 +-
 .../offset/OffsetCheckpointClient.scala         |   2 +-
 .../offset/OffsetCheckpointFactory.scala        |   5 +-
 .../offset/OffsetCheckpointInZK.scala           |  29 ++-
 .../streaming/metric/AccuracyMetric.scala       |   8 +-
 .../context/streaming/metric/CacheResults.scala |   6 +-
 .../griffin/measure/datasource/DataSource.scala |  11 +-
 .../measure/datasource/DataSourceFactory.scala  |   8 +-
 .../measure/datasource/TimestampStorage.scala   |  29 ++-
 .../datasource/cache/StreamingCacheClient.scala |  65 ++---
 .../cache/StreamingCacheClientFactory.scala     |  18 +-
 .../cache/StreamingCacheJsonClient.scala        |   3 +-
 .../cache/StreamingCacheOrcClient.scala         |   3 +-
 .../cache/StreamingCacheParquetClient.scala     |  10 +-
 .../cache/StreamingOffsetCacheable.scala        |  18 +-
 .../measure/datasource/cache/WithFanIn.scala    |   8 +-
 .../datasource/connector/DataConnector.scala    |  15 +-
 .../connector/DataConnectorFactory.scala        |  35 ++-
 .../batch/AvroBatchDataConnector.scala          |   6 +-
 .../batch/HiveBatchDataConnector.scala          |   6 +-
 .../batch/TextDirBatchDataConnector.scala       |  18 +-
 .../streaming/KafkaStreamingDataConnector.scala |   6 +-
 .../KafkaStreamingStringDataConnector.scala     |  12 +-
 .../streaming/StreamingDataConnector.scala      |  10 +-
 .../measure/job/builder/DQJobBuilder.scala      |   2 -
 .../apache/griffin/measure/launch/DQApp.scala   |   3 +-
 .../measure/launch/batch/BatchDQApp.scala       |  10 +-
 .../launch/streaming/StreamingDQApp.scala       |  19 +-
 .../griffin/measure/sink/ConsoleSink.scala      |   3 +-
 .../measure/sink/ElasticSearchSink.scala        |  15 +-
 .../apache/griffin/measure/sink/HdfsSink.scala  |  17 +-
 .../apache/griffin/measure/sink/MongoSink.scala |  10 +-
 .../org/apache/griffin/measure/sink/Sink.scala  |   3 +-
 .../griffin/measure/sink/SinkFactory.scala      |   3 +-
 .../griffin/measure/sink/SinkTaskRunner.scala   |  18 +-
 .../measure/step/builder/DQStepBuilder.scala    |  10 +-
 .../step/builder/GriffinDslDQStepBuilder.scala  |   4 +-
 .../step/builder/RuleParamStepBuilder.scala     |   4 +-
 .../builder/dsl/expr/ClauseExpression.scala     |  14 +-
 .../step/builder/dsl/expr/LogicalExpr.scala     |   5 +-
 .../step/builder/dsl/expr/SelectExpr.scala      |   8 +-
 .../step/builder/dsl/expr/TreeNode.scala        |  18 +-
 .../step/builder/dsl/parser/BasicParser.scala   |   6 +-
 .../builder/dsl/parser/GriffinDslParser.scala   |   5 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |  55 +++--
 .../transform/CompletenessExpr2DQSteps.scala    |  40 +--
 .../transform/DistinctnessExpr2DQSteps.scala    |  48 ++--
 .../builder/dsl/transform/Expr2DQSteps.scala    |   2 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala   |  16 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala  |  46 ++--
 .../dsl/transform/UniquenessExpr2DQSteps.scala  |  34 ++-
 .../transform/analyzer/AccuracyAnalyzer.scala   |   6 +-
 .../analyzer/CompletenessAnalyzer.scala         |   3 +-
 .../analyzer/DistinctnessAnalyzer.scala         |   4 +-
 .../transform/analyzer/ProfilingAnalyzer.scala  |   3 +-
 .../transform/analyzer/UniquenessAnalyzer.scala |   3 +-
 .../builder/preproc/PreProcParamMaker.scala     |   8 +-
 .../griffin/measure/step/read/ReadStep.scala    |   9 +-
 .../measure/step/read/UnionReadStep.scala       |   3 +-
 .../measure/step/transform/DataFrameOps.scala   |  16 +-
 .../transform/DataFrameOpsTransformStep.scala   |   7 +-
 .../step/transform/SparkSqlTransformStep.scala  |   3 +-
 .../step/write/DataSourceUpdateWriteStep.scala  |  19 +-
 .../measure/step/write/MetricFlushStep.scala    |   3 +-
 .../measure/step/write/MetricWriteStep.scala    |  17 +-
 .../measure/step/write/RecordWriteStep.scala    |  38 ++-
 .../measure/step/write/SparkRowFormatter.scala  |   6 +-
 .../apache/griffin/measure/utils/FSUtil.scala   |  14 +-
 .../apache/griffin/measure/utils/HdfsUtil.scala |  30 +--
 .../apache/griffin/measure/utils/HttpUtil.scala |  24 +-
 .../apache/griffin/measure/utils/JsonUtil.scala |   5 +-
 .../griffin/measure/utils/ParamUtil.scala       |   8 +-
 .../apache/griffin/measure/utils/TimeUtil.scala |  18 +-
 scalastyle-config.xml                           | 246 +++++++++++++++++++
 88 files changed, 887 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index e7df806..1bac17b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,15 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure
 
-import org.apache.griffin.measure.configuration.enums._
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
 import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.dqdefinition.{GriffinConfig, DQConfig, EnvConfig, Param}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
 
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
 
 /**
   * application entrance
@@ -49,17 +50,15 @@ object Application extends Loggable {
     // read param files
     val envParam = readParamFile[EnvConfig](envParamFile) match {
       case Success(p) => p
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(ex.getMessage)
         sys.exit(-2)
-      }
     }
     val dqParam = readParamFile[DQConfig](dqParamFile) match {
       case Success(p) => p
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(ex.getMessage)
         sys.exit(-2)
-      }
     }
     val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
 
@@ -68,32 +67,28 @@ object Application extends Loggable {
     val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)
-      case _ => {
+      case _ =>
         error(s"${procType} is unsupported process type!")
         sys.exit(-4)
-      }
     }
 
     startup
 
     // dq app init
     dqApp.init match {
-      case Success(_) => {
+      case Success(_) =>
         info("process init success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process init error: ${ex.getMessage}")
         shutdown
         sys.exit(-5)
-      }
     }
 
     // dq app run
     dqApp.run match {
-      case Success(_) => {
+      case Success(_) =>
         info("process run success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process run error: ${ex.getMessage}")
 
         if (dqApp.retryable) {
@@ -102,19 +97,16 @@ object Application extends Loggable {
           shutdown
           sys.exit(-5)
         }
-      }
     }
 
     // dq app end
     dqApp.close match {
-      case Success(_) => {
+      case Success(_) =>
         info("process end success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process end error: ${ex.getMessage}")
         shutdown
         sys.exit(-5)
-      }
     }
 
     shutdown

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 4943329..b281481 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.configuration.enums._
 
 /**
@@ -46,7 +47,7 @@ case class DQConfig(@JsonProperty("name") private val name: String,
   def getDataSources: Seq[DataSourceParam] = {
     dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
       val (seq, names) = ret
-      if (!names.contains(ds.getName)){
+      if (!names.contains(ds.getName)) {
         (seq :+ ds, names + ds.getName)
       } else ret
     }._1
@@ -133,8 +134,9 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
   * rule param
   * @param dslType    dsl type of this rule (must)
   * @param dqType     dq type of this rule (must if dsl type is "griffin-dsl")
-  * @param inDfName       name of input dataframe of this rule, by default will be the previous rule output dataframe name
-  * @param outDfName      name of output dataframe of this rule, by default will be generated as data connector dataframe name with index suffix
+  * @param inDfName   name of input dataframe of this rule, by default will be the previous rule output dataframe name
+  * @param outDfName  name of output dataframe of this rule, by default will be generated
+  *                   as data connector dataframe name with index suffix
   * @param rule       rule to define dq step calculation (must)
   * @param details    detail config of rule (optional)
   * @param cache      cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
@@ -206,4 +208,4 @@ case class RuleOutputParam( @JsonProperty("type") private val outputType: String
   def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")
 
   def validate(): Unit = {}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index bf77d13..2e227a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.configuration.enums._
 
 /**
@@ -109,4 +110,4 @@ case class CheckpointParam(@JsonProperty("type") private val cpType: String,
   def validate(): Unit = {
     assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
index 4a4aeed..2739f74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
@@ -18,11 +18,13 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 
-import scala.reflect.ClassTag
-import scala.util.Try
+
 
 /**
   * read params from config file path

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
index 063dc7e..ba51d5f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 import org.apache.griffin.measure.utils.JsonUtil
 
-import scala.reflect.ClassTag
-import scala.util.Try
 
 /**
   * read params from json string directly

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
index 9a9a46c..5c914c6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 
-import scala.reflect.ClassTag
-import scala.util.Try
 
 trait ParamReader extends Loggable with Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
index cee8d98..bbeb04f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -31,7 +31,13 @@ sealed trait DqType {
 
 object DqType {
   private val dqTypes: List[DqType] = List(
-    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType
+    AccuracyType,
+    ProfilingType,
+    UniquenessType,
+    DistinctnessType,
+    TimelinessType,
+    CompletenessType,
+    UnknownType
   )
   def apply(ptn: String): DqType = {
     dqTypes.find(dqType => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
index 160ecaf..cb2fba1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -29,7 +29,13 @@ sealed trait FlattenType {
 }
 
 object FlattenType {
-  private val flattenTypes: List[FlattenType] = List(DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType)
+  private val flattenTypes: List[FlattenType] = List(
+    DefaultFlattenType,
+    EntriesFlattenType,
+    ArrayFlattenType,
+    MapFlattenType
+  )
+
   val default = DefaultFlattenType
   def apply(ptn: String): FlattenType = {
     flattenTypes.find(tp => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
index 5b1d261..8a80044 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -29,7 +29,13 @@ sealed trait OutputType {
 }
 
 object OutputType {
-  private val outputTypes: List[OutputType] = List(MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType)
+  private val outputTypes: List[OutputType] = List(
+    MetricOutputType,
+    RecordOutputType,
+    DscUpdateOutputType,
+    UnknownOutputType
+  )
+
   val default = UnknownOutputType
   def apply(ptn: String): OutputType = {
     outputTypes.find(tp => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
index 4f799ed..3cbc749 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -29,7 +29,11 @@ sealed trait ProcessType {
 }
 
 object ProcessType {
-  private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
+  private val procTypes: List[ProcessType] = List(
+    BatchProcessType,
+    StreamingProcessType
+  )
+
   def apply(ptn: String): ProcessType = {
     procTypes.find(tp => ptn match {
       case tp.idPattern() => true

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 5471e83..d9e5d2b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -30,15 +30,22 @@ sealed trait SinkType {
 
 object SinkType {
   private val sinkTypes: List[SinkType] = List(
-    ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
+    ConsoleSinkType,
+    HdfsSinkType,
+    ElasticsearchSinkType,
+    MongoSinkType,
+    UnknownSinkType
   )
+
   def apply(ptn: String): SinkType = {
     sinkTypes.find(tp => ptn match {
       case tp.idPattern() => true
       case _ => false
     }).getOrElse(UnknownSinkType)
   }
+
   def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+
   def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
     val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
     if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
@@ -48,7 +55,7 @@ object SinkType {
 /**
   * console sink, will sink metric in console
   */
- case object ConsoleSinkType extends SinkType {
+case object ConsoleSinkType extends SinkType {
   val idPattern = "^(?i)console|log$".r
   val desc = "console"
 }
@@ -56,7 +63,7 @@ object SinkType {
 /**
   * hdfs sink, will sink metric and record in hdfs
   */
- case object HdfsSinkType extends SinkType {
+case object HdfsSinkType extends SinkType {
   val idPattern = "^(?i)hdfs$".r
   val desc = "hdfs"
 }
@@ -64,7 +71,7 @@ object SinkType {
 /**
   * elasticsearch sink, will sink metric in elasticsearch
   */
- case object ElasticsearchSinkType extends SinkType {
+case object ElasticsearchSinkType extends SinkType {
   val idPattern = "^(?i)es|elasticsearch|http$".r
   val desc = "elasticsearch"
 }
@@ -72,12 +79,12 @@ object SinkType {
 /**
   * mongo sink, will sink metric in mongo db
   */
- case object MongoSinkType extends SinkType {
+case object MongoSinkType extends SinkType {
   val idPattern = "^(?i)mongo|mongodb$".r
   val desc = "distinct"
 }
 
- case object UnknownSinkType extends SinkType {
+case object UnknownSinkType extends SinkType {
   val idPattern = "".r
   val desc = "unknown"
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index a9f3da0..b0759c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.datasource._
 import org.apache.griffin.measure.sink.{Sink, SinkFactory}
-import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
 
 /**
   * dq context: the context of each calculation
@@ -58,6 +59,7 @@ case class DQContext(contextId: ContextId,
     }
   }
   dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+
   def getDataSourceName(index: Int): String = {
     if (dataSourceNames.size > index) dataSourceNames(index) else ""
   }
@@ -66,20 +68,25 @@ case class DQContext(contextId: ContextId,
   val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
 
   val dataSourceTimeRanges = loadDataSources()
+
   def loadDataSources(): Map[String, TimeRange] = {
     dataSources.map { ds =>
       (ds.name, ds.loadData(this))
     }.toMap
   }
+
   printTimeRanges
 
   private val sinkFactory = SinkFactory(sinkParams, name)
   private val defaultSink: Sink = createSink(contextId.timestamp)
+
   def getSink(timestamp: Long): Sink = {
     if (timestamp == contextId.timestamp) getSink()
     else createSink(timestamp)
   }
+
   def getSink(): Sink = defaultSink
+
   private def createSink(t: Long): Sink = {
     procType match {
       case BatchProcessType => sinkFactory.getSinks(t, true)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
index 4e8f4df..0671565 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
@@ -18,11 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Map => MutableMap, MutableList}
+
 import org.apache.spark.sql.DataFrame
 
-import scala.collection.concurrent.{Map => ConcMap}
-import scala.collection.mutable.{MutableList, Map => MutableMap}
+import org.apache.griffin.measure.Loggable
 
 /**
   * cache and unpersist dataframes
@@ -42,17 +42,15 @@ case class DataFrameCache() extends Loggable {
   def cacheDataFrame(name: String, df: DataFrame): Unit = {
     info(s"try to cache data frame ${name}")
     dataFrames.get(name) match {
-      case Some(odf) => {
+      case Some(odf) =>
         trashDataFrame(odf)
         dataFrames += (name -> df)
         df.cache
         info("cache after replace old df")
-      }
-      case _ => {
+      case _ =>
         dataFrames += (name -> df)
         df.cache
         info("cache after replace no old df")
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index 8d86170..c4dda3b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -18,10 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Set => MutableSet}
+
 import org.apache.spark.sql._
 
-import scala.collection.mutable.{Set => MutableSet}
+import org.apache.griffin.measure.Loggable
+
 
 /**
   * register table name
@@ -78,4 +80,4 @@ case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends Table
     tables.clear
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
index b1cbe0f..d78aedf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
@@ -32,10 +32,9 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
         mutex.acquire(-1, null)
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"lock error: ${e.getMessage}")
         false
-      }
     }
 
   }
@@ -44,9 +43,8 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
     try {
       if (mutex.isAcquiredInThisProcess) mutex.release
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"unlock error: ${e.getMessage}")
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
index 8acfbeb..48337d5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.context.streaming.checkpoint.lock.{CheckpointL
 object OffsetCheckpointClient extends OffsetCheckpoint with OffsetOps {
   var offsetCheckpoints: Seq[OffsetCheckpoint] = Nil
 
-  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) = {
+  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) : Unit = {
     val fac = OffsetCheckpointFactory(checkpointParams, metricName)
     offsetCheckpoints = checkpointParams.flatMap(param => fac.getOffsetCheckpoint(param)).toList
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
index 5fe8e15..f19542a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.checkpoint.offset
 
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
 
-import scala.util.Try
 
 case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam], metricName: String
                                   ) extends Serializable {
@@ -36,4 +37,4 @@ case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam],
     offsetCheckpointTry.toOption
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
index e051779..b575573 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
@@ -18,22 +18,24 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.checkpoint.offset
 
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.framework.imps.CuratorFrameworkState
 import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
 import org.apache.zookeeper.CreateMode
-
 import scala.collection.JavaConverters._
 
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
+
+
 /**
   * leverage zookeeper for info cache
   * @param config
   * @param metricName
   */
-case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) extends OffsetCheckpoint with OffsetOps {
+case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String)
+  extends OffsetCheckpoint with OffsetOps {
 
   val Hosts = "hosts"
   val Namespace = "namespace"
@@ -67,7 +69,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
   }
   val lockPath = config.getOrElse(LockPath, "lock").toString
 
-  private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+  private val cacheNamespace: String =
+    if (namespace.isEmpty) metricName else namespace + separator + metricName
+
   private val builder = CuratorFrameworkFactory.builder()
     .connectString(hosts)
     .retryPolicy(new ExponentialBackoffRetry(1000, 3))
@@ -141,10 +145,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       client.getChildren().forPath(path).asScala.toList
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"list ${path} warn: ${e.getMessage}")
         Nil
-      }
     }
   }
 
@@ -162,10 +165,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
         .forPath(path, content.getBytes("utf-8"))
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
         false
-      }
     }
   }
 
@@ -174,10 +176,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
       client.setData().forPath(path, content.getBytes("utf-8"))
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
         false
-      }
     }
   }
 
@@ -185,10 +186,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       Some(new String(client.getData().forPath(path), "utf-8"))
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"read ${path} warn: ${e.getMessage}")
         None
-      }
     }
   }
 
@@ -204,10 +204,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       client.checkExists().forPath(path) != null
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"check exists ${path} warn: ${e.getMessage}")
         false
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
index e69716e..19dfb9e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
@@ -45,9 +45,11 @@ case class AccuracyMetric(miss: Long, total: Long) extends Metric {
     (this.miss != other.miss) || (this.total != other.total)
   }
 
-  def getMiss = miss
-  def getTotal = total
-  def getMatch = total - miss
+  def getMiss: Long = miss
+
+  def getTotal: Long = total
+
+  def getMatch: Long = total - miss
 
   def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
index cc8e772..6c99618 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.metric
 
+import scala.collection.mutable.{Map => MutableMap}
+
 import org.apache.griffin.measure.Loggable
 
-import scala.collection.mutable.{Map => MutableMap}
 
 /**
   * in streaming mode, some metrics may update,
@@ -32,10 +33,9 @@ object CacheResults extends Loggable {
     def olderThan(ut: Long): Boolean = updateTime < ut
     def update(ut: Long, r: Metric): Option[Metric] = {
       r match {
-        case m: result.T if (olderThan(ut)) => {
+        case m: result.T if (olderThan(ut)) =>
           val ur = result.update(m)
           if (result.differsFrom(ur)) Some(ur) else None
-        }
         case _ => None
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 6bf6373..f2cd0ec 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -18,13 +18,14 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.context.{DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.datasource.connector.DataConnector
 import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.spark.sql._
 
 /**
   * data source
@@ -50,12 +51,10 @@ case class DataSource(name: String,
     val timestamp = context.contextId.timestamp
     val (dfOpt, timeRange) = data(timestamp)
     dfOpt match {
-      case Some(df) => {
+      case Some(df) =>
         context.runTimeTableRegister.registerTable(name, df)
-      }
-      case None => {
+      case None =>
         warn(s"load data source [${name}] fails")
-      }
     }
     timeRange
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 7807dfd..28e616b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
+import scala.util.Success
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
 import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
 
-import scala.util.Success
 
 object DataSourceFactory extends Loggable {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
index a305563..04a7f85 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
@@ -18,10 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
-import org.apache.griffin.measure.Loggable
-
 import scala.collection.mutable.{SortedSet => MutableSortedSet}
 
+import org.apache.griffin.measure.Loggable
 /**
   * tmst cache, CRUD of timestamps
   */
@@ -29,19 +28,19 @@ case class TimestampStorage() extends Loggable {
 
   private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
 
-  //-- insert tmst into tmst group --
-  def insert(tmst: Long) = tmstGroup += tmst
-  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+  // -- insert tmst into tmst group --
+  def insert(tmst: Long) : MutableSortedSet[Long] = tmstGroup += tmst
+  def insert(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup ++= tmsts
 
-  //-- remove tmst from tmst group --
-  def remove(tmst: Long) = tmstGroup -= tmst
-  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+  // -- remove tmst from tmst group --
+  def remove(tmst: Long) : MutableSortedSet[Long] = tmstGroup -= tmst
+  def remove(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup --= tmsts
 
-  //-- get subset of tmst group --
-  def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
-  def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
-  def until(until: Long) = tmstGroup.until(until).toSet
-  def from(from: Long) = tmstGroup.from(from).toSet
-  def all = tmstGroup.toSet
+  // -- get subset of tmst group --
+  def fromUntil(from: Long, until: Long) : Set[Long] = tmstGroup.range(from, until).toSet
+  def afterTil(after: Long, til: Long) : Set[Long] = tmstGroup.range(after + 1, til + 1).toSet
+  def until(until: Long) : Set[Long] = tmstGroup.until(until).toSet
+  def from(from: Long) : Set[Long] = tmstGroup.from(from).toSet
+  def all : Set[Long] = tmstGroup.toSet
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index b351f82..a03a468 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.datasource.cache
 
 import java.util.concurrent.TimeUnit
 
+import scala.util.Random
+
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
 import org.apache.griffin.measure.utils.DataFrameUtil._
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
 
-import scala.util.Random
 
 /**
   * data source cache in streaming mode
@@ -38,7 +40,8 @@ import scala.util.Random
   * read data frame from hdfs in calculate phase
   * with update and clean actions for the cache data
   */
-trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
+trait StreamingCacheClient
+  extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
 
   val sqlContext: SQLContext
   val param: Map[String, Any]
@@ -46,7 +49,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   val index: Int
 
   val timestampStorage: TimestampStorage
-  protected def fromUntilRangeTmsts(from: Long, until: Long) = timestampStorage.fromUntil(from, until)
+  protected def fromUntilRangeTmsts(from: Long, until: Long) =
+    timestampStorage.fromUntil(from, until)
+
   protected def clearTmst(t: Long) = timestampStorage.remove(t)
   protected def clearTmstsUntil(until: Long) = {
     val outDateTmsts = timestampStorage.until(until)
@@ -67,17 +72,20 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
 
   val filePath: String = param.getString(_FilePath, defFilePath)
   val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
-  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  val readyTimeInterval: Long =
+    TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+
+  val readyTimeDelay: Long =
+    TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+
   val deltaTimeRange: (Long, Long) = {
     def negative(n: Long): Long = if (n <= 0) n else 0
     param.get(_TimeRange) match {
-      case Some(seq: Seq[String]) => {
+      case Some(seq: Seq[String]) =>
         val nseq = seq.flatMap(TimeUtil.milliseconds(_))
         val ns = negative(nseq.headOption.getOrElse(0))
         val ne = negative(nseq.tail.headOption.getOrElse(0))
         (ns, ne)
-      }
       case _ => (0, 0)
     }
   }
@@ -112,7 +120,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
     if (!readOnly) {
       dfOpt match {
-        case Some(df) => {
+        case Some(df) =>
           // cache df
           df.cache
 
@@ -137,10 +145,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
 
           // uncache df
           df.unpersist
-        }
-        case _ => {
+        case _ =>
           info("no data frame to save")
-        }
       }
 
       // submit cache time and ready time
@@ -168,7 +174,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
       s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
     } else {
       info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
-      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+
+      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} " +
+        s"AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
     }
 
     // new cache data
@@ -176,10 +184,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
       val dfr = sqlContext.read
       readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"read data source cache warn: ${e.getMessage}")
         None
-      }
     }
 
     // old cache data
@@ -190,10 +197,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
         val dfr = sqlContext.read
         readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           warn(s"read old data source cache warn: ${e.getMessage}")
           None
-        }
       }
     }
 
@@ -228,12 +234,11 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
     }
     names.filter { name =>
       name match {
-        case regex(value) => {
+        case regex(value) =>
           str2Long(value) match {
             case Some(t) => func(t, bound)
             case _ => false
           }
-        }
         case _ => false
       }
     }.map(name => s"${path}/${name}")
@@ -258,7 +263,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
       // new cache data
       val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
       newCacheCleanTime match {
-        case Some(nct) => {
+        case Some(nct) =>
           // clean calculated new cache data
           val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
           if (newCacheLocked) {
@@ -271,16 +276,15 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
               newCacheLock.unlock()
             }
           }
-        }
-        case _ => {
+        case _ =>
           // do nothing
-        }
+          info("should not happen")
       }
 
       // old cache data
       val oldCacheCleanTime = if (updatable) readCleanTime else None
       oldCacheCleanTime match {
-        case Some(oct) => {
+        case Some(oct) =>
           val oldCacheIndexOpt = readOldCacheIndex
           oldCacheIndexOpt.foreach { idx =>
             val oldDfPath = s"${oldFilePath}/${idx}"
@@ -298,10 +302,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
               }
             }
           }
-        }
-        case _ => {
+        case _ =>
           // do nothing
-        }
+          info("should not happen")
       }
     }
   }
@@ -313,7 +316,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   def updateData(dfOpt: Option[DataFrame]): Unit = {
     if (!readOnly && updatable) {
       dfOpt match {
-        case Some(df) => {
+        case Some(df) =>
           // old cache lock
           val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
           if (oldCacheLocked) {
@@ -339,10 +342,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
               oldCacheLock.unlock()
             }
           }
-        }
-        case _ => {
+        case _ =>
           info("no data frame to update")
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index f991e2d..eeda8ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
+import org.apache.spark.sql.SQLContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
 
 object StreamingCacheClientFactory extends Loggable {
 
@@ -50,17 +51,20 @@ object StreamingCacheClientFactory extends Loggable {
       try {
         val tp = param.getString(_type, "")
         val dsCache = tp match {
-          case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
-          case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
-          case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
-          case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+          case ParquetRegex() =>
+            StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+          case JsonRegex() =>
+            StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+          case OrcRegex() =>
+            StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+          case _ =>
+            StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
         }
         Some(dsCache)
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error("generate data source cache fails")
           None
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index a12ef87..c81d4d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
   * data source cache in json format
   */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 63e7104..0649b74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
   * data source cache in orc format
   */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index c275227..9c369ee 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -18,14 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
   * data source cache in parquet format
   */
-case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
-                                       dsName: String, index: Int, timestampStorage: TimestampStorage
+case class StreamingCacheParquetClient(sqlContext: SQLContext,
+                                       param: Map[String, Any],
+                                       dsName: String,
+                                       index: Int,
+                                       timestampStorage: TimestampStorage
                                  ) extends StreamingCacheClient {
 
   sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
index 7b7f506..e73a058 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -30,13 +30,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
   val readyTimeInterval: Long
   val readyTimeDelay: Long
 
-  def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
+  def selfCacheInfoPath : String = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
 
-  def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
-  def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
-  def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
+  def selfCacheTime : String = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime : String = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime : String = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
+  def selfCleanTime : String = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex : String = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
 
   protected def submitCacheTime(ms: Long): Unit = {
     val map = Map[String, String]((selfCacheTime -> ms.toString))
@@ -80,9 +80,11 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
       try {
         Some(v.toLong)
       } catch {
-        case _:Throwable => error("try to read not existing value from OffsetCacheClient::readSelfInfo");None
+        case _: Throwable =>
+          error("try to read not existing value from OffsetCacheClient::readSelfInfo")
+          None
       }
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
index 413b5a2..675f2f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.datasource.cache
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+import scala.collection.concurrent.{Map => ConcMap, TrieMap}
 
 /**
   * fan in trait, for multiple input and one output
@@ -55,14 +55,12 @@ trait WithFanIn[T] {
 
   private def fanInc(key: T): Unit = {
     fanInCountMap.get(key) match {
-      case Some(n) => {
+      case Some(n) =>
         val suc = fanInCountMap.replace(key, n, n + 1)
         if (!suc) fanInc(key)
-      }
-      case _ => {
+      case _ =>
         val oldOpt = fanInCountMap.putIfAbsent(key, 1)
         if (oldOpt.nonEmpty) fanInc(key)
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index 05d3c75..ae6a18d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -20,16 +20,17 @@ package org.apache.griffin.measure.datasource.connector
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
 
 trait DataConnector extends Loggable with Serializable {
 
@@ -64,7 +65,8 @@ trait DataConnector extends Loggable with Serializable {
       saveTmst(timestamp)    // save timestamp
 
       dfOpt.flatMap { df =>
-        val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
+        val (preProcRules, thisTable) =
+          PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
 
         // init data
         context.compileTableRegister.registerTable(thisTable)
@@ -89,10 +91,9 @@ trait DataConnector extends Loggable with Serializable {
       }
 
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
         None
-      }
     }
   }
 }
@@ -108,4 +109,4 @@ object DataConnectorIdGenerator {
   private def increment: Long = {
     counter.incrementAndGet()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index f4911fc..b51d4fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -18,17 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector
 
+import scala.util.Try
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.datasource.connector.batch._
 import org.apache.griffin.measure.datasource.connector.streaming._
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
 
-import scala.reflect.ClassTag
-import scala.util.Try
 
 object DataConnectorFactory extends Loggable {
 
@@ -60,9 +61,8 @@ object DataConnectorFactory extends Loggable {
         case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
         case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
         case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
-        case KafkaRegex() => {
+        case KafkaRegex() =>
           getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-        }
         case _ => throw new Exception("connector creation error!")
       }
     }
@@ -78,7 +78,8 @@ object DataConnectorFactory extends Loggable {
     val conType = dcParam.getType
     val version = dcParam.getVersion
     conType match {
-      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+      case KafkaRegex() =>
+        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       case _ => throw new Exception("streaming connector creation error!")
     }
   }
@@ -88,7 +89,7 @@ object DataConnectorFactory extends Loggable {
                                     dcParam: DataConnectorParam,
                                     tmstCache: TimestampStorage,
                                     streamingCacheClientOpt: Option[StreamingCacheClient]
-                                   ): KafkaStreamingDataConnector  = {
+                                   ): KafkaStreamingDataConnector = {
     val KeyType = "key.type"
     val ValueType = "value.type"
     val config = dcParam.getConfig
@@ -96,22 +97,14 @@ object DataConnectorFactory extends Loggable {
     val valueType = config.getOrElse(ValueType, "java.lang.String").toString
 
     (keyType, valueType) match {
-      case ("java.lang.String", "java.lang.String") => {
-        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-      }
-      case _ => {
+      case ("java.lang.String", "java.lang.String") =>
+        KafkaStreamingStringDataConnector(
+          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+      case _ =>
         throw new Exception("not supported type kafka data connector")
-      }
     }
   }
 
-//  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
-//    connectors.flatMap { dc =>
-//      dc match {
-//        case mdc: T => Some(mdc)
-//        case _ => None
-//      }
-//    }
-//  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 7fa9080..bf71b2c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -58,10 +59,9 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load avro file ${concreteFileFullPath} fails")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
index ab1e823..1df3bd7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -54,10 +55,9 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 3dcab16..a7ab02e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -60,7 +61,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
       val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
 
       if (validDataDirs.nonEmpty) {
-        val df = sparkSession.read.text(validDataDirs:  _*)
+        val df = sparkSession.read.text(validDataDirs: _*)
         val dfOpt = Some(df)
         val preDfOpt = preProcess(dfOpt, ms)
         preDfOpt
@@ -68,16 +69,17 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
         None
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load text dir ${dirPath} fails: ${e.getMessage}")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
 
-  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
+  private def listSubDirs(paths: Seq[String],
+                          depth: Int,
+                          filteFunc: (String) => Boolean): Seq[String] = {
     val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
     if (depth <= 0) {
       subDirs.filter(filteFunc)
@@ -90,7 +92,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
   private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
   private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
 
-  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+  private def touchDone(dir: String): Unit =
+    HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
 
   private def emptyDir(dir: String): Boolean = {
     HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
@@ -98,7 +101,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
 
 //  def metaData(): Try[Iterable[(String, String)]] = {
 //    Try {
-//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      val st = sqlContext.read.format("com.databricks.spark.avro").
+  //       load(concreteFileFullPath).schema
 //      st.fields.map(f => (f.name, f.dataType.typeName))
 //    }
 //  }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 1475898..ec09ffc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.streaming
 
+import scala.util.{Failure, Success, Try}
+
 import kafka.serializer.Decoder
 import org.apache.spark.streaming.dstream.InputDStream
 
-import scala.util.{Failure, Success, Try}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -64,10 +65,9 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
         // pre-process
         preProcess(dfOpt, ms)
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"streaming data connector error: ${e.getMessage}")
           None
-        }
       }
 
       // save data frame

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index 8477fcf..ee5e497 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -19,16 +19,17 @@ under the License.
 package org.apache.griffin.measure.datasource.connector.streaming
 
 import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.InputDStream
 import org.apache.spark.streaming.kafka.KafkaUtils
 
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+
 /**
   * streaming data connector for kafka with string format key and value
   */
@@ -60,10 +61,9 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi
         val df = sparkSession.createDataFrame(rowRdd, schema)
         Some(df)
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error("streaming data transform fails")
           None
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
index 5c2170c..323b0ac 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.streaming
 
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.datasource.connector.DataConnector
+import scala.util.Try
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.streaming.dstream.InputDStream
 
-import scala.util.Try
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnector
+
 
 trait StreamingDataConnector extends DataConnector {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
index 249d6d2..0917919 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -18,11 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.job.builder
 
-import org.apache.griffin.measure.configuration.enums.DslType
 import org.apache.griffin.measure.configuration.dqdefinition._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.job._
-import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.DQStepBuilder
 import org.apache.griffin.measure.step.write.MetricFlushStep
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index b6cca98..92480d8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.launch
 
+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
 
-import scala.util.Try
 
 /**
   * dq application process

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index ba1f389..e2dbc8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.launch.batch
 
 import java.util.Date
 
-import org.apache.griffin.measure.configuration.enums._
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
 
-import scala.util.Try
 
 case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index ceecb78..eb31a5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -21,22 +21,24 @@ package org.apache.griffin.measure.launch.streaming
 import java.util.{Date, Timer, TimerTask}
 import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
 
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
-import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
-import scala.util.Try
 
 case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
@@ -82,10 +84,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
       try {
         createStreamingContext
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"create streaming context error: ${e.getMessage}")
           throw e
-        }
       }
     })
 
@@ -118,7 +119,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
     ssc.start()
     ssc.awaitTermination()
-    ssc.stop(stopSparkContext=true, stopGracefully=true)
+    ssc.stop(stopSparkContext = true, stopGracefully = true)
 
     // clean context
     globalContext.clean()

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 306befe..feebd91 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
+import org.apache.spark.rdd.RDD
+
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.rdd.RDD
 
 /**
   * sink metric and record to console, for debug