Posted to commits@griffin.apache.org by gu...@apache.org on 2018/09/29 10:21:51 UTC

[1/3] incubator-griffin git commit: Fix case clauses

Repository: incubator-griffin
Updated Branches:
  refs/heads/master 485c5cfc7 -> 18fc4cf4c


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 2bc373c..1fef694 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -18,12 +18,13 @@ under the License.
 */
 package org.apache.griffin.measure.step.write
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
 
 /**
   * write records needs to be sink
@@ -39,18 +40,16 @@ case class RecordWriteStep(name: String,
 
     val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
     writeMode match {
-      case SimpleMode => {
+      case SimpleMode =>
         // batch records
         val recordsOpt = getBatchRecords(context)
         // write records
         recordsOpt match {
-          case Some(records) => {
+          case Some(records) =>
             context.getSink(timestamp).sinkRecords(records, name)
-          }
-          case _ => {}
+          case _ =>
         }
-      }
-      case TimestampMode => {
+      case TimestampMode =>
         // streaming records
         val (recordsOpt, emptyTimestamps) = getStreamingRecords(context)
         // write records
@@ -63,7 +62,6 @@ case class RecordWriteStep(name: String,
         emptyTimestamps.foreach { t =>
           context.getSink(t).sinkRecords(Nil, name)
         }
-      }
     }
     true
   }
@@ -81,28 +79,31 @@ case class RecordWriteStep(name: String,
       val df = context.sqlContext.table(s"`${name}`")
       Some(df)
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"get data frame ${name} fails")
         None
-      }
     }
   }
 
-  private def getRecordDataFrame(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+  private def getRecordDataFrame(context: DQContext): Option[DataFrame]
+    = getDataFrame(context, inputName)
 
-  private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] = filterTableNameOpt.flatMap(getDataFrame(context, _))
+  private def getFilterTableDataFrame(context: DQContext): Option[DataFrame]
+    = filterTableNameOpt.flatMap(getDataFrame(context, _))
 
   private def getBatchRecords(context: DQContext): Option[RDD[String]] = {
     getRecordDataFrame(context).map(_.toJSON.rdd);
   }
 
-  private def getStreamingRecords(context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+  private def getStreamingRecords(context: DQContext)
+    : (Option[RDD[(Long, Iterable[String])]], Set[Long])
+    = {
     implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
     val defTimestamp = context.contextId.timestamp
     getRecordDataFrame(context) match {
-      case Some(df) => {
+      case Some(df) =>
         val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match {
-          case Some(filterDf) => {
+          case Some(filterDf) =>
             // timestamps with empty flag
             val tmsts: Array[(Long, Boolean)] = (filterDf.collect.flatMap { row =>
               try {
@@ -120,13 +121,12 @@ case class RecordWriteStep(name: String,
             } else None
 
             (filterFuncOpt, emptyTmsts)
-          }
           case _ => (Some((t: Long) => true), Set[Long]())
         }
 
         // filter timestamps need to record
         filterFuncOpt match {
-          case Some(filterFunc) => {
+          case Some(filterFunc) =>
             val records = df.flatMap { row =>
               val tmst = getTmst(row, defTimestamp)
               if (filterFunc(tmst)) {
@@ -140,10 +140,8 @@ case class RecordWriteStep(name: String,
               } else None
             }
             (Some(records.rdd.groupByKey), emptyTimestamps)
-          }
           case _ => (None, emptyTimestamps)
         }
-      }
       case _ => (None, Set[Long]())
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
index 592c1d4..220c46b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.step.write
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType}
 
-import scala.collection.mutable.ArrayBuffer
 
 /**
   * spark row formatter
@@ -46,7 +47,8 @@ object SparkRowFormatter {
       case (sf, a) =>
         sf.dataType match {
           case ArrayType(et, _) =>
-            Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]])))
+            Map(sf.name ->
+              (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]])))
           case StructType(s) =>
             Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row])))
           case _ => Map(sf.name -> a)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
index 023a138..7ac8b4f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
@@ -21,11 +21,12 @@ package org.apache.griffin.measure.utils
 import java.io.File
 import java.net.URI
 
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Map => MutableMap}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 
-import scala.collection.mutable.{Map => MutableMap}
+import org.apache.griffin.measure.Loggable
 
 object FSUtil extends Loggable {
 
@@ -34,23 +35,20 @@ object FSUtil extends Loggable {
 
   def getFileSystem(path: String): FileSystem = {
     getUriOpt(path) match {
-      case Some(uri) => {
+      case Some(uri) =>
         fsMap.get(uri.getScheme) match {
           case Some(fs) => fs
-          case _ => {
+          case _ =>
             val fs = try {
               FileSystem.get(uri, getConfiguration)
             } catch {
-              case e: Throwable => {
+              case e: Throwable =>
                 error(s"get file system error: ${e.getMessage}")
                 throw e
-              }
             }
             fsMap += (uri.getScheme -> fs)
             fs
-          }
         }
-      }
       case _ => defaultFS
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index 89f505a..0cae5bd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
-import org.apache.griffin.measure.Loggable
 import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
 
+import org.apache.griffin.measure.Loggable
+
 object HdfsUtil extends Loggable {
 
   private val seprator = "/"
@@ -90,24 +91,9 @@ object HdfsUtil extends Loggable {
     }
   }
 
-  //  def listPathFiles(dirPath: String): Iterable[String] = {
-  //    val path = new Path(dirPath)
-  //    try {
-  //      val fileStatusArray = dfs.listStatus(path)
-  //      fileStatusArray.flatMap { fileStatus =>
-  //        if (fileStatus.isFile) {
-  //          Some(fileStatus.getPath.getName)
-  //        } else None
-  //      }
-  //    } catch {
-  //      case e: Throwable => {
-  //        println(s"list path files error: ${e.getMessage}")
-  //        Nil
-  //      }
-  //    }
-  //  }
-
-  def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = {
+
+  def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false)
+    : Iterable[String] = {
     if (existPath(dirPath)) {
       try {
         implicit val path = new Path(dirPath)
@@ -123,15 +109,15 @@ object HdfsUtil extends Loggable {
           if (fullPath) getHdfsFilePath(dirPath, fname) else fname
         }
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           warn(s"list path [${dirPath}] warn: ${e.getMessage}")
           Nil
-        }
       }
     } else Nil
   }
 
-  def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false): Iterable[String] = {
+  def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false)
+    : Iterable[String] = {
     subTypes.flatMap { subType =>
       listSubPathsByType(dirPath, subType, fullPath)
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index e016b60..4949642 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -27,22 +27,30 @@ object HttpUtil {
   val PUT_REGEX = """^(?i)put$""".r
   val DELETE_REGEX = """^(?i)delete$""".r
 
-  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = {
-    val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+  def postData(url: String,
+               params: Map[String, Object],
+               headers: Map[String, Object],
+               data: String): Boolean = {
+    val response = Http(url).params(convertObjMap2StrMap(params))
+      .headers(convertObjMap2StrMap(headers)).postData(data).asString
+
     response.isSuccess
   }
 
-  def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = {
-    val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
+  def httpRequest(url: String,
+                  method: String,
+                  params: Map[String, Object],
+                  headers: Map[String, Object],
+                  data: String): Boolean = {
+    val httpReq = Http(url).params(convertObjMap2StrMap(params))
+        .headers(convertObjMap2StrMap(headers))
     method match {
-      case POST_REGEX() => {
+      case POST_REGEX() =>
         val res = httpReq.postData(data).asString
         res.isSuccess
-      }
-      case PUT_REGEX() => {
+      case PUT_REGEX() =>
         val res = httpReq.put(data).asString
         res.isSuccess
-      }
       case _ => false
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
index 175bbd8..cbb8734 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
@@ -20,10 +20,11 @@ package org.apache.griffin.measure.utils
 
 import java.io.InputStream
 
+import scala.reflect._
+
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
-import scala.reflect._
 
 object JsonUtil {
   val mapper = new ObjectMapper()
@@ -31,7 +32,7 @@ object JsonUtil {
   mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
 
   def toJson(value: Map[Symbol, Any]): String = {
-    toJson(value map { case (k,v) => k.name -> v})
+    toJson(value map { case (k, v) => k.name -> v})
   }
 
   def toJson(value: Any): String = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
index fccbfb5..c4420ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -177,8 +177,10 @@ object ParamUtil {
       }
     }
 
-    case class StringAnyMap(values:Map[String,Any])
-    def getParamMap(key: String, defValue: Map[String, Any] = Map[String, Any]()): Map[String, Any] = {
+    case class StringAnyMap(values: Map[String, Any])
+
+    def getParamMap(key: String, defValue: Map[String, Any]
+      = Map[String, Any]()): Map[String, Any] = {
       params.get(key) match {
         case Some(v: StringAnyMap) => v.values
         case _ => defValue
@@ -193,7 +195,7 @@ object ParamUtil {
     }
 
     def getArr[T](key: String): Seq[T] = {
-      case class TSeqs(values:Seq[T])
+      case class TSeqs(values: Seq[T])
       params.get(key) match {
         case Some(seq: TSeqs) => seq.values
         case _ => Nil

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index e96cbb1..9707c65 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -18,18 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
-import org.apache.griffin.measure.Loggable
-
-import scala.util.matching.Regex
 import scala.util.{Failure, Success, Try}
+import scala.util.matching.Regex
+
+import org.apache.griffin.measure.Loggable
 
 object TimeUtil extends Loggable {
 
   private object Units {
     case class TimeUnit(name: String, shortName: String, ut: Long, regex: Regex) {
-      def toMs(t: Long) = t * ut
-      def fromMs(ms: Long) = ms / ut
-      def fitUnit(ms: Long) = (ms % ut == 0)
+      def toMs(t: Long) : Long = t * ut
+      def fromMs(ms: Long) : Long = ms / ut
+      def fitUnit(ms: Long) : Boolean = (ms % ut == 0)
     }
 
     val dayUnit = TimeUnit("day", "d", 24 * 60 * 60 * 1000, """^(?i)d(?:ay)?$""".r)
@@ -50,7 +50,7 @@ object TimeUtil extends Loggable {
     val value: Option[Long] = {
       Try {
         timeString match {
-          case TimeRegex(time, unit) => {
+          case TimeRegex(time, unit) =>
             val t = time.toLong
             unit match {
               case dayUnit.regex() => dayUnit.toMs(t)
@@ -60,11 +60,9 @@ object TimeUtil extends Loggable {
               case msUnit.regex() => msUnit.toMs(t)
               case _ => throw new Exception(s"${timeString} is invalid time format")
             }
-          }
-          case PureTimeRegex(time) => {
+          case PureTimeRegex(time) =>
             val t = time.toLong
             msUnit.toMs(t)
-          }
           case _ => throw new Exception(s"${timeString} is invalid time format")
         }
       } match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
new file mode 100644
index 0000000..05877d8
--- /dev/null
+++ b/scalastyle-config.xml
@@ -0,0 +1,246 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!--
+
+If you wish to turn off checking for a section of code, you can put a comment in the source
+before and after the section, with the following syntax:
+
+  // scalastyle:off
+  ...  // stuff that breaks the styles
+  // scalastyle:on
+
+You can also disable only one rule, by specifying its rule id, as specified in:
+  http://www.scalastyle.org/rules-0.7.0.html
+
+  // scalastyle:off no.finalize
+  override def finalize(): Unit = ...
+  // scalastyle:on no.finalize
+
+This file is divided into 3 sections:
+ (1) rules that we enforce.
+ (2) rules that we would like to enforce, but haven't cleaned up the codebase to turn on yet
+     (or we need to make the scalastyle rule more configurable).
+ (3) rules that we don't want to enforce.
+-->
+
+<!--
+Reference: Spark scalastyle-config.xml (https://github.com/apache/spark/blob/master/scalastyle-config.xml)
+-->
+<scalastyle>
+  <name>Scalastyle standard configuration</name>
+
+  <!-- ================================================================================ -->
+  <!--                               rules we enforce                                   -->
+  <!-- ================================================================================ -->
+
+  <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+    <parameters>
+       <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+    <parameters>
+      <parameter name="maxLineLength"><![CDATA[120]]></parameter>
+      <parameter name="tabSize"><![CDATA[2]]></parameter>
+      <parameter name="ignoreImports">true</parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+    <parameters><parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter></parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+    <parameters><parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter></parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+    <parameters><parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter></parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+    <parameters><parameter name="maxParameters"><![CDATA[10]]></parameter></parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+    <parameters>
+      <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+      <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+
+  <check customId="nonascii" level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker" enabled="true">
+   <parameters>
+     <parameter name="tokens">ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, LARROW, RARROW</parameter>
+   </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker" enabled="true">
+    <parameters>
+     <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, WHILE, FOR, MATCH, TRY, CATCH, FINALLY, LARROW, RARROW</parameter>
+    </parameters>
+  </check>
+
+  <!-- ??? usually shouldn't be checked into the code base. -->
+  <check level="error" class="org.scalastyle.scalariform.NotImplementedErrorUsage" enabled="true"></check>
+
+  <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
+    <parameters>
+      <parameter name="groups">java,scala,3rdParty,griffin</parameter>
+      <parameter name="group.java">javax?\..*</parameter>
+      <parameter name="group.scala">scala\..*</parameter>
+      <parameter name="group.3rdParty">(?!org\.apache\.griffin\.).*</parameter>
+      <parameter name="group.griffin">org\.apache\.griffin\..*</parameter>
+    </parameters>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker" enabled="true">
+    <parameters>
+      <parameter name="tokens">COMMA</parameter>
+    </parameters>
+  </check>
+
+  <!-- Single Space between ')' and '{' -->
+  <check customId="SingleSpaceBetweenRParenAndLCurlyBrace" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\)\{</parameter></parameters>
+    <customMessage><![CDATA[
+      Single Space between ')' and `{`.
+    ]]></customMessage>
+  </check>
+
+  <check customId="OmitBracesInCase" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">case[^\n>]*=>\s*\{</parameter></parameters>
+    <customMessage>Omit braces in case clauses.</customMessage>
+  </check>
+
+  <check level="error" class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"></check>
+
+  <!-- ================================================================================ -->
+  <!--       rules we'd like to enforce, but haven't cleaned up the codebase yet        -->
+  <!-- ================================================================================ -->
+
+  <!-- We cannot turn the following two on, because it'd fail a lot of string interpolation use cases. -->
+  <!-- Ideally the following two rules should be configurable to rule out string interpolation. -->
+  <!--<check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="false"></check>-->
+  <!--<check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="false"></check>-->
+
+  <!-- This breaks symbolic method names so we don't turn it on. -->
+  <!-- Maybe we should update it to allow basic symbolic names, and then we are good to go. -->
+  <!--<check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="false">-->
+    <!--<parameters>-->
+    <!--<parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>-->
+    <!--</parameters>-->
+  <!--</check>-->
+
+  <!-- Should turn this on, but we have a few places that need to be fixed first -->
+  <!--<check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>-->
+
+  <!-- ================================================================================ -->
+  <!--                               rules we don't want                                -->
+  <!-- ================================================================================ -->
+
+  <!--<check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="false">-->
+    <!--<parameters><parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter></parameters>-->
+  <!--</check>-->
+
+  <!-- We want the opposite of this: NewLineAtEofChecker -->
+  <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+
+  <!-- This one complains about all kinds of random things. Disable. -->
+  <check level="error" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="false"></check>
+
+  <!-- We use return quite a bit for control flows and guards -->
+  <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="false"></check>
+
+  <!-- We use null a lot in low level code and to interface with 3rd party code -->
+  <check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="false"></check>
+
+  <!-- Doesn't seem super big deal here ... -->
+  <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="false"></check>
+
+  <!-- Doesn't seem super big deal here ... -->
+  <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="false">
+    <parameters><parameter name="maxFileLength">800</parameter></parameters>
+  </check>
+
+  <!-- Doesn't seem super big deal here ... -->
+  <check level="error" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="false">
+    <parameters><parameter name="maxTypes">30</parameter></parameters>
+  </check>
+
+  <!-- Doesn't seem super big deal here ... -->
+  <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="false">
+    <parameters><parameter name="maximum">10</parameter></parameters>
+  </check>
+
+  <!-- Doesn't seem super big deal here ... -->
+  <check level="error" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="false">
+    <parameters><parameter name="maxLength">50</parameter></parameters>
+  </check>
+
+  <!-- Not exactly feasible to enforce this right now. -->
+  <!-- It is also infrequent that somebody introduces a new class with a lot of methods. -->
+  <check level="error" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="false">
+    <parameters><parameter name="maxMethods"><![CDATA[30]]></parameter></parameters>
+  </check>
+
+  <!-- Doesn't seem super big deal here, and we have a lot of magic numbers ... -->
+  <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">
+    <parameters><parameter name="ignore">-1,0,1,2,3</parameter></parameters>
+  </check>
+
+</scalastyle>


[3/3] incubator-griffin git commit: Fix case clauses

Posted by gu...@apache.org.
Fix case clauses

Author: William Guo <gu...@apache.org>

Closes #425 from guoyuepeng/fix_case_clauses.
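
This is essentially a style cleanup: the new scalastyle-config.xml added in this
change enforces an OmitBracesInCase rule and an ImportOrderChecker (java, scala,
third-party, griffin groups), so the hunks mostly drop braces around case clause
bodies and regroup import statements. A minimal before/after sketch of the case
clause style, using hypothetical names rather than code from the Griffin sources:

  object CaseClauseStyle {
    // old style, now rejected by the OmitBracesInCase rule:
    //   case Some(s) => {
    //     s.trim
    //   }
    // new style applied throughout this commit: braces omitted,
    // the clause body is delimited by indentation alone.
    def describe(opt: Option[String]): String = opt match {
      case Some(s) =>
        val trimmed = s.trim
        s"value: ${trimmed}"
      case _ => "empty"
    }
  }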


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18fc4cf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18fc4cf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18fc4cf4

Branch: refs/heads/master
Commit: 18fc4cf4cd706aa3e793fb0790c12ece5e5342a3
Parents: 485c5cf
Author: William Guo <gu...@apache.org>
Authored: Sat Sep 29 18:21:42 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Sat Sep 29 18:21:42 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/Application.scala    |  36 ++-
 .../configuration/dqdefinition/DQConfig.scala   |  12 +-
 .../configuration/dqdefinition/EnvConfig.scala  |   5 +-
 .../dqdefinition/reader/ParamFileReader.scala   |   6 +-
 .../dqdefinition/reader/ParamJsonReader.scala   |   5 +-
 .../dqdefinition/reader/ParamReader.scala       |   5 +-
 .../measure/configuration/enums/DqType.scala    |   8 +-
 .../configuration/enums/FlattenType.scala       |   8 +-
 .../configuration/enums/OutputType.scala        |   8 +-
 .../configuration/enums/ProcessType.scala       |   6 +-
 .../measure/configuration/enums/SinkType.scala  |  19 +-
 .../griffin/measure/context/DQContext.scala     |  11 +-
 .../measure/context/DataFrameCache.scala        |  12 +-
 .../griffin/measure/context/TableRegister.scala |   8 +-
 .../checkpoint/lock/CheckpointLockInZK.scala    |   6 +-
 .../offset/OffsetCheckpointClient.scala         |   2 +-
 .../offset/OffsetCheckpointFactory.scala        |   5 +-
 .../offset/OffsetCheckpointInZK.scala           |  29 ++-
 .../streaming/metric/AccuracyMetric.scala       |   8 +-
 .../context/streaming/metric/CacheResults.scala |   6 +-
 .../griffin/measure/datasource/DataSource.scala |  11 +-
 .../measure/datasource/DataSourceFactory.scala  |   8 +-
 .../measure/datasource/TimestampStorage.scala   |  29 ++-
 .../datasource/cache/StreamingCacheClient.scala |  65 ++---
 .../cache/StreamingCacheClientFactory.scala     |  18 +-
 .../cache/StreamingCacheJsonClient.scala        |   3 +-
 .../cache/StreamingCacheOrcClient.scala         |   3 +-
 .../cache/StreamingCacheParquetClient.scala     |  10 +-
 .../cache/StreamingOffsetCacheable.scala        |  18 +-
 .../measure/datasource/cache/WithFanIn.scala    |   8 +-
 .../datasource/connector/DataConnector.scala    |  15 +-
 .../connector/DataConnectorFactory.scala        |  35 ++-
 .../batch/AvroBatchDataConnector.scala          |   6 +-
 .../batch/HiveBatchDataConnector.scala          |   6 +-
 .../batch/TextDirBatchDataConnector.scala       |  18 +-
 .../streaming/KafkaStreamingDataConnector.scala |   6 +-
 .../KafkaStreamingStringDataConnector.scala     |  12 +-
 .../streaming/StreamingDataConnector.scala      |  10 +-
 .../measure/job/builder/DQJobBuilder.scala      |   2 -
 .../apache/griffin/measure/launch/DQApp.scala   |   3 +-
 .../measure/launch/batch/BatchDQApp.scala       |  10 +-
 .../launch/streaming/StreamingDQApp.scala       |  19 +-
 .../griffin/measure/sink/ConsoleSink.scala      |   3 +-
 .../measure/sink/ElasticSearchSink.scala        |  15 +-
 .../apache/griffin/measure/sink/HdfsSink.scala  |  17 +-
 .../apache/griffin/measure/sink/MongoSink.scala |  10 +-
 .../org/apache/griffin/measure/sink/Sink.scala  |   3 +-
 .../griffin/measure/sink/SinkFactory.scala      |   3 +-
 .../griffin/measure/sink/SinkTaskRunner.scala   |  18 +-
 .../measure/step/builder/DQStepBuilder.scala    |  10 +-
 .../step/builder/GriffinDslDQStepBuilder.scala  |   4 +-
 .../step/builder/RuleParamStepBuilder.scala     |   4 +-
 .../builder/dsl/expr/ClauseExpression.scala     |  14 +-
 .../step/builder/dsl/expr/LogicalExpr.scala     |   5 +-
 .../step/builder/dsl/expr/SelectExpr.scala      |   8 +-
 .../step/builder/dsl/expr/TreeNode.scala        |  18 +-
 .../step/builder/dsl/parser/BasicParser.scala   |   6 +-
 .../builder/dsl/parser/GriffinDslParser.scala   |   5 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |  55 +++--
 .../transform/CompletenessExpr2DQSteps.scala    |  40 +--
 .../transform/DistinctnessExpr2DQSteps.scala    |  48 ++--
 .../builder/dsl/transform/Expr2DQSteps.scala    |   2 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala   |  16 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala  |  46 ++--
 .../dsl/transform/UniquenessExpr2DQSteps.scala  |  34 ++-
 .../transform/analyzer/AccuracyAnalyzer.scala   |   6 +-
 .../analyzer/CompletenessAnalyzer.scala         |   3 +-
 .../analyzer/DistinctnessAnalyzer.scala         |   4 +-
 .../transform/analyzer/ProfilingAnalyzer.scala  |   3 +-
 .../transform/analyzer/UniquenessAnalyzer.scala |   3 +-
 .../builder/preproc/PreProcParamMaker.scala     |   8 +-
 .../griffin/measure/step/read/ReadStep.scala    |   9 +-
 .../measure/step/read/UnionReadStep.scala       |   3 +-
 .../measure/step/transform/DataFrameOps.scala   |  16 +-
 .../transform/DataFrameOpsTransformStep.scala   |   7 +-
 .../step/transform/SparkSqlTransformStep.scala  |   3 +-
 .../step/write/DataSourceUpdateWriteStep.scala  |  19 +-
 .../measure/step/write/MetricFlushStep.scala    |   3 +-
 .../measure/step/write/MetricWriteStep.scala    |  17 +-
 .../measure/step/write/RecordWriteStep.scala    |  38 ++-
 .../measure/step/write/SparkRowFormatter.scala  |   6 +-
 .../apache/griffin/measure/utils/FSUtil.scala   |  14 +-
 .../apache/griffin/measure/utils/HdfsUtil.scala |  30 +--
 .../apache/griffin/measure/utils/HttpUtil.scala |  24 +-
 .../apache/griffin/measure/utils/JsonUtil.scala |   5 +-
 .../griffin/measure/utils/ParamUtil.scala       |   8 +-
 .../apache/griffin/measure/utils/TimeUtil.scala |  18 +-
 scalastyle-config.xml                           | 246 +++++++++++++++++++
 88 files changed, 887 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index e7df806..1bac17b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,15 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure
 
-import org.apache.griffin.measure.configuration.enums._
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
 import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.dqdefinition.{GriffinConfig, DQConfig, EnvConfig, Param}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
 
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
 
 /**
   * application entrance
@@ -49,17 +50,15 @@ object Application extends Loggable {
     // read param files
     val envParam = readParamFile[EnvConfig](envParamFile) match {
       case Success(p) => p
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(ex.getMessage)
         sys.exit(-2)
-      }
     }
     val dqParam = readParamFile[DQConfig](dqParamFile) match {
       case Success(p) => p
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(ex.getMessage)
         sys.exit(-2)
-      }
     }
     val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
 
@@ -68,32 +67,28 @@ object Application extends Loggable {
     val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)
-      case _ => {
+      case _ =>
         error(s"${procType} is unsupported process type!")
         sys.exit(-4)
-      }
     }
 
     startup
 
     // dq app init
     dqApp.init match {
-      case Success(_) => {
+      case Success(_) =>
         info("process init success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process init error: ${ex.getMessage}")
         shutdown
         sys.exit(-5)
-      }
     }
 
     // dq app run
     dqApp.run match {
-      case Success(_) => {
+      case Success(_) =>
         info("process run success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process run error: ${ex.getMessage}")
 
         if (dqApp.retryable) {
@@ -102,19 +97,16 @@ object Application extends Loggable {
           shutdown
           sys.exit(-5)
         }
-      }
     }
 
     // dq app end
     dqApp.close match {
-      case Success(_) => {
+      case Success(_) =>
         info("process end success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process end error: ${ex.getMessage}")
         shutdown
         sys.exit(-5)
-      }
     }
 
     shutdown

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 4943329..b281481 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.configuration.enums._
 
 /**
@@ -46,7 +47,7 @@ case class DQConfig(@JsonProperty("name") private val name: String,
   def getDataSources: Seq[DataSourceParam] = {
     dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
       val (seq, names) = ret
-      if (!names.contains(ds.getName)){
+      if (!names.contains(ds.getName)) {
         (seq :+ ds, names + ds.getName)
       } else ret
     }._1
@@ -133,8 +134,9 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
   * rule param
   * @param dslType    dsl type of this rule (must)
   * @param dqType     dq type of this rule (must if dsl type is "griffin-dsl")
-  * @param inDfName       name of input dataframe of this rule, by default will be the previous rule output dataframe name
-  * @param outDfName      name of output dataframe of this rule, by default will be generated as data connector dataframe name with index suffix
+  * @param inDfName   name of input dataframe of this rule, by default will be the previous rule output dataframe name
+  * @param outDfName  name of output dataframe of this rule, by default will be generated
+  *                   as data connector dataframe name with index suffix
   * @param rule       rule to define dq step calculation (must)
   * @param details    detail config of rule (optional)
   * @param cache      cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
@@ -206,4 +208,4 @@ case class RuleOutputParam( @JsonProperty("type") private val outputType: String
   def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")
 
   def validate(): Unit = {}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index bf77d13..2e227a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.configuration.enums._
 
 /**
@@ -109,4 +110,4 @@ case class CheckpointParam(@JsonProperty("type") private val cpType: String,
   def validate(): Unit = {
     assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
index 4a4aeed..2739f74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
@@ -18,11 +18,13 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 
-import scala.reflect.ClassTag
-import scala.util.Try
+
 
 /**
   * read params from config file path

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
index 063dc7e..ba51d5f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 import org.apache.griffin.measure.utils.JsonUtil
 
-import scala.reflect.ClassTag
-import scala.util.Try
 
 /**
   * read params from json string directly

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
index 9a9a46c..5c914c6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 
-import scala.reflect.ClassTag
-import scala.util.Try
 
 trait ParamReader extends Loggable with Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
index cee8d98..bbeb04f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -31,7 +31,13 @@ sealed trait DqType {
 
 object DqType {
   private val dqTypes: List[DqType] = List(
-    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType
+    AccuracyType,
+    ProfilingType,
+    UniquenessType,
+    DistinctnessType,
+    TimelinessType,
+    CompletenessType,
+    UnknownType
   )
   def apply(ptn: String): DqType = {
     dqTypes.find(dqType => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
index 160ecaf..cb2fba1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -29,7 +29,13 @@ sealed trait FlattenType {
 }
 
 object FlattenType {
-  private val flattenTypes: List[FlattenType] = List(DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType)
+  private val flattenTypes: List[FlattenType] = List(
+    DefaultFlattenType,
+    EntriesFlattenType,
+    ArrayFlattenType,
+    MapFlattenType
+  )
+
   val default = DefaultFlattenType
   def apply(ptn: String): FlattenType = {
     flattenTypes.find(tp => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
index 5b1d261..8a80044 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -29,7 +29,13 @@ sealed trait OutputType {
 }
 
 object OutputType {
-  private val outputTypes: List[OutputType] = List(MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType)
+  private val outputTypes: List[OutputType] = List(
+    MetricOutputType,
+    RecordOutputType,
+    DscUpdateOutputType,
+    UnknownOutputType
+  )
+
   val default = UnknownOutputType
   def apply(ptn: String): OutputType = {
     outputTypes.find(tp => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
index 4f799ed..3cbc749 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -29,7 +29,11 @@ sealed trait ProcessType {
 }
 
 object ProcessType {
-  private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
+  private val procTypes: List[ProcessType] = List(
+    BatchProcessType,
+    StreamingProcessType
+  )
+
   def apply(ptn: String): ProcessType = {
     procTypes.find(tp => ptn match {
       case tp.idPattern() => true

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 5471e83..d9e5d2b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -30,15 +30,22 @@ sealed trait SinkType {
 
 object SinkType {
   private val sinkTypes: List[SinkType] = List(
-    ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
+    ConsoleSinkType,
+    HdfsSinkType,
+    ElasticsearchSinkType,
+    MongoSinkType,
+    UnknownSinkType
   )
+
   def apply(ptn: String): SinkType = {
     sinkTypes.find(tp => ptn match {
       case tp.idPattern() => true
       case _ => false
     }).getOrElse(UnknownSinkType)
   }
+
   def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+
   def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
     val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
     if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
@@ -48,7 +55,7 @@ object SinkType {
 /**
   * console sink, will sink metric in console
   */
- case object ConsoleSinkType extends SinkType {
+case object ConsoleSinkType extends SinkType {
   val idPattern = "^(?i)console|log$".r
   val desc = "console"
 }
@@ -56,7 +63,7 @@ object SinkType {
 /**
   * hdfs sink, will sink metric and record in hdfs
   */
- case object HdfsSinkType extends SinkType {
+case object HdfsSinkType extends SinkType {
   val idPattern = "^(?i)hdfs$".r
   val desc = "hdfs"
 }
@@ -64,7 +71,7 @@ object SinkType {
 /**
   * elasticsearch sink, will sink metric in elasticsearch
   */
- case object ElasticsearchSinkType extends SinkType {
+case object ElasticsearchSinkType extends SinkType {
   val idPattern = "^(?i)es|elasticsearch|http$".r
   val desc = "elasticsearch"
 }
@@ -72,12 +79,12 @@ object SinkType {
 /**
   * mongo sink, will sink metric in mongo db
   */
- case object MongoSinkType extends SinkType {
+case object MongoSinkType extends SinkType {
   val idPattern = "^(?i)mongo|mongodb$".r
   val desc = "distinct"
 }
 
- case object UnknownSinkType extends SinkType {
+case object UnknownSinkType extends SinkType {
   val idPattern = "".r
   val desc = "unknown"
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index a9f3da0..b0759c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.datasource._
 import org.apache.griffin.measure.sink.{Sink, SinkFactory}
-import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
 
 /**
   * dq context: the context of each calculation
@@ -58,6 +59,7 @@ case class DQContext(contextId: ContextId,
     }
   }
   dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+
   def getDataSourceName(index: Int): String = {
     if (dataSourceNames.size > index) dataSourceNames(index) else ""
   }
@@ -66,20 +68,25 @@ case class DQContext(contextId: ContextId,
   val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
 
   val dataSourceTimeRanges = loadDataSources()
+
   def loadDataSources(): Map[String, TimeRange] = {
     dataSources.map { ds =>
       (ds.name, ds.loadData(this))
     }.toMap
   }
+
   printTimeRanges
 
   private val sinkFactory = SinkFactory(sinkParams, name)
   private val defaultSink: Sink = createSink(contextId.timestamp)
+
   def getSink(timestamp: Long): Sink = {
     if (timestamp == contextId.timestamp) getSink()
     else createSink(timestamp)
   }
+
   def getSink(): Sink = defaultSink
+
   private def createSink(t: Long): Sink = {
     procType match {
       case BatchProcessType => sinkFactory.getSinks(t, true)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
index 4e8f4df..0671565 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
@@ -18,11 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Map => MutableMap, MutableList}
+
 import org.apache.spark.sql.DataFrame
 
-import scala.collection.concurrent.{Map => ConcMap}
-import scala.collection.mutable.{MutableList, Map => MutableMap}
+import org.apache.griffin.measure.Loggable
 
 /**
   * cache and unpersist dataframes
@@ -42,17 +42,15 @@ case class DataFrameCache() extends Loggable {
   def cacheDataFrame(name: String, df: DataFrame): Unit = {
     info(s"try to cache data frame ${name}")
     dataFrames.get(name) match {
-      case Some(odf) => {
+      case Some(odf) =>
         trashDataFrame(odf)
         dataFrames += (name -> df)
         df.cache
         info("cache after replace old df")
-      }
-      case _ => {
+      case _ =>
         dataFrames += (name -> df)
         df.cache
         info("cache after replace no old df")
-      }
     }
   }
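
The cacheDataFrame change above keeps the same replace-or-insert flow, only without braces around the case bodies: an existing entry is moved to a trash list before the new frame is cached. A Spark-free sketch of that flow, using strings in place of DataFrames so it runs standalone:

import scala.collection.mutable.{Map => MutableMap, MutableList}

object DataFrameCacheSketch {
  private val dataFrames: MutableMap[String, String] = MutableMap()
  private val trashDataFrames: MutableList[String] = MutableList()

  def cacheDataFrame(name: String, df: String): Unit = dataFrames.get(name) match {
    case Some(odf) =>
      trashDataFrames += odf       // keep the old frame around to unpersist later
      dataFrames += (name -> df)
    case _ =>
      dataFrames += (name -> df)
  }

  def main(args: Array[String]): Unit = {
    cacheDataFrame("dq_records", "df_v1")
    cacheDataFrame("dq_records", "df_v2")
    println(dataFrames("dq_records"))  // df_v2
    println(trashDataFrames)           // MutableList(df_v1)
  }
}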
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index 8d86170..c4dda3b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -18,10 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Set => MutableSet}
+
 import org.apache.spark.sql._
 
-import scala.collection.mutable.{Set => MutableSet}
+import org.apache.griffin.measure.Loggable
+
 
 /**
   * register table name
@@ -78,4 +80,4 @@ case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends Table
     tables.clear
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
index b1cbe0f..d78aedf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
@@ -32,10 +32,9 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
         mutex.acquire(-1, null)
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"lock error: ${e.getMessage}")
         false
-      }
     }
 
   }
@@ -44,9 +43,8 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
     try {
       if (mutex.isAcquiredInThisProcess) mutex.release
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"unlock error: ${e.getMessage}")
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
index 8acfbeb..48337d5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.context.streaming.checkpoint.lock.{CheckpointL
 object OffsetCheckpointClient extends OffsetCheckpoint with OffsetOps {
   var offsetCheckpoints: Seq[OffsetCheckpoint] = Nil
 
-  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) = {
+  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) : Unit = {
     val fac = OffsetCheckpointFactory(checkpointParams, metricName)
     offsetCheckpoints = checkpointParams.flatMap(param => fac.getOffsetCheckpoint(param)).toList
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
index 5fe8e15..f19542a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.checkpoint.offset
 
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
 
-import scala.util.Try
 
 case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam], metricName: String
                                   ) extends Serializable {
@@ -36,4 +37,4 @@ case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam],
     offsetCheckpointTry.toOption
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
index e051779..b575573 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
@@ -18,22 +18,24 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.checkpoint.offset
 
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.framework.imps.CuratorFrameworkState
 import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
 import org.apache.zookeeper.CreateMode
-
 import scala.collection.JavaConverters._
 
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
+
+
 /**
   * leverage zookeeper for info cache
   * @param config
   * @param metricName
   */
-case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) extends OffsetCheckpoint with OffsetOps {
+case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String)
+  extends OffsetCheckpoint with OffsetOps {
 
   val Hosts = "hosts"
   val Namespace = "namespace"
@@ -67,7 +69,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
   }
   val lockPath = config.getOrElse(LockPath, "lock").toString
 
-  private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+  private val cacheNamespace: String =
+    if (namespace.isEmpty) metricName else namespace + separator + metricName
+
   private val builder = CuratorFrameworkFactory.builder()
     .connectString(hosts)
     .retryPolicy(new ExponentialBackoffRetry(1000, 3))
@@ -141,10 +145,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       client.getChildren().forPath(path).asScala.toList
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"list ${path} warn: ${e.getMessage}")
         Nil
-      }
     }
   }
 
@@ -162,10 +165,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
         .forPath(path, content.getBytes("utf-8"))
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
         false
-      }
     }
   }
 
@@ -174,10 +176,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
       client.setData().forPath(path, content.getBytes("utf-8"))
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
         false
-      }
     }
   }
 
@@ -185,10 +186,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       Some(new String(client.getData().forPath(path), "utf-8"))
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"read ${path} warn: ${e.getMessage}")
         None
-      }
     }
   }
 
@@ -204,10 +204,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       client.checkExists().forPath(path) != null
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"check exists ${path} warn: ${e.getMessage}")
         false
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
index e69716e..19dfb9e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
@@ -45,9 +45,11 @@ case class AccuracyMetric(miss: Long, total: Long) extends Metric {
     (this.miss != other.miss) || (this.total != other.total)
   }
 
-  def getMiss = miss
-  def getTotal = total
-  def getMatch = total - miss
+  def getMiss: Long = miss
+
+  def getTotal: Long = total
+
+  def getMatch: Long = total - miss
 
   def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
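
For reference, the accuracy arithmetic shown above works out as in this small sketch; AccuracySketch is an illustrative stand-in rather than the Griffin Metric type. Note the guard that returns 0 when total is not positive.

case class AccuracySketch(miss: Long, total: Long) {
  def getMatch: Long = total - miss
  def matchPercentage: Double =
    if (total <= 0) 0 else getMatch.toDouble / total * 100
}

object AccuracySketchApp {
  def main(args: Array[String]): Unit = {
    println(AccuracySketch(miss = 25, total = 1000).matchPercentage)  // 97.5
    println(AccuracySketch(miss = 0, total = 0).matchPercentage)      // 0.0 (guarded)
  }
}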
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
index cc8e772..6c99618 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.metric
 
+import scala.collection.mutable.{Map => MutableMap}
+
 import org.apache.griffin.measure.Loggable
 
-import scala.collection.mutable.{Map => MutableMap}
 
 /**
   * in streaming mode, some metrics may update,
@@ -32,10 +33,9 @@ object CacheResults extends Loggable {
     def olderThan(ut: Long): Boolean = updateTime < ut
     def update(ut: Long, r: Metric): Option[Metric] = {
       r match {
-        case m: result.T if (olderThan(ut)) => {
+        case m: result.T if (olderThan(ut)) =>
           val ur = result.update(m)
           if (result.differsFrom(ur)) Some(ur) else None
-        }
         case _ => None
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 6bf6373..f2cd0ec 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -18,13 +18,14 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.context.{DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.datasource.connector.DataConnector
 import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.spark.sql._
 
 /**
   * data source
@@ -50,12 +51,10 @@ case class DataSource(name: String,
     val timestamp = context.contextId.timestamp
     val (dfOpt, timeRange) = data(timestamp)
     dfOpt match {
-      case Some(df) => {
+      case Some(df) =>
         context.runTimeTableRegister.registerTable(name, df)
-      }
-      case None => {
+      case None =>
         warn(s"load data source [${name}] fails")
-      }
     }
     timeRange
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 7807dfd..28e616b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
+import scala.util.Success
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
 import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
 
-import scala.util.Success
 
 object DataSourceFactory extends Loggable {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
index a305563..04a7f85 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
@@ -18,10 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
-import org.apache.griffin.measure.Loggable
-
 import scala.collection.mutable.{SortedSet => MutableSortedSet}
 
+import org.apache.griffin.measure.Loggable
 /**
   * tmst cache, CRUD of timestamps
   */
@@ -29,19 +28,19 @@ case class TimestampStorage() extends Loggable {
 
   private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
 
-  //-- insert tmst into tmst group --
-  def insert(tmst: Long) = tmstGroup += tmst
-  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+  // -- insert tmst into tmst group --
+  def insert(tmst: Long) : MutableSortedSet[Long] = tmstGroup += tmst
+  def insert(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup ++= tmsts
 
-  //-- remove tmst from tmst group --
-  def remove(tmst: Long) = tmstGroup -= tmst
-  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+  // -- remove tmst from tmst group --
+  def remove(tmst: Long) : MutableSortedSet[Long] = tmstGroup -= tmst
+  def remove(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup --= tmsts
 
-  //-- get subset of tmst group --
-  def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
-  def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
-  def until(until: Long) = tmstGroup.until(until).toSet
-  def from(from: Long) = tmstGroup.from(from).toSet
-  def all = tmstGroup.toSet
+  // -- get subset of tmst group --
+  def fromUntil(from: Long, until: Long) : Set[Long] = tmstGroup.range(from, until).toSet
+  def afterTil(after: Long, til: Long) : Set[Long] = tmstGroup.range(after + 1, til + 1).toSet
+  def until(until: Long) : Set[Long] = tmstGroup.until(until).toSet
+  def from(from: Long) : Set[Long] = tmstGroup.from(from).toSet
+  def all : Set[Long] = tmstGroup.toSet
 
-}
\ No newline at end of file
+}
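
The tmst group above is a mutable sorted set of epoch-millis timestamps, and the range helpers differ only in which bound is inclusive. A runnable sketch of fromUntil versus afterTil, independent of Griffin:

import scala.collection.mutable.{SortedSet => MutableSortedSet}

object TimestampStorageSketch {
  private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]

  def insert(tmst: Long): Unit = tmstGroup += tmst
  def fromUntil(from: Long, until: Long): Set[Long] = tmstGroup.range(from, until).toSet
  def afterTil(after: Long, til: Long): Set[Long] = tmstGroup.range(after + 1, til + 1).toSet

  def main(args: Array[String]): Unit = {
    Seq(100L, 200L, 300L, 400L).foreach(insert)
    println(fromUntil(100L, 300L))  // 100 and 200: from is inclusive, until exclusive
    println(afterTil(100L, 300L))   // 200 and 300: after is exclusive, til inclusive
  }
}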

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index b351f82..a03a468 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.datasource.cache
 
 import java.util.concurrent.TimeUnit
 
+import scala.util.Random
+
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
 import org.apache.griffin.measure.utils.DataFrameUtil._
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
 
-import scala.util.Random
 
 /**
   * data source cache in streaming mode
@@ -38,7 +40,8 @@ import scala.util.Random
   * read data frame from hdfs in calculate phase
   * with update and clean actions for the cache data
   */
-trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
+trait StreamingCacheClient
+  extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
 
   val sqlContext: SQLContext
   val param: Map[String, Any]
@@ -46,7 +49,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   val index: Int
 
   val timestampStorage: TimestampStorage
-  protected def fromUntilRangeTmsts(from: Long, until: Long) = timestampStorage.fromUntil(from, until)
+  protected def fromUntilRangeTmsts(from: Long, until: Long) =
+    timestampStorage.fromUntil(from, until)
+
   protected def clearTmst(t: Long) = timestampStorage.remove(t)
   protected def clearTmstsUntil(until: Long) = {
     val outDateTmsts = timestampStorage.until(until)
@@ -67,17 +72,20 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
 
   val filePath: String = param.getString(_FilePath, defFilePath)
   val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
-  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  val readyTimeInterval: Long =
+    TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+
+  val readyTimeDelay: Long =
+    TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+
   val deltaTimeRange: (Long, Long) = {
     def negative(n: Long): Long = if (n <= 0) n else 0
     param.get(_TimeRange) match {
-      case Some(seq: Seq[String]) => {
+      case Some(seq: Seq[String]) =>
         val nseq = seq.flatMap(TimeUtil.milliseconds(_))
         val ns = negative(nseq.headOption.getOrElse(0))
         val ne = negative(nseq.tail.headOption.getOrElse(0))
         (ns, ne)
-      }
       case _ => (0, 0)
     }
   }
@@ -112,7 +120,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
     if (!readOnly) {
       dfOpt match {
-        case Some(df) => {
+        case Some(df) =>
           // cache df
           df.cache
 
@@ -137,10 +145,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
 
           // uncache df
           df.unpersist
-        }
-        case _ => {
+        case _ =>
           info("no data frame to save")
-        }
       }
 
       // submit cache time and ready time
@@ -168,7 +174,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
       s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
     } else {
       info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
-      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+
+      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} " +
+        s"AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
     }
 
     // new cache data
@@ -176,10 +184,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
       val dfr = sqlContext.read
       readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"read data source cache warn: ${e.getMessage}")
         None
-      }
     }
 
     // old cache data
@@ -190,10 +197,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
         val dfr = sqlContext.read
         readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           warn(s"read old data source cache warn: ${e.getMessage}")
           None
-        }
       }
     }
 
@@ -228,12 +234,11 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
     }
     names.filter { name =>
       name match {
-        case regex(value) => {
+        case regex(value) =>
           str2Long(value) match {
             case Some(t) => func(t, bound)
             case _ => false
           }
-        }
         case _ => false
       }
     }.map(name => s"${path}/${name}")
@@ -258,7 +263,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
       // new cache data
       val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
       newCacheCleanTime match {
-        case Some(nct) => {
+        case Some(nct) =>
           // clean calculated new cache data
           val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
           if (newCacheLocked) {
@@ -271,16 +276,15 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
               newCacheLock.unlock()
             }
           }
-        }
-        case _ => {
+        case _ =>
           // do nothing
-        }
+          info("should not happen")
       }
 
       // old cache data
       val oldCacheCleanTime = if (updatable) readCleanTime else None
       oldCacheCleanTime match {
-        case Some(oct) => {
+        case Some(oct) =>
           val oldCacheIndexOpt = readOldCacheIndex
           oldCacheIndexOpt.foreach { idx =>
             val oldDfPath = s"${oldFilePath}/${idx}"
@@ -298,10 +302,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
               }
             }
           }
-        }
-        case _ => {
+        case _ =>
           // do nothing
-        }
+          info("should not happen")
       }
     }
   }
@@ -313,7 +316,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   def updateData(dfOpt: Option[DataFrame]): Unit = {
     if (!readOnly && updatable) {
       dfOpt match {
-        case Some(df) => {
+        case Some(df) =>
           // old cache lock
           val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
           if (oldCacheLocked) {
@@ -339,10 +342,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
               oldCacheLock.unlock()
             }
           }
-        }
-        case _ => {
+        case _ =>
           info("no data frame to update")
-        }
       }
     }
   }
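
The deltaTimeRange block earlier in this file parses two duration strings, converts them to milliseconds and clamps each offset to a non-positive value. A sketch of that parsing follows; milliseconds() here is a simplified stand-in for Griffin's TimeUtil.milliseconds.

object DeltaTimeRangeSketch {
  // simplified stand-in for TimeUtil.milliseconds: "<number><unit>" to millis
  private def milliseconds(s: String): Option[Long] = {
    val pattern = """(?i)(-?\d+)(ms|s|m|h)""".r
    s.trim match {
      case pattern(n, unit) =>
        val factor = unit.toLowerCase match {
          case "ms" => 1L
          case "s" => 1000L
          case "m" => 60000L
          case "h" => 3600000L
        }
        Some(n.toLong * factor)
      case _ => None
    }
  }

  def deltaTimeRange(conf: Option[Seq[String]]): (Long, Long) = {
    def negative(n: Long): Long = if (n <= 0) n else 0
    conf match {
      case Some(seq) =>
        val nseq = seq.flatMap(milliseconds)
        (negative(nseq.headOption.getOrElse(0L)), negative(nseq.tail.headOption.getOrElse(0L)))
      case _ => (0L, 0L)
    }
  }

  def main(args: Array[String]): Unit = {
    println(deltaTimeRange(Some(Seq("-2m", "0"))))  // (-120000,0): clamped to non-positive offsets
    println(deltaTimeRange(None))                   // (0,0)
  }
}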

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index f991e2d..eeda8ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
+import org.apache.spark.sql.SQLContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
 
 object StreamingCacheClientFactory extends Loggable {
 
@@ -50,17 +51,20 @@ object StreamingCacheClientFactory extends Loggable {
       try {
         val tp = param.getString(_type, "")
         val dsCache = tp match {
-          case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
-          case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
-          case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
-          case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+          case ParquetRegex() =>
+            StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+          case JsonRegex() =>
+            StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+          case OrcRegex() =>
+            StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+          case _ =>
+            StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
         }
         Some(dsCache)
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error("generate data source cache fails")
           None
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index a12ef87..c81d4d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
   * data source cache in json format
   */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 63e7104..0649b74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
   * data source cache in orc format
   */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index c275227..9c369ee 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -18,14 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
   * data source cache in parquet format
   */
-case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
-                                       dsName: String, index: Int, timestampStorage: TimestampStorage
+case class StreamingCacheParquetClient(sqlContext: SQLContext,
+                                       param: Map[String, Any],
+                                       dsName: String,
+                                       index: Int,
+                                       timestampStorage: TimestampStorage
                                  ) extends StreamingCacheClient {
 
   sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
index 7b7f506..e73a058 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -30,13 +30,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
   val readyTimeInterval: Long
   val readyTimeDelay: Long
 
-  def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
+  def selfCacheInfoPath : String = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
 
-  def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
-  def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
-  def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
+  def selfCacheTime : String = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime : String = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime : String = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
+  def selfCleanTime : String = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex : String = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
 
   protected def submitCacheTime(ms: Long): Unit = {
     val map = Map[String, String]((selfCacheTime -> ms.toString))
@@ -80,9 +80,11 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
       try {
         Some(v.toLong)
       } catch {
-        case _:Throwable => error("try to read not existing value from OffsetCacheClient::readSelfInfo");None
+        case _: Throwable =>
+          error("try to read not existing value from OffsetCacheClient::readSelfInfo")
+          None
       }
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
index 413b5a2..675f2f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.datasource.cache
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+import scala.collection.concurrent.{Map => ConcMap, TrieMap}
 
 /**
   * fan in trait, for multiple input and one output
@@ -55,14 +55,12 @@ trait WithFanIn[T] {
 
   private def fanInc(key: T): Unit = {
     fanInCountMap.get(key) match {
-      case Some(n) => {
+      case Some(n) =>
         val suc = fanInCountMap.replace(key, n, n + 1)
         if (!suc) fanInc(key)
-      }
-      case _ => {
+      case _ =>
         val oldOpt = fanInCountMap.putIfAbsent(key, 1)
         if (oldOpt.nonEmpty) fanInc(key)
-      }
     }
   }
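
The fanInc method above is an optimistic-concurrency increment: replace or putIfAbsent on the concurrent map, retrying whenever another writer wins the race. A self-contained sketch of the same pattern; the .par demo assumes the parallel collections bundled with the Scala 2.11/2.12 used by the measure module.

import scala.collection.concurrent.{Map => ConcMap, TrieMap}

object FanInSketch {
  private val fanInCountMap: ConcMap[Long, Int] = TrieMap()

  // optimistic increment: retry whenever replace or putIfAbsent loses a race
  def fanInc(key: Long): Unit = fanInCountMap.get(key) match {
    case Some(n) =>
      if (!fanInCountMap.replace(key, n, n + 1)) fanInc(key)
    case _ =>
      if (fanInCountMap.putIfAbsent(key, 1).nonEmpty) fanInc(key)
  }

  def main(args: Array[String]): Unit = {
    (1 to 1000).par.foreach(_ => fanInc(42L))
    println(fanInCountMap(42L))  // 1000
  }
}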
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index 05d3c75..ae6a18d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -20,16 +20,17 @@ package org.apache.griffin.measure.datasource.connector
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
 
 trait DataConnector extends Loggable with Serializable {
 
@@ -64,7 +65,8 @@ trait DataConnector extends Loggable with Serializable {
       saveTmst(timestamp)    // save timestamp
 
       dfOpt.flatMap { df =>
-        val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
+        val (preProcRules, thisTable) =
+          PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
 
         // init data
         context.compileTableRegister.registerTable(thisTable)
@@ -89,10 +91,9 @@ trait DataConnector extends Loggable with Serializable {
       }
 
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
         None
-      }
     }
   }
 }
@@ -108,4 +109,4 @@ object DataConnectorIdGenerator {
   private def increment: Long = {
     counter.incrementAndGet()
   }
-}
\ No newline at end of file
+}
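
DataConnectorIdGenerator at the end of this file hands out connector ids from an AtomicLong counter. A tiny sketch of that pattern; the "dc" prefix here is assumed for illustration only, since the diff does not show the generator's prefix.

import java.util.concurrent.atomic.AtomicLong

object ConnectorIdSketch {
  private val counter = new AtomicLong(0L)
  private val head = "dc"  // assumed prefix for illustration; not shown in the diff

  def genId: String = s"${head}${increment}"

  private def increment: Long = counter.incrementAndGet()

  def main(args: Array[String]): Unit = {
    println(genId)  // dc1
    println(genId)  // dc2
  }
}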

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index f4911fc..b51d4fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -18,17 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector
 
+import scala.util.Try
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.datasource.connector.batch._
 import org.apache.griffin.measure.datasource.connector.streaming._
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
 
-import scala.reflect.ClassTag
-import scala.util.Try
 
 object DataConnectorFactory extends Loggable {
 
@@ -60,9 +61,8 @@ object DataConnectorFactory extends Loggable {
         case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
         case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
         case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
-        case KafkaRegex() => {
+        case KafkaRegex() =>
           getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-        }
         case _ => throw new Exception("connector creation error!")
       }
     }
@@ -78,7 +78,8 @@ object DataConnectorFactory extends Loggable {
     val conType = dcParam.getType
     val version = dcParam.getVersion
     conType match {
-      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+      case KafkaRegex() =>
+        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       case _ => throw new Exception("streaming connector creation error!")
     }
   }
@@ -88,7 +89,7 @@ object DataConnectorFactory extends Loggable {
                                     dcParam: DataConnectorParam,
                                     tmstCache: TimestampStorage,
                                     streamingCacheClientOpt: Option[StreamingCacheClient]
-                                   ): KafkaStreamingDataConnector  = {
+                                   ): KafkaStreamingDataConnector = {
     val KeyType = "key.type"
     val ValueType = "value.type"
     val config = dcParam.getConfig
@@ -96,22 +97,14 @@ object DataConnectorFactory extends Loggable {
     val valueType = config.getOrElse(ValueType, "java.lang.String").toString
 
     (keyType, valueType) match {
-      case ("java.lang.String", "java.lang.String") => {
-        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-      }
-      case _ => {
+      case ("java.lang.String", "java.lang.String") =>
+        KafkaStreamingStringDataConnector(
+          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+      case _ =>
         throw new Exception("not supported type kafka data connector")
-      }
     }
   }
 
-//  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
-//    connectors.flatMap { dc =>
-//      dc match {
-//        case mdc: T => Some(mdc)
-//        case _ => None
-//      }
-//    }
-//  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 7fa9080..bf71b2c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -58,10 +59,9 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load avro file ${concreteFileFullPath} fails")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
index ab1e823..1df3bd7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -54,10 +55,9 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 3dcab16..a7ab02e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -60,7 +61,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
       val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
 
       if (validDataDirs.nonEmpty) {
-        val df = sparkSession.read.text(validDataDirs:  _*)
+        val df = sparkSession.read.text(validDataDirs: _*)
         val dfOpt = Some(df)
         val preDfOpt = preProcess(dfOpt, ms)
         preDfOpt
@@ -68,16 +69,17 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
         None
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load text dir ${dirPath} fails: ${e.getMessage}")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
 
-  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
+  private def listSubDirs(paths: Seq[String],
+                          depth: Int,
+                          filteFunc: (String) => Boolean): Seq[String] = {
     val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
     if (depth <= 0) {
       subDirs.filter(filteFunc)
@@ -90,7 +92,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
   private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
   private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
 
-  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+  private def touchDone(dir: String): Unit =
+    HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
 
   private def emptyDir(dir: String): Boolean = {
     HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
@@ -98,7 +101,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
 
 //  def metaData(): Try[Iterable[(String, String)]] = {
 //    Try {
-//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      val st = sqlContext.read.format("com.databricks.spark.avro").
+  //       load(concreteFileFullPath).schema
 //      st.fields.map(f => (f.name, f.dataType.typeName))
 //    }
 //  }
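
listSubDirs above walks directories down to a fixed depth before applying the filter. A local-filesystem sketch of the same depth-bounded walk, using java.io.File in place of HdfsUtil; the recursion for depth > 0 is assumed to mirror the original, which the hunk truncates.

import java.io.File

object ListSubDirsSketch {
  def listSubDirs(paths: Seq[String], depth: Int, filterFunc: String => Boolean): Seq[String] = {
    val subDirs = paths.flatMap { p =>
      Option(new File(p).listFiles).getOrElse(Array.empty[File])
        .filter(_.isDirectory).map(_.getPath).toSeq
    }
    if (depth <= 0) subDirs.filter(filterFunc)
    else listSubDirs(subDirs, depth - 1, filterFunc)
  }

  def main(args: Array[String]): Unit = {
    val dirs = listSubDirs(Seq("/tmp"), 1, (d: String) => !d.contains("."))
    dirs.foreach(println)  // directories two levels under /tmp whose path has no dot
  }
}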

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 1475898..ec09ffc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.streaming
 
+import scala.util.{Failure, Success, Try}
+
 import kafka.serializer.Decoder
 import org.apache.spark.streaming.dstream.InputDStream
 
-import scala.util.{Failure, Success, Try}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -64,10 +65,9 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
         // pre-process
         preProcess(dfOpt, ms)
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"streaming data connector error: ${e.getMessage}")
           None
-        }
       }
 
       // save data frame

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index 8477fcf..ee5e497 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -19,16 +19,17 @@ under the License.
 package org.apache.griffin.measure.datasource.connector.streaming
 
 import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.InputDStream
 import org.apache.spark.streaming.kafka.KafkaUtils
 
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+
 /**
   * streaming data connector for kafka with string format key and value
   */
@@ -60,10 +61,9 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi
         val df = sparkSession.createDataFrame(rowRdd, schema)
         Some(df)
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error("streaming data transform fails")
           None
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
index 5c2170c..323b0ac 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.streaming
 
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.datasource.connector.DataConnector
+import scala.util.Try
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.streaming.dstream.InputDStream
 
-import scala.util.Try
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnector
+
 
 trait StreamingDataConnector extends DataConnector {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
index 249d6d2..0917919 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -18,11 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.job.builder
 
-import org.apache.griffin.measure.configuration.enums.DslType
 import org.apache.griffin.measure.configuration.dqdefinition._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.job._
-import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.DQStepBuilder
 import org.apache.griffin.measure.step.write.MetricFlushStep
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index b6cca98..92480d8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.launch
 
+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
 
-import scala.util.Try
 
 /**
   * dq application process

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index ba1f389..e2dbc8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.launch.batch
 
 import java.util.Date
 
-import org.apache.griffin.measure.configuration.enums._
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
 
-import scala.util.Try
 
 case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index ceecb78..eb31a5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -21,22 +21,24 @@ package org.apache.griffin.measure.launch.streaming
 import java.util.{Date, Timer, TimerTask}
 import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
 
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
-import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
-import scala.util.Try
 
 case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
@@ -82,10 +84,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
       try {
         createStreamingContext
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"create streaming context error: ${e.getMessage}")
           throw e
-        }
       }
     })
 
@@ -118,7 +119,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
     ssc.start()
     ssc.awaitTermination()
-    ssc.stop(stopSparkContext=true, stopGracefully=true)
+    ssc.stop(stopSparkContext = true, stopGracefully = true)
 
     // clean context
     globalContext.clean()

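The commit's headline change shows up here for the first time: a Scala case body needs no surrounding braces, because it runs until the next case or the closing brace of the enclosing block, so the { ... } pairs around each case are dropped. The ssc.stop change only adds spaces around = for named arguments. A minimal sketch of both points, with an illustrative app name rather than the project's actual configuration:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CaseClauseSketch {
      // The catch case carries a two-line body with no braces: it simply extends
      // to the closing brace of the catch block.
      def newContext(): StreamingContext =
        try {
          val conf = new SparkConf().setAppName("case-clause-sketch").setMaster("local[2]")
          new StreamingContext(conf, Seconds(10))
        } catch {
          case e: Throwable =>
            println(s"create streaming context error: ${e.getMessage}")
            throw e
        }

      // Named arguments take a space on both sides of '='.
      def shutdown(ssc: StreamingContext): Unit =
        ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
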
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 306befe..feebd91 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
+import org.apache.spark.rdd.RDD
+
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.rdd.RDD
 
 /**
   * sink metric and record to console, for debug



http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index e5f72d1..3c20a0e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -18,11 +18,13 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import scala.concurrent.Future
+
 import org.apache.spark.rdd.RDD
 
-import scala.concurrent.Future
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+
 
 /**
   * sink metric and record through http request
@@ -38,7 +40,10 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String,
 
   val api = config.getString(Api, "")
   val method = config.getString(Method, "post")
-  val connectionTimeout = TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+
+  val connectionTimeout =
+    TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+
   val retry = config.getInt(Retry, 10)
 
   val _Value = "value"
@@ -55,7 +60,7 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String,
       val data = JsonUtil.toJson(dataMap)
       // http request
       val params = Map[String, Object]()
-      val header = Map[String, Object](("Content-Type","application/json"))
+      val header = Map[String, Object](("Content-Type", "application/json"))
 
       def func(): (Long, Future[Boolean]) = {
         import scala.concurrent.ExecutionContext.Implicits.global

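Two smaller style fixes here: a val whose initializer would push the line past the length limit moves the initializer onto the next line, and tuple literals get a space after the comma. A small sketch under the same assumptions (the config key below is a stand-in, not the sink's actual constant):

    import org.apache.griffin.measure.utils.TimeUtil

    object LineLengthSketch {
      // Stand-in for the sink's `config` map.
      val config: Map[String, Any] = Map("connection.timeout" -> "30s")

      // Initializer on its own line instead of one over-long line.
      val connectionTimeout: Long =
        TimeUtil.milliseconds(config.getOrElse("connection.timeout", "").toString).getOrElse(-1L)

      // Space after the comma inside the tuple literal.
      val header: Map[String, Object] = Map(("Content-Type", "application/json"))
    }
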
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index 718f1c1..588fabf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -20,10 +20,11 @@ package org.apache.griffin.measure.sink
 
 import java.util.Date
 
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
 
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+
 /**
   * sink metric and record to hdfs
   */
@@ -83,6 +84,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
       case e: Throwable => error(e.getMessage)
     }
   }
+
   def finish(): Unit = {
     try {
       HdfsUtil.createEmptyFile(FinishFile)
@@ -103,6 +105,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
   private def getHdfsPath(path: String, groupId: Int): String = {
     HdfsUtil.getHdfsFilePath(path, s"${groupId}")
   }
+
   private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
     HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
   }
@@ -116,7 +119,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     clearOldRecords(path)
     try {
       val recordCount = records.count
-      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
+      val count =
+        if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
       if (count > 0) {
         val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
         if (groupCount <= 1) {
@@ -145,7 +151,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     clearOldRecords(path)
     try {
       val recordCount = records.size
-      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
+      val count =
+        if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
       if (count > 0) {
         val groupCount = (count - 1) / maxLinesPerFile + 1
         if (groupCount <= 1) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index 206f187..ab59e59 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.TimeUtil
+import scala.concurrent.Future
+
 import org.apache.spark.rdd.RDD
 import org.mongodb.scala._
 import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
 import org.mongodb.scala.result.UpdateResult
 
-import scala.concurrent.Future
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
+
 
 /**
   * sink metric and record to mongo
@@ -98,7 +100,7 @@ object MongoConnection {
   var dataConf: MongoConf = _
   private var dataCollection: MongoCollection[Document] = _
 
-  def getDataCollection = dataCollection
+  def getDataCollection : MongoCollection[Document] = dataCollection
 
   def init(config: Map[String, Any]): Unit = {
     if (!initialed) {

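Besides the import regrouping, getDataCollection gains an explicit result type. Public members that rely on type inference can silently change their signature when the body changes; spelling the type out, as scalastyle's public-method rules ask for, pins the API down. A small illustrative sketch, not the project's code:

    object ReturnTypeSketch {
      private var cached: Seq[String] = Seq.empty

      // Explicit result type: changing the body later cannot silently change
      // what callers see.
      def getCached: Seq[String] = cached

      def init(values: Seq[String]): Unit = { cached = values }
    }
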
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
index 0c03bd8..9d598fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import org.apache.griffin.measure.Loggable
 import org.apache.spark.rdd.RDD
 
+import org.apache.griffin.measure.Loggable
+
 /**
   * sink metric and record
   */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 26b0178..49818f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
+import scala.util.{Success, Try}
+
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums._
 
-import scala.util.{Success, Try}
 
 
 case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
index 1cc3f3e..ca38629 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
@@ -21,12 +21,13 @@ package org.apache.griffin.measure.sink
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
-import org.apache.griffin.measure.Loggable
-
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
 
+import org.apache.griffin.measure.Loggable
+
+
 /**
   * sink task runner, to sink metrics in block or non-block mode
   */
@@ -52,11 +53,11 @@ object SinkTaskRunner extends Loggable {
     val st = new Date().getTime
     val (t, res) = func()
     res.onComplete {
-      case Success(value) => {
+      case Success(value) =>
         val et = new Date().getTime
         info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
-      }
-      case Failure(e) => {
+
+      case Failure(e) =>
         val et = new Date().getTime
         warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
         if (nextRetry >= 0) {
@@ -65,11 +66,11 @@ object SinkTaskRunner extends Loggable {
         } else {
           error(s"task fails: task ${t} retry ends but fails")
         }
-      }
     }
   }
 
-  private def blockExecute(func: () => (Long, Future[_]), retry: Int, waitDuration: Duration): Unit = {
+  private def blockExecute(func: () => (Long, Future[_]),
+                           retry: Int, waitDuration: Duration): Unit = {
     val nextRetry = nextRetryCount(retry)
     val st = new Date().getTime
     val (t, res) = func()
@@ -78,7 +79,7 @@ object SinkTaskRunner extends Loggable {
       val et = new Date().getTime
       info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         val et = new Date().getTime
         warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
         if (nextRetry >= 0) {
@@ -87,7 +88,6 @@ object SinkTaskRunner extends Loggable {
         } else {
           error(s"task fails: task ${t} retry ends but fails")
         }
-      }
     }
   }
 

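Alongside the brace removal in onComplete and the catch block, blockExecute's signature is wrapped: once a parameter list no longer fits on one line, the remaining parameters continue on the next line aligned under the first one. A compact sketch of the same shape; blockExecuteSketch and its body are illustrative, not the runner's real logic:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object SignatureWrapSketch {
      // Parameters wrapped onto a second line, aligned with the first parameter.
      def blockExecuteSketch(func: () => (Long, Future[_]),
                             retry: Int, waitDuration: Duration): Unit = {
        val (taskId, result) = func()
        Await.result(result, waitDuration)
        println(s"task $taskId finished, retries left: $retry")
      }
    }
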
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index c37b5a3..5ce2b14 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -19,9 +19,10 @@ under the License.
 package org.apache.griffin.measure.step.builder
 
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.{DataSourceParam, Param, RuleParam}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step._
 
@@ -49,7 +50,8 @@ object DQStepBuilder {
       .flatMap(_.buildDQStep(context, dsParam))
   }
 
-  private def getDataSourceParamStepBuilder(procType: ProcessType): Option[DataSourceParamStepBuilder] = {
+  private def getDataSourceParamStepBuilder(procType: ProcessType)
+  : Option[DataSourceParamStepBuilder] = {
     procType match {
       case BatchProcessType => Some(BatchDataSourceStepBuilder())
       case StreamingProcessType => Some(StreamingDataSourceStepBuilder())
@@ -64,7 +66,9 @@ object DQStepBuilder {
     val funcNames = context.functionNames
     val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)
       .flatMap(_.buildDQStep(context, ruleParam))
-    dqStepOpt.toSeq.flatMap(_.getNames).foreach(name => context.compileTableRegister.registerTable(name))
+    dqStepOpt.toSeq.flatMap(_.getNames).foreach(name =>
+      context.compileTableRegister.registerTable(name)
+    )
     dqStepOpt
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
index d3c0e41..aa43cf6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
@@ -24,7 +24,6 @@ import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.dsl.parser.GriffinDslParser
 import org.apache.griffin.measure.step.builder.dsl.transform.Expr2DQSteps
 
-import scala.util.{Failure, Success}
 
 case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
                                    functionNames: Seq[String]
@@ -50,10 +49,9 @@ case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
         Nil
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"generate rule plan ${name} fails: ${e.getMessage}")
         Nil
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 5a04c11..4af3ceb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -18,11 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 
 /**
   * build dq step by rule param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
index 17e678e..1c04e75 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
@@ -35,7 +35,8 @@ case class SelectClause(exprs: Seq[Expr], extraConditionOpt: Option[ExtraConditi
   def coalesceDesc: String = desc
 
   override def map(func: (Expr) => Expr): SelectClause = {
-    SelectClause(exprs.map(func(_)), extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]))
+    SelectClause(exprs.map(func(_)),
+      extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]))
   }
 
 }
@@ -81,11 +82,10 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extend
 
   def merge(other: GroupbyClause): GroupbyClause = {
     val newHavingClauseOpt = (havingClauseOpt, other.havingClauseOpt) match {
-      case (Some(hc), Some(ohc)) => {
+      case (Some(hc), Some(ohc)) =>
         val logical1 = LogicalFactorExpr(hc, false, None)
         val logical2 = LogicalFactorExpr(ohc, false, None)
         Some(BinaryLogicalExpr(logical1, ("AND", logical2) :: Nil))
-      }
       case (a @ Some(_), _) => a
       case (_, b @ Some(_)) => b
       case (_, _) => None
@@ -250,7 +250,8 @@ case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression {
 
   def desc: String = exprs.map(_.desc).mkString(", ")
   def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
-  override def map(func: (Expr) => Expr): DistinctnessClause = DistinctnessClause(exprs.map(func(_)))
+  override def map(func: (Expr) => Expr) : DistinctnessClause =
+    DistinctnessClause(exprs.map(func(_)))
 }
 
 case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
@@ -266,5 +267,6 @@ case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression {
 
   def desc: String = exprs.map(_.desc).mkString(", ")
   def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
-  override def map(func: (Expr) => Expr): CompletenessClause = CompletenessClause(exprs.map(func(_)))
-}
\ No newline at end of file
+  override def map(func: (Expr) => Expr): CompletenessClause =
+    CompletenessClause(exprs.map(func(_)))
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
index 13317bb..426dbbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
@@ -185,7 +185,8 @@ case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends Logi
   }
 }
 
-case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr {
+case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)])
+  extends LogicalExpr {
 
   addChildren(factor +: tails.map(_._2))
 
@@ -220,4 +221,4 @@ case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExp
       (pair._1, func(pair._2).asInstanceOf[LogicalExpr])
     })
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
index b64dae2..c4f80d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
@@ -96,17 +96,17 @@ case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends Sel
 
 // -------------
 
-case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr {
+case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String])
+  extends SelectExpr {
 
   addChildren(head +: selectors)
 
   def desc: String = {
     selectors.foldLeft(head.desc) { (hd, sel) =>
       sel match {
-        case FunctionSelectExpr(funcName, args) => {
+        case FunctionSelectExpr(funcName, args) =>
           val nargs = hd +: args.map(_.desc)
           s"${funcName}(${nargs.mkString(", ")})"
-        }
         case _ => s"${hd}${sel.desc}"
       }
     }
@@ -129,4 +129,4 @@ case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: O
     SelectionExpr(func(head).asInstanceOf[HeadExpr],
       selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
index 4ce0fe6..b10fd57 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
@@ -24,13 +24,15 @@ trait TreeNode extends Serializable {
 
   var children = Seq[TreeNode]()
 
-  def addChild(expr: TreeNode) = { children :+= expr }
-  def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }
+  def addChild(expr: TreeNode) : Unit = { children :+= expr }
+  def addChildren(exprs: Seq[TreeNode]) : Unit = { children ++= exprs }
 
-  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)
+                                                  (seqOp: (A, T) => T, combOp: (T, T) => T)
+                                                  (implicit tag: ClassTag[A]): T = {
 
     val clazz = tag.runtimeClass
-    if(clazz.isAssignableFrom(this.getClass)){
+    if(clazz.isAssignableFrom(this.getClass)) {
       val tv = seqOp(this.asInstanceOf[A], z)
       children.foldLeft(combOp(z, tv)) { (ov, tn) =>
         combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
@@ -41,16 +43,18 @@ trait TreeNode extends Serializable {
     }
 
   }
-  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)
+                                                   (seqOp: (A, T) => T, combOp: (T, T) => T)
+                                                   (implicit tag: ClassTag[A]): T = {
 
     val clazz = tag.runtimeClass
-    if(clazz.isAssignableFrom(this.getClass)){
+    if(clazz.isAssignableFrom(this.getClass)) {
       val cv = children.foldLeft(z) { (ov, tn) =>
         combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
       }
       combOp(z, seqOp(this.asInstanceOf[A], cv))
     }
-    else{
+    else {
       z
     }
   }

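TreeNode picks up two more conventions: side-effecting helpers declare ': Unit' explicitly instead of relying on inference, and methods with several parameter lists put each extra list on its own line. A reduced sketch of both, using a plain Seq[String] in place of the real child nodes:

    object TreeNodeStyleSketch {
      var children: Seq[String] = Seq.empty

      // Mutating helpers spell out ': Unit'.
      def addChild(c: String): Unit = { children :+= c }
      def addChildren(cs: Seq[String]): Unit = { children ++= cs }

      // Each extra parameter list moves to its own line when the signature grows.
      def traverse[T](z: T)
                     (seqOp: (String, T) => T, combOp: (T, T) => T): T =
        children.foldLeft(z)((acc, c) => combOp(acc, seqOp(c, acc)))
    }
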
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
index 18f7754..fe6678d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.parser
 
+import scala.util.parsing.combinator.JavaTokenParsers
+
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
-import scala.util.parsing.combinator.JavaTokenParsers
 
 /**
   * basic parser for sql like syntax
@@ -388,10 +389,9 @@ trait BasicParser extends JavaTokenParsers with Serializable {
 
   def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
     opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt =>
       val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt)
       CombinedClause(sel, fromOpt, tails)
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
index 2cd638c..77ed987 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
@@ -37,11 +37,10 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
 
   def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
     opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt =>
       val preClauses = Seq(whereOpt).flatMap(opt => opt)
       val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt)
       ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses)
-    }
   }
 
   /**
@@ -59,7 +58,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
     * <distinctness-clauses> = <distExpr> [, <distExpr>]+
     */
   def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ {
-    case expr => { expr.tag = "[]"; expr}
+    case expr => expr.tag = "[]"; expr
   }
   def distExpr: Parser[Expr] = expression | sqbrExpr
   def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ {

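sqbrExpr is the one spot where the removed braces held two statements. That is still fine: a case body is a block that runs to the next case or the end of the enclosing { ... }, so the tag assignment and the trailing expr both belong to the case, and expr remains the parser's result. A standalone parser-combinator sketch of the same shape; Tagged is an illustrative stand-in for the DSL's Expr:

    import scala.util.parsing.combinator.JavaTokenParsers

    // Minimal stand-in for an expression node with a mutable tag.
    case class Tagged(value: String) { var tag: String = "" }

    object SqbrSketch extends JavaTokenParsers {
      // Two statements in the case body, no braces; the trailing `t` is the result.
      def sqbr: Parser[Tagged] = "[" ~> ident <~ "]" ^^ {
        case s => val t = Tagged(s); t.tag = "[]"; t
      }

      def main(args: Array[String]): Unit =
        println(parseAll(sqbr, "[abc]"))   // prints the successful parse of "[abc]"
    }
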
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index b97a1a0..3bf7d04 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -18,15 +18,15 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.step.builder.dsl.expr._
 import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
-import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
 import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
+import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
 import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 import org.apache.griffin.measure.utils.ParamUtil._
 
@@ -77,30 +77,37 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           s"${sel.desc} IS NULL"
         }.mkString(" AND ")
         val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+        s"SELECT ${selClause} FROM `${sourceName}` " +
+          s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
       }
-      val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+      val missRecordsTransStep =
+        SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+
       val missRecordsWriteSteps = procType match {
-        case BatchProcessType => {
-          val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+        case BatchProcessType =>
+          val rwName =
+            ruleParam.getOutputOpt(RecordOutputType).
+              flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
           RecordWriteStep(rwName, missRecordsTableName) :: Nil
-        }
         case StreamingProcessType => Nil
       }
       val missRecordsUpdateWriteSteps = procType match {
         case BatchProcessType => Nil
-        case StreamingProcessType => {
-          val dsName = ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
+        case StreamingProcessType =>
+          val dsName =
+            ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
           DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
-        }
       }
 
       // 2. miss count
       val missCountTableName = "__missCount"
       val missColName = details.getStringOrKey(_miss)
       val missCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `${missColName}` " +
+            s"FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
 
@@ -109,7 +116,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val totalColName = details.getStringOrKey(_total)
       val totalCountSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
+            s"FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
@@ -117,15 +126,14 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyTableName = ruleParam.getOutDfName()
       val matchedColName = details.getStringOrKey(_matched)
       val accuracyMetricSql = procType match {
-        case BatchProcessType => {
+        case BatchProcessType =>
           s"""
              |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
              |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
          """.stripMargin
-        }
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           s"""
              |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
@@ -134,27 +142,26 @@ case class AccuracyExpr2DQSteps(context: DQContext,
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
              |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}`
          """.stripMargin
-        }
       }
       val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
       val accuracyMetricWriteSteps = procType match {
-        case BatchProcessType => {
+        case BatchProcessType =>
           val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
           val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
           val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
           MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
-        }
         case StreamingProcessType => Nil
       }
 
       // accuracy current steps
       val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
-      val writeSteps1 = accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
+      val writeSteps1 =
+        accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
 
       // streaming extra steps
       val (transSteps2, writeSteps2) = procType match {
         case BatchProcessType => (Nil, Nil)
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           // 5. accuracy metric merge
           val accuracyMetricTableName = "__accuracy"
           val accuracyMetricRule = DataFrameOps._accuracy
@@ -183,14 +190,16 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           val accuracyRecordTransStep = SparkSqlTransformStep(
             accuracyRecordTableName, accuracyRecordSql, emptyMap)
           val accuracyRecordWriteStep = {
-            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+            val rwName =
+              ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+                .getOrElse(missRecordsTableName)
+
             RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
           }
 
           // extra steps
           (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
             accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
-        }
       }
 
       // full steps

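The accuracy SQL here follows the pattern used for all of the over-long interpolated statements: the string is split into adjacent s"..." literals joined with +, which keeps each source line within the limit while producing the same SQL, provided the break point itself preserves the space between clauses. A self-contained sketch with placeholder table and column names:

    object SqlSplitSketch {
      val tmst = "__tmst"                         // placeholder timestamp column
      val missColName = "miss"                    // placeholder metric column
      val missRecordsTableName = "__missRecords"  // placeholder table name

      // One logical statement over two literals; note the space before the
      // closing quote of the first piece.
      val missCountSql: String =
        s"SELECT `${tmst}`, COUNT(*) AS `${missColName}` " +
          s"FROM `${missRecordsTableName}` GROUP BY `${tmst}`"

      def main(args: Array[String]): Unit = println(missCountSql)
    }
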
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 4852a5b..87cfa86 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -80,16 +80,23 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val sourceAliasSql = {
         s"SELECT ${selClause} FROM `${sourceName}`"
       }
-      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+      val sourceAliasTransStep =
+        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
 
       // 2. incomplete record
       val incompleteRecordsTableName = "__incompleteRecords"
       val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
       val incompleteWhereClause = s"NOT (${completeWhereClause})"
-      val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
-      val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
+
+      val incompleteRecordsSql =
+        s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
+
+      val incompleteRecordTransStep =
+        SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
       val incompleteRecordWriteStep = {
-        val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
+        val rwName =
+          ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+            .getOrElse(incompleteRecordsTableName)
         RecordWriteStep(rwName, incompleteRecordsTableName)
       }
 
@@ -97,17 +104,24 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val incompleteCountTableName = "__incompleteCount"
       val incompleteColName = details.getStringOrKey(_incomplete)
       val incompleteCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` " +
+            s"FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
       }
-      val incompleteCountTransStep = SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
+      val incompleteCountTransStep =
+        SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
 
       // 4. total count
       val totalCountTableName = "__totalCount"
       val totalColName = details.getStringOrKey(_total)
       val totalCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
+            s"FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
@@ -115,15 +129,14 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val completeTableName = ruleParam.getOutDfName()
       val completeColName = details.getStringOrKey(_complete)
       val completeMetricSql = procType match {
-        case BatchProcessType => {
+        case BatchProcessType =>
           s"""
              |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
              |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
          """.stripMargin
-        }
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           s"""
              |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
@@ -132,7 +145,6 @@ case class CompletenessExpr2DQSteps(context: DQContext,
              |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
              |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}`
          """.stripMargin
-        }
       }
       val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
       val completeWriteStep = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 6c56a77..70fee6c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -101,7 +101,8 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       val sourceAliasSql = {
         s"SELECT ${selClause} FROM `${sourceName}`"
       }
-      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+      val sourceAliasTransStep =
+        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
 
       // 2. total metric
       val totalTableName = "__totalMetric"
@@ -125,13 +126,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
            |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
           """.stripMargin
       }
-      val selfGroupTransStep = SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+      val selfGroupTransStep =
+        SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
 
       val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil
       val writeSteps1 = totalMetricWriteStep :: Nil
 
       val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
-        case StreamingProcessType if (withOlderTable) => {
+        case StreamingProcessType if (withOlderTable) =>
           // 4.0 update old data
           val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)
 
@@ -200,14 +202,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                |FROM `${groupTableName}`
              """.stripMargin
           }
-          val finalDupCountTransStep = SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+          val finalDupCountTransStep =
+            SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
 
-          ((olderAliasTransStep :: joinedTransStep :: groupTransStep :: finalDupCountTransStep :: Nil,
+          ((olderAliasTransStep :: joinedTransStep
+            :: groupTransStep :: finalDupCountTransStep :: Nil,
             targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
-        }
-        case _ => {
+        case _ =>
           ((Nil, Nil), selfGroupTableName)
-        }
       }
 
       // 8. distinct metric
@@ -285,9 +287,13 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
              """.stripMargin
           }
-          val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+          val groupDupMetricTransStep =
+            SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
           val groupDupMetricWriteStep = {
-            MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayFlattenType, writeTimestampOpt)
+            MetricWriteStep(duplicationArrayName,
+              groupDupMetricTableName,
+              ArrayFlattenType,
+              writeTimestampOpt)
           }
 
           val msteps = {
@@ -306,7 +312,9 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           // 9. duplicate record
           val dupRecordTableName = "__dupRecords"
           val dupRecordSelClause = procType match {
-            case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+            case StreamingProcessType if (withOlderTable) =>
+              s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+
             case _ => s"${distAliasesClause}, `${dupColName}`"
           }
           val dupRecordSql = {
@@ -315,9 +323,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
               """.stripMargin
           }
-          val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+          val dupRecordTransStep =
+            SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+
           val dupRecordWriteStep = {
-            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+            val rwName =
+              ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+                .getOrElse(dupRecordTableName)
+
             RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
           }
 
@@ -332,7 +345,12 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
           val dupMetricWriteStep = {
-            MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType, writeTimestampOpt)
+            MetricWriteStep(
+              duplicationArrayName,
+              dupMetricTableName,
+              ArrayFlattenType,
+              writeTimestampOpt
+            )
           }
 
           val msteps = {

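When a constructor or method call overflows instead of a string, the fix here is one argument per line, as with the MetricWriteStep calls above. A tiny sketch with a hypothetical case class standing in for the step's four parameters:

    // Hypothetical stand-in for a four-parameter write step.
    case class WriteSpecSketch(name: String, table: String, flatten: String, timestampOpt: Option[Long])

    object ArgPerLineSketch {
      // Each argument on its own line once the call no longer fits.
      val spec: WriteSpecSketch =
        WriteSpecSketch(
          "duplication.array",   // placeholder metric name
          "__dupMetric",         // placeholder source table
          "array",               // placeholder flatten type
          None
        )
    }
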
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index d986b56..492f4fd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -19,8 +19,8 @@ under the License.
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.dsl.expr.Expr

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index ecc115c..af493af 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -19,8 +19,9 @@ under the License.
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
+
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -63,7 +64,9 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val analyzer = ProfilingAnalyzer(profilingExpr, sourceName)
       val selExprDescs = analyzer.selectionExprs.map { sel =>
         val alias = sel match {
-          case s: AliasableExpr => s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("")
+          case s: AliasableExpr =>
+            s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("")
+
           case _ => ""
         }
         s"${sel.desc}${alias}"
@@ -76,21 +79,22 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val groupByClauseOpt = analyzer.groupbyExprOpt
       val groupbyClause = procType match {
         case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
-        case StreamingProcessType => {
-          val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
+        case StreamingProcessType =>
+          val tmstGroupbyClause =
+            GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
           val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
             case Some(gbc) => gbc
             case _ => GroupbyClause(Nil, None)
           })
           mergedGroubbyClause.desc
-        }
       }
       val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
       val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
 
       // 1. select statement
       val profilingSql = {
-        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+        s"SELECT ${selCondition} ${selClause} " +
+          s"${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
       }
       val profilingName = ruleParam.getOutDfName()
       val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 8a3924b..3731da9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -79,19 +79,17 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       // 1. in time
       val inTimeTableName = "__inTime"
       val inTimeSql = etsSelOpt match {
-        case Some(etsSel) => {
+        case Some(etsSel) =>
           s"""
              |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`,
              |(${etsSel}) AS `${ConstantColumns.endTs}`
              |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
            """.stripMargin
-        }
-        case _ => {
+        case _ =>
           s"""
              |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`
              |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
            """.stripMargin
-        }
       }
       val inTimeTransStep = SparkSqlTransformStep(inTimeTableName, inTimeSql, emptyMap)
 
@@ -103,7 +101,8 @@ case class TimelinessExpr2DQSteps(context: DQContext,
         case _ => ConstantColumns.tmst
       }
       val latencySql = {
-        s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
+        s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` " +
+          s"FROM `${inTimeTableName}`"
       }
       val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
 
@@ -112,14 +111,15 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       val totalColName = details.getStringOrKey(_total)
       val avgColName = details.getStringOrKey(_avg)
       val metricSql = procType match {
-        case BatchProcessType => {
+
+        case BatchProcessType =>
           s"""
              |SELECT COUNT(*) AS `${totalColName}`,
              |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
              |FROM `${latencyTableName}`
            """.stripMargin
-        }
-        case StreamingProcessType => {
+
+        case StreamingProcessType =>
           s"""
              |SELECT `${ConstantColumns.tmst}`,
              |COUNT(*) AS `${totalColName}`,
@@ -127,7 +127,6 @@ case class TimelinessExpr2DQSteps(context: DQContext,
              |FROM `${latencyTableName}`
              |GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
-        }
       }
       val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
       val metricWriteStep = {
@@ -143,24 +142,26 @@ case class TimelinessExpr2DQSteps(context: DQContext,
 
       // 4. timeliness record
       val (transSteps2, writeSteps2) = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
-        case Some(tsh) => {
+        case Some(tsh) =>
           val recordTableName = "__lateRecords"
           val recordSql = {
             s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
           }
           val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
           val recordWriteStep = {
-            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(recordTableName)
+            val rwName =
+              ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+                .getOrElse(recordTableName)
+
             RecordWriteStep(rwName, recordTableName, None)
           }
           (recordTransStep :: Nil, recordWriteStep :: Nil)
-        }
         case _ => (Nil, Nil)
       }
 
       // 5. ranges
       val (transSteps3, writeSteps3) = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
-        case Some(stepSize) => {
+        case Some(stepSize) =>
           // 5.1 range
           val rangeTableName = "__range"
           val stepColName = details.getStringOrKey(_step)
@@ -176,26 +177,24 @@ case class TimelinessExpr2DQSteps(context: DQContext,
           val rangeMetricTableName = "__rangeMetric"
           val countColName = details.getStringOrKey(_count)
           val rangeMetricSql = procType match {
-            case BatchProcessType => {
+            case BatchProcessType =>
               s"""
                  |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
                  |FROM `${rangeTableName}` GROUP BY `${stepColName}`
                 """.stripMargin
-            }
-            case StreamingProcessType => {
+            case StreamingProcessType =>
               s"""
                  |SELECT `${ConstantColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
                  |FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}`
                 """.stripMargin
-            }
           }
-          val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+          val rangeMetricTransStep =
+            SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
           val rangeMetricWriteStep = {
             MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
           }
 
           (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
-        }
         case _ => (Nil, Nil)
       }
 
@@ -206,7 +205,8 @@ case class TimelinessExpr2DQSteps(context: DQContext,
         val percentileColName = details.getStringOrKey(_percentileColPrefix)
         val percentileCols = percentiles.map { pct =>
           val pctName = (pct * 100).toInt.toString
-          s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`"
+          s"floor(percentile_approx(${latencyColName}, ${pct})) " +
+            s"AS `${percentileColName}_${pctName}`"
         }.mkString(", ")
         val percentileSql = {
           s"""
@@ -214,7 +214,9 @@ case class TimelinessExpr2DQSteps(context: DQContext,
              |FROM `${latencyTableName}`
             """.stripMargin
         }
-        val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
+        val percentileTransStep =
+          SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
+
         val percentileWriteStep = {
           MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
         }
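
This file also splits interpolated SQL strings that run past the line-length limit into two concatenated pieces. A small sketch of the same idea, with made-up table and column names:

    object LongLineSplit {
      // Illustrative names, not the real Griffin constants.
      val latencyTableName = "__lateRecords"
      val latencyColName = "latency"
      val beginTs = "__begin_ts"

      // One logical statement kept under the length limit by concatenating two
      // shorter interpolated strings.
      val latencySql: String =
        s"SELECT *, (`__end_ts` - `$beginTs`) AS `$latencyColName` " +
          s"FROM `$latencyTableName`"

      def main(args: Array[String]): Unit = println(latencySql)
    }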

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 443239c..28e9d48 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -112,7 +112,8 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       }.mkString(", ")
       val dupColName = details.getStringOrKey(_dup)
       val groupSql = {
-        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
+        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " +
+          s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
       }
       val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
 
@@ -121,12 +122,11 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       val totalColName = details.getStringOrKey(_total)
       val totalSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           s"""
              |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}`
              |FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
-        }
       }
       val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
       val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
@@ -136,22 +136,25 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       val uniqueRecordSql = {
         s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
       }
-      val uniqueRecordTransStep = SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+      val uniqueRecordTransStep =
+        SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
 
       // 7. unique metric
       val uniqueTableName = "__uniqueMetric"
       val uniqueColName = details.getStringOrKey(_unique)
       val uniqueSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
-        case StreamingProcessType => {
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
+        case StreamingProcessType =>
           s"""
              |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
              |FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
-        }
       }
       val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
-      val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
+
+      val uniqueMetricWriteStep =
+        MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
 
       val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
         totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
@@ -164,9 +167,14 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         val dupRecordSql = {
           s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
         }
-        val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+        val dupRecordTransStep =
+          SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+
         val dupRecordWriteStep = {
-          val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+          val rwName =
+            ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+              .getOrElse(dupRecordTableName)
+
           RecordWriteStep(rwName, dupRecordTableName)
         }
 
@@ -175,7 +183,9 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         val numColName = details.getStringOrKey(_num)
         val dupMetricSelClause = procType match {
           case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
-          case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
+
+          case StreamingProcessType =>
+            s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
         }
         val dupMetricGroupbyClause = procType match {
           case BatchProcessType => s"`${dupColName}`"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
index b2a95ce..d57bf23 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
@@ -21,9 +21,11 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
 
-case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String)
+  extends BasicAnalyzer {
 
-  val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+  val dataSourceNames =
+    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
 
   val sourceSelectionExprs = {
     val seq = seqSelectionExprs(sourceName)
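
When the offending line is a class header, the split happens before "extends" instead. A self-contained sketch of that shape; the trait and fields here are invented for illustration:

    object ClassHeaderWrapSketch {
      trait BasicAnalyzerLike {
        def describe: String
      }

      // The header is split before "extends" so the declaration stays under the limit.
      final case class AccuracyAnalyzerSketch(expr: String, sourceName: String, targetName: String)
        extends BasicAnalyzerLike {
        def describe: String = s"accuracy of $sourceName vs $targetName on ($expr)"
      }

      def main(args: Array[String]): Unit =
        println(AccuracyAnalyzerSketch("s.id = t.id", "source", "target").describe)
    }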

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
index f8b872a..5a73bce 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
 
-case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer {
+case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String)
+  extends BasicAnalyzer {
 
   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
index 9cee8ab..b1f4229 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
@@ -21,8 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
 
-//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
-case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer {
+case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String)
+  extends BasicAnalyzer {
 
   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
index 049e6fd..f66d482 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
@@ -23,7 +23,8 @@ import org.apache.griffin.measure.step.builder.dsl.expr._
 
 case class ProfilingAnalyzer(expr: ProfilingClause, sourceName: String) extends BasicAnalyzer {
 
-  val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+  val dataSourceNames =
+    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
 
   val selectionExprs: Seq[Expr] = {
     expr.selectClause.exprs.map(_.extractSelf).flatMap { expr =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
index 0976d6c..8a05d17 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr.{AliasableExpr, _}
 
 
-case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String)
+  extends BasicAnalyzer {
 
   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
index eac3b2b..713dc55 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -26,9 +26,10 @@ import org.apache.griffin.measure.configuration.enums._
   */
 object PreProcParamMaker {
 
-  case class StringAnyMap(values:Map[String,Any])
+  case class StringAnyMap(values: Map[String, Any])
 
-  def makePreProcRules(rules: Seq[RuleParam], suffix: String, dfName: String): (Seq[RuleParam], String) = {
+  def makePreProcRules(rules: Seq[RuleParam],
+                       suffix: String, dfName: String): (Seq[RuleParam], String) = {
     val len = rules.size
     val (newRules, _) = rules.zipWithIndex.foldLeft((Nil: Seq[RuleParam], dfName)) { (ret, pair) =>
       val (rls, prevOutDfName) = ret
@@ -47,10 +48,9 @@ object PreProcParamMaker {
     val rpRule = rule.replaceInOutDfName(newInDfName, newOutDfName)
     rule.getDslType match {
       case DataFrameOpsType => rpRule
-      case _ => {
+      case _ =>
         val newRule = replaceDfNameSuffix(rule.getRule, rule.getInDfName(), suffix)
         rpRule.replaceRule(newRule)
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
index 8b1df82..9194da1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.step.read
 
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
-import org.apache.spark.sql._
 
 trait ReadStep extends DQStep {
 
@@ -31,15 +32,13 @@ trait ReadStep extends DQStep {
   def execute(context: DQContext): Boolean = {
     info(s"read data source [${name}]")
     read(context) match {
-      case Some(df) => {
+      case Some(df) =>
 //        if (needCache) context.dataFrameCache.cacheDataFrame(name, df)
         context.runTimeTableRegister.registerTable(name, df)
         true
-      }
-      case _ => {
+      case _ =>
         warn(s"read data source [${name}] fails")
         false
-      }
     }
   }
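
The import changes in this and several other files regroup imports: standard-library first, then third-party packages such as org.apache.spark, then project packages under org.apache.griffin, with a blank line between groups and alphabetical order inside each. A rough sketch of that layout using only standard-library imports, with the other groups indicated by comments:

    // Standard-library imports form the first group(s).
    import java.util.Date

    import scala.collection.mutable

    // A third-party group (e.g. org.apache.spark.sql._) and a project group
    // (e.g. org.apache.griffin.measure.context.DQContext) would follow,
    // each separated by a blank line.

    object ImportGroupingSketch {
      def main(args: Array[String]): Unit = {
        // The buffer exists only so both imports above are actually used.
        val seen = mutable.Buffer(new Date().toString)
        println(seen.mkString)
      }
    }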
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
index 6dae1cb..88a39cf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
@@ -18,8 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.step.read
 
-import org.apache.griffin.measure.context.DQContext
 import org.apache.spark.sql._
+
+import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.utils.DataFrameUtil._
 
 case class UnionReadStep(name: String,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index 86b367c..088f328 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -20,13 +20,14 @@ package org.apache.griffin.measure.step.transform
 
 import java.util.Date
 
+import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
+
 import org.apache.griffin.measure.context.ContextId
-import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
 import org.apache.griffin.measure.context.streaming.metric._
+import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
-import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
 
 /**
   * pre-defined data frame operations
@@ -44,7 +45,9 @@ object DataFrameOps {
     val _matched = "matched"
   }
 
-  def fromJson(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
+  def fromJson(sqlContext: SQLContext,
+               inputDfName: String,
+               details: Map[String, Any]): DataFrame = {
     val _colName = "col.name"
     val colNameOpt = details.get(_colName).map(_.toString)
 
@@ -58,7 +61,10 @@ object DataFrameOps {
     sqlContext.read.json(rdd) // slow process
   }
 
-  def accuracy(sqlContext: SQLContext, inputDfName: String, contextId: ContextId, details: Map[String, Any]): DataFrame = {
+  def accuracy(sqlContext: SQLContext,
+               inputDfName: String,
+               contextId: ContextId,
+               details: Map[String, Any]): DataFrame = {
     import AccuracyOprKeys._
 
     val miss = details.getStringOrKey(_miss)
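
Long method signatures are wrapped one parameter per continuation line, aligned under the first parameter. A minimal sketch with placeholder types standing in for SQLContext and ContextId:

    object ParamWrapSketch {
      // Placeholder stand-ins for the real SQLContext and ContextId types.
      final case class FakeSqlContext(appName: String)
      final case class FakeContextId(timestamp: Long)

      // One parameter per continuation line, aligned under the first parameter.
      def accuracy(sqlContext: FakeSqlContext,
                   inputDfName: String,
                   contextId: FakeContextId,
                   details: Map[String, Any]): String =
        s"accuracy(${sqlContext.appName}, $inputDfName, ${contextId.timestamp}, ${details.size} opts)"

      def main(args: Array[String]): Unit =
        println(accuracy(FakeSqlContext("demo"), "df1", FakeContextId(1538216511000L), Map.empty))
    }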

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 5f99ed2..a2bf46e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -35,7 +35,9 @@ case class DataFrameOpsTransformStep(name: String,
     try {
       val df = rule match {
         case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details)
-        case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+        case DataFrameOps._accuracy =>
+          DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+
         case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details)
         case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
       }
@@ -43,10 +45,9 @@ case class DataFrameOpsTransformStep(name: String,
       context.runTimeTableRegister.registerTable(name, df)
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}")
         false
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index ead7344..ca03f79 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -37,10 +37,9 @@ case class SparkSqlTransformStep(name: String,
       context.runTimeTableRegister.registerTable(name, df)
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
         false
-      }
     }
   }
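
The no-braces rule applies to catch blocks as well, since a Scala catch is just a pattern match. A runnable sketch of the try/catch shape the transform steps use, with println standing in for the logger:

    object CatchClauseSketch {
      // Same try/catch shape as the transform steps: true on success, log and
      // return false on any failure, with no braces around the case body.
      def runSql(rule: String): Boolean =
        try {
          require(rule.trim.toLowerCase.startsWith("select"), "only SELECT is supported here")
          true
        } catch {
          case e: Throwable =>
            println(s"run spark sql [ $rule ] error: ${e.getMessage}")
            false
        }

      def main(args: Array[String]): Unit = {
        println(runSql("SELECT 1"))      // true
        println(runSql("DROP TABLE t"))  // prints the error line, then false
      }
    }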
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index 9415998..f34e003 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -19,9 +19,10 @@ under the License.
 package org.apache.griffin.measure.step.write
 
 import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.context.DQContext
 import org.apache.spark.sql.DataFrame
 
+import org.apache.griffin.measure.context.DQContext
+
 /**
   * update data source streaming cache
   */
@@ -34,12 +35,12 @@ case class DataSourceUpdateWriteStep(dsName: String,
 
   def execute(context: DQContext): Boolean = {
     getDataSourceCacheUpdateDf(context) match {
-      case Some(df) => {
-        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
-      }
-      case _ => {
+      case Some(df) =>
+        context.dataSources
+          .find(ds => StringUtils.equals(ds.name, dsName))
+          .foreach(_.updateData(df))
+      case _ =>
         warn(s"update ${dsName} from ${inputName} fails")
-      }
     }
     true
   }
@@ -49,13 +50,13 @@ case class DataSourceUpdateWriteStep(dsName: String,
       val df = context.sqlContext.table(s"`${name}`")
       Some(df)
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"get data frame ${name} fails")
         None
-      }
     }
   }
 
-  private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+  private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame]
+    = getDataFrame(context, inputName)
 
 }
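
One more length fix, used for the private helpers in the write steps: when the signature alone fills the line, the "=" and the one-line body move to a continuation line. A sketch with invented names:

    object DefWrapSketch {
      private val cachedTables = Map("__cacheUpdate" -> Seq("row-1", "row-2"))

      // Signature on the first line, "= body" on the continuation line.
      private def getCacheUpdateRows(name: String): Option[Seq[String]]
        = cachedTables.get(name)

      def main(args: Array[String]): Unit =
        println(getCacheUpdateRows("__cacheUpdate").getOrElse(Nil).mkString(", "))
    }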

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 8f7d01c..e787d96 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -36,10 +36,9 @@ case class MetricFlushStep() extends WriteStep {
         context.getSink(t).sinkMetrics(metric)
         true
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"flush metrics error: ${e.getMessage}")
           false
-        }
       }
       ret && pr
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index 4771891..bc721f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -45,11 +45,12 @@ case class MetricWriteStep(name: String,
     // get timestamp and normalize metric
     val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
     val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match {
-      case SimpleMode => {
+
+      case SimpleMode =>
         val metrics: Map[String, Any] = flattenMetric(metricMaps, name, flattenType)
         emptyMetricMap + (timestamp -> metrics)
-      }
-      case TimestampMode => {
+
+      case TimestampMode =>
         val tmstMetrics = metricMaps.map { metric =>
           val tmst = metric.getLong(ConstantColumns.tmst, timestamp)
           val pureMetric = metric.removeKeys(ConstantColumns.columns)
@@ -61,7 +62,6 @@ case class MetricWriteStep(name: String,
           val mtc = flattenMetric(maps, name, flattenType)
           (k, mtc)
         }
-      }
     }
 
     // write to metric wrapper
@@ -88,10 +88,9 @@ case class MetricWriteStep(name: String,
         }.toSeq
       } else Nil
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"get metric ${name} fails")
         Nil
-      }
     }
   }
 
@@ -100,14 +99,12 @@ case class MetricWriteStep(name: String,
     flattenType match {
       case EntriesFlattenType => metrics.headOption.getOrElse(emptyMap)
       case ArrayFlattenType => Map[String, Any]((name -> metrics))
-      case MapFlattenType => {
+      case MapFlattenType =>
         val v = metrics.headOption.getOrElse(emptyMap)
         Map[String, Any]((name -> v))
-      }
-      case _ => {
+      case _ =>
         if (metrics.size > 1) Map[String, Any]((name -> metrics))
         else metrics.headOption.getOrElse(emptyMap)
-      }
     }
   }