Posted to commits@griffin.apache.org by gu...@apache.org on 2018/09/29 10:21:51 UTC
[1/3] incubator-griffin git commit: Fix case clauses
Repository: incubator-griffin
Updated Branches:
refs/heads/master 485c5cfc7 -> 18fc4cf4c
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 2bc373c..1fef694 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -18,12 +18,13 @@ under the License.
*/
package org.apache.griffin.measure.step.write
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
/**
* write records needs to be sink
@@ -39,18 +40,16 @@ case class RecordWriteStep(name: String,
val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
writeMode match {
- case SimpleMode => {
+ case SimpleMode =>
// batch records
val recordsOpt = getBatchRecords(context)
// write records
recordsOpt match {
- case Some(records) => {
+ case Some(records) =>
context.getSink(timestamp).sinkRecords(records, name)
- }
- case _ => {}
+ case _ =>
}
- }
- case TimestampMode => {
+ case TimestampMode =>
// streaming records
val (recordsOpt, emptyTimestamps) = getStreamingRecords(context)
// write records
@@ -63,7 +62,6 @@ case class RecordWriteStep(name: String,
emptyTimestamps.foreach { t =>
context.getSink(t).sinkRecords(Nil, name)
}
- }
}
true
}
@@ -81,28 +79,31 @@ case class RecordWriteStep(name: String,
val df = context.sqlContext.table(s"`${name}`")
Some(df)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"get data frame ${name} fails")
None
- }
}
}
- private def getRecordDataFrame(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+ private def getRecordDataFrame(context: DQContext): Option[DataFrame]
+ = getDataFrame(context, inputName)
- private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] = filterTableNameOpt.flatMap(getDataFrame(context, _))
+ private def getFilterTableDataFrame(context: DQContext): Option[DataFrame]
+ = filterTableNameOpt.flatMap(getDataFrame(context, _))
private def getBatchRecords(context: DQContext): Option[RDD[String]] = {
getRecordDataFrame(context).map(_.toJSON.rdd);
}
- private def getStreamingRecords(context: DQContext): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+ private def getStreamingRecords(context: DQContext)
+ : (Option[RDD[(Long, Iterable[String])]], Set[Long])
+ = {
implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
val defTimestamp = context.contextId.timestamp
getRecordDataFrame(context) match {
- case Some(df) => {
+ case Some(df) =>
val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match {
- case Some(filterDf) => {
+ case Some(filterDf) =>
// timestamps with empty flag
val tmsts: Array[(Long, Boolean)] = (filterDf.collect.flatMap { row =>
try {
@@ -120,13 +121,12 @@ case class RecordWriteStep(name: String,
} else None
(filterFuncOpt, emptyTmsts)
- }
case _ => (Some((t: Long) => true), Set[Long]())
}
// filter timestamps need to record
filterFuncOpt match {
- case Some(filterFunc) => {
+ case Some(filterFunc) =>
val records = df.flatMap { row =>
val tmst = getTmst(row, defTimestamp)
if (filterFunc(tmst)) {
@@ -140,10 +140,8 @@ case class RecordWriteStep(name: String,
} else None
}
(Some(records.rdd.groupByKey), emptyTimestamps)
- }
case _ => (None, emptyTimestamps)
}
- }
case _ => (None, Set[Long]())
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
index 592c1d4..220c46b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.step.write
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType}
-import scala.collection.mutable.ArrayBuffer
/**
* spark row formatter
@@ -46,7 +47,8 @@ object SparkRowFormatter {
case (sf, a) =>
sf.dataType match {
case ArrayType(et, _) =>
- Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]])))
+ Map(sf.name ->
+ (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]])))
case StructType(s) =>
Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row])))
case _ => Map(sf.name -> a)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
index 023a138..7ac8b4f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
@@ -21,11 +21,12 @@ package org.apache.griffin.measure.utils
import java.io.File
import java.net.URI
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Map => MutableMap}
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
-import scala.collection.mutable.{Map => MutableMap}
+import org.apache.griffin.measure.Loggable
object FSUtil extends Loggable {
@@ -34,23 +35,20 @@ object FSUtil extends Loggable {
def getFileSystem(path: String): FileSystem = {
getUriOpt(path) match {
- case Some(uri) => {
+ case Some(uri) =>
fsMap.get(uri.getScheme) match {
case Some(fs) => fs
- case _ => {
+ case _ =>
val fs = try {
FileSystem.get(uri, getConfiguration)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"get file system error: ${e.getMessage}")
throw e
- }
}
fsMap += (uri.getScheme -> fs)
fs
- }
}
- }
case _ => defaultFS
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index 89f505a..0cae5bd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.utils
-import org.apache.griffin.measure.Loggable
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
+import org.apache.griffin.measure.Loggable
+
object HdfsUtil extends Loggable {
private val seprator = "/"
@@ -90,24 +91,9 @@ object HdfsUtil extends Loggable {
}
}
- // def listPathFiles(dirPath: String): Iterable[String] = {
- // val path = new Path(dirPath)
- // try {
- // val fileStatusArray = dfs.listStatus(path)
- // fileStatusArray.flatMap { fileStatus =>
- // if (fileStatus.isFile) {
- // Some(fileStatus.getPath.getName)
- // } else None
- // }
- // } catch {
- // case e: Throwable => {
- // println(s"list path files error: ${e.getMessage}")
- // Nil
- // }
- // }
- // }
-
- def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = {
+
+ def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false)
+ : Iterable[String] = {
if (existPath(dirPath)) {
try {
implicit val path = new Path(dirPath)
@@ -123,15 +109,15 @@ object HdfsUtil extends Loggable {
if (fullPath) getHdfsFilePath(dirPath, fname) else fname
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"list path [${dirPath}] warn: ${e.getMessage}")
Nil
- }
}
} else Nil
}
- def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false): Iterable[String] = {
+ def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false)
+ : Iterable[String] = {
subTypes.flatMap { subType =>
listSubPathsByType(dirPath, subType, fullPath)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
index e016b60..4949642 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HttpUtil.scala
@@ -27,22 +27,30 @@ object HttpUtil {
val PUT_REGEX = """^(?i)put$""".r
val DELETE_REGEX = """^(?i)delete$""".r
- def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = {
- val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+ def postData(url: String,
+ params: Map[String, Object],
+ headers: Map[String, Object],
+ data: String): Boolean = {
+ val response = Http(url).params(convertObjMap2StrMap(params))
+ .headers(convertObjMap2StrMap(headers)).postData(data).asString
+
response.isSuccess
}
- def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): Boolean = {
- val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers))
+ def httpRequest(url: String,
+ method: String,
+ params: Map[String, Object],
+ headers: Map[String, Object],
+ data: String): Boolean = {
+ val httpReq = Http(url).params(convertObjMap2StrMap(params))
+ .headers(convertObjMap2StrMap(headers))
method match {
- case POST_REGEX() => {
+ case POST_REGEX() =>
val res = httpReq.postData(data).asString
res.isSuccess
- }
- case PUT_REGEX() => {
+ case PUT_REGEX() =>
val res = httpReq.put(data).asString
res.isSuccess
- }
case _ => false
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
index 175bbd8..cbb8734 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/JsonUtil.scala
@@ -20,10 +20,11 @@ package org.apache.griffin.measure.utils
import java.io.InputStream
+import scala.reflect._
+
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import scala.reflect._
object JsonUtil {
val mapper = new ObjectMapper()
@@ -31,7 +32,7 @@ object JsonUtil {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
def toJson(value: Map[Symbol, Any]): String = {
- toJson(value map { case (k,v) => k.name -> v})
+ toJson(value map { case (k, v) => k.name -> v})
}
def toJson(value: Any): String = {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
index fccbfb5..c4420ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -177,8 +177,10 @@ object ParamUtil {
}
}
- case class StringAnyMap(values:Map[String,Any])
- def getParamMap(key: String, defValue: Map[String, Any] = Map[String, Any]()): Map[String, Any] = {
+ case class StringAnyMap(values: Map[String, Any])
+
+ def getParamMap(key: String, defValue: Map[String, Any]
+ = Map[String, Any]()): Map[String, Any] = {
params.get(key) match {
case Some(v: StringAnyMap) => v.values
case _ => defValue
@@ -193,7 +195,7 @@ object ParamUtil {
}
def getArr[T](key: String): Seq[T] = {
- case class TSeqs(values:Seq[T])
+ case class TSeqs(values: Seq[T])
params.get(key) match {
case Some(seq: TSeqs) => seq.values
case _ => Nil
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index e96cbb1..9707c65 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -18,18 +18,18 @@ under the License.
*/
package org.apache.griffin.measure.utils
-import org.apache.griffin.measure.Loggable
-
-import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}
+import scala.util.matching.Regex
+
+import org.apache.griffin.measure.Loggable
object TimeUtil extends Loggable {
private object Units {
case class TimeUnit(name: String, shortName: String, ut: Long, regex: Regex) {
- def toMs(t: Long) = t * ut
- def fromMs(ms: Long) = ms / ut
- def fitUnit(ms: Long) = (ms % ut == 0)
+ def toMs(t: Long) : Long = t * ut
+ def fromMs(ms: Long) : Long = ms / ut
+ def fitUnit(ms: Long) : Boolean = (ms % ut == 0)
}
val dayUnit = TimeUnit("day", "d", 24 * 60 * 60 * 1000, """^(?i)d(?:ay)?$""".r)
@@ -50,7 +50,7 @@ object TimeUtil extends Loggable {
val value: Option[Long] = {
Try {
timeString match {
- case TimeRegex(time, unit) => {
+ case TimeRegex(time, unit) =>
val t = time.toLong
unit match {
case dayUnit.regex() => dayUnit.toMs(t)
@@ -60,11 +60,9 @@ object TimeUtil extends Loggable {
case msUnit.regex() => msUnit.toMs(t)
case _ => throw new Exception(s"${timeString} is invalid time format")
}
- }
- case PureTimeRegex(time) => {
+ case PureTimeRegex(time) =>
val t = time.toLong
msUnit.toMs(t)
- }
case _ => throw new Exception(s"${timeString} is invalid time format")
}
} match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
new file mode 100644
index 0000000..05877d8
--- /dev/null
+++ b/scalastyle-config.xml
@@ -0,0 +1,246 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!--
+
+If you wish to turn off checking for a section of code, you can put a comment in the source
+before and after the section, with the following syntax:
+
+ // scalastyle:off
+ ... // stuff that breaks the styles
+ // scalastyle:on
+
+You can also disable only one rule, by specifying its rule id, as specified in:
+ http://www.scalastyle.org/rules-0.7.0.html
+
+ // scalastyle:off no.finalize
+ override def finalize(): Unit = ...
+ // scalastyle:on no.finalize
+
+This file is divided into 3 sections:
+ (1) rules that we enforce.
+ (2) rules that we would like to enforce, but haven't cleaned up the codebase to turn on yet
+ (or we need to make the scalastyle rule more configurable).
+ (3) rules that we don't want to enforce.
+-->
+
+<!--
+Reference: Spark scalastyle-config.xml (https://github.com/apache/spark/blob/master/scalastyle-config.xml)
+-->
+<scalastyle>
+ <name>Scalastyle standard configuration</name>
+
+ <!-- ================================================================================ -->
+ <!-- rules we enforce -->
+ <!-- ================================================================================ -->
+
+ <check level="error" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+ <parameters>
+ <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+ </parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLineLength"><![CDATA[120]]></parameter>
+ <parameter name="tabSize"><![CDATA[2]]></parameter>
+ <parameter name="ignoreImports">true</parameter>
+ </parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+ <parameters><parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter></parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+ <parameters><parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter></parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+ <parameters><parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter></parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+ <parameters><parameter name="maxParameters"><![CDATA[10]]></parameter></parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+ <parameters>
+ <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+ <parameter name="doubleLineAllowed"><![CDATA[true]]></parameter>
+ </parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+
+ <check customId="nonascii" level="error" class="org.scalastyle.scalariform.NonASCIICharacterChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.SpaceAfterCommentStartChecker" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceBeforeTokenChecker" enabled="true">
+ <parameters>
+ <parameter name="tokens">ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, LARROW, RARROW</parameter>
+ </parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.EnsureSingleSpaceAfterTokenChecker" enabled="true">
+ <parameters>
+ <parameter name="tokens">ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, WHILE, FOR, MATCH, TRY, CATCH, FINALLY, LARROW, RARROW</parameter>
+ </parameters>
+ </check>
+
+ <!-- ??? usually shouldn't be checked into the code base. -->
+ <check level="error" class="org.scalastyle.scalariform.NotImplementedErrorUsage" enabled="true"></check>
+
+ <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
+ <parameters>
+ <parameter name="groups">java,scala,3rdParty,griffin</parameter>
+ <parameter name="group.java">javax?\..*</parameter>
+ <parameter name="group.scala">scala\..*</parameter>
+ <parameter name="group.3rdParty">(?!org\.apache\.griffin\.).*</parameter>
+ <parameter name="group.griffin">org\.apache\.griffin\..*</parameter>
+ </parameters>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker" enabled="true">
+ <parameters>
+ <parameter name="tokens">COMMA</parameter>
+ </parameters>
+ </check>
+
+ <!-- Single Space between ')' and '{' -->
+ <check customId="SingleSpaceBetweenRParenAndLCurlyBrace" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters><parameter name="regex">\)\{</parameter></parameters>
+ <customMessage><![CDATA[
+ Single Space between ')' and `{`.
+ ]]></customMessage>
+ </check>
+
+ <check customId="OmitBracesInCase" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters><parameter name="regex">case[^\n>]*=>\s*\{</parameter></parameters>
+ <customMessage>Omit braces in case clauses.</customMessage>
+ </check>
+
+ <check level="error" class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"></check>
+
+ <!-- ================================================================================ -->
+ <!-- rules we'd like to enforce, but haven't cleaned up the codebase yet -->
+ <!-- ================================================================================ -->
+
+ <!-- We cannot turn the following two on, because it'd fail a lot of string interpolation use cases. -->
+ <!-- Ideally the following two rules should be configurable to rule out string interpolation. -->
+ <!--<check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="false"></check>-->
+ <!--<check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="false"></check>-->
+
+ <!-- This breaks symbolic method names so we don't turn it on. -->
+ <!-- Maybe we should update it to allow basic symbolic names, and then we are good to go. -->
+ <!--<check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="false">-->
+ <!--<parameters>-->
+ <!--<parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>-->
+ <!--</parameters>-->
+ <!--</check>-->
+
+ <!-- Should turn this on, but we have a few places that need to be fixed first -->
+ <!--<check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>-->
+
+ <!-- ================================================================================ -->
+ <!-- rules we don't want -->
+ <!-- ================================================================================ -->
+
+ <!--<check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="false">-->
+ <!--<parameters><parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter></parameters>-->
+ <!--</check>-->
+
+ <!-- We want the opposite of this: NewLineAtEofChecker -->
+ <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+
+ <!-- This one complains about all kinds of random things. Disable. -->
+ <check level="error" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="false"></check>
+
+ <!-- We use return quite a bit for control flows and guards -->
+ <check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="false"></check>
+
+ <!-- We use null a lot in low level code and to interface with 3rd party code -->
+ <check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="false"></check>
+
+ <!-- Doesn't seem super big deal here ... -->
+ <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="false"></check>
+
+ <!-- Doesn't seem super big deal here ... -->
+ <check level="error" class="org.scalastyle.file.FileLengthChecker" enabled="false">
+ <parameters><parameter name="maxFileLength">800></parameter></parameters>
+ </check>
+
+ <!-- Doesn't seem super big deal here ... -->
+ <check level="error" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="false">
+ <parameters><parameter name="maxTypes">30</parameter></parameters>
+ </check>
+
+ <!-- Doesn't seem super big deal here ... -->
+ <check level="error" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="false">
+ <parameters><parameter name="maximum">10</parameter></parameters>
+ </check>
+
+ <!-- Doesn't seem super big deal here ... -->
+ <check level="error" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="false">
+ <parameters><parameter name="maxLength">50</parameter></parameters>
+ </check>
+
+ <!-- Not exactly feasible to enforce this right now. -->
+ <!-- It is also infrequent that somebody introduces a new class with a lot of methods. -->
+ <check level="error" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="false">
+ <parameters><parameter name="maxMethods"><![CDATA[30]]></parameter></parameters>
+ </check>
+
+ <!-- Doesn't seem super big deal here, and we have a lot of magic numbers ... -->
+ <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="false">
+ <parameters><parameter name="ignore">-1,0,1,2,3</parameter></parameters>
+ </check>
+
+</scalastyle>
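The ImportOrderChecker enabled above is what drives the import reshuffling visible throughout this patch: imports are grouped into java/javax, scala, third-party, and org.apache.griffin sections, in that order, separated by blank lines. A minimal sketch of a file header that satisfies the rule (these particular imports are illustrative, not taken from any one file):

    import java.io.InputStream

    import scala.util.Try

    import org.apache.spark.sql.DataFrame

    import org.apache.griffin.measure.Loggable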
[3/3] incubator-griffin git commit: Fix case clauses
Posted by gu...@apache.org.
Fix case clauses
Author: William Guo <gu...@apache.org>
Closes #425 from guoyuepeng/fix_case_clauses.
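The substance of the change is the new OmitBracesInCase scalastyle rule plus the cleanup it demands: the body of a Scala case clause is already a block, so wrapping it in braces is redundant. A minimal before/after sketch of the rewrite applied across these files (sink here is a stand-in for the surrounding context, per the RecordWriteStep hunk above):

    // before: braces around each case body
    recordsOpt match {
      case Some(records) => {
        sink.sinkRecords(records, name)
      }
      case _ => {}
    }

    // after: the body follows the arrow directly
    recordsOpt match {
      case Some(records) =>
        sink.sinkRecords(records, name)
      case _ =>
    }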
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18fc4cf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18fc4cf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18fc4cf4
Branch: refs/heads/master
Commit: 18fc4cf4cd706aa3e793fb0790c12ece5e5342a3
Parents: 485c5cf
Author: William Guo <gu...@apache.org>
Authored: Sat Sep 29 18:21:42 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Sat Sep 29 18:21:42 2018 +0800
----------------------------------------------------------------------
.../apache/griffin/measure/Application.scala | 36 ++-
.../configuration/dqdefinition/DQConfig.scala | 12 +-
.../configuration/dqdefinition/EnvConfig.scala | 5 +-
.../dqdefinition/reader/ParamFileReader.scala | 6 +-
.../dqdefinition/reader/ParamJsonReader.scala | 5 +-
.../dqdefinition/reader/ParamReader.scala | 5 +-
.../measure/configuration/enums/DqType.scala | 8 +-
.../configuration/enums/FlattenType.scala | 8 +-
.../configuration/enums/OutputType.scala | 8 +-
.../configuration/enums/ProcessType.scala | 6 +-
.../measure/configuration/enums/SinkType.scala | 19 +-
.../griffin/measure/context/DQContext.scala | 11 +-
.../measure/context/DataFrameCache.scala | 12 +-
.../griffin/measure/context/TableRegister.scala | 8 +-
.../checkpoint/lock/CheckpointLockInZK.scala | 6 +-
.../offset/OffsetCheckpointClient.scala | 2 +-
.../offset/OffsetCheckpointFactory.scala | 5 +-
.../offset/OffsetCheckpointInZK.scala | 29 ++-
.../streaming/metric/AccuracyMetric.scala | 8 +-
.../context/streaming/metric/CacheResults.scala | 6 +-
.../griffin/measure/datasource/DataSource.scala | 11 +-
.../measure/datasource/DataSourceFactory.scala | 8 +-
.../measure/datasource/TimestampStorage.scala | 29 ++-
.../datasource/cache/StreamingCacheClient.scala | 65 ++---
.../cache/StreamingCacheClientFactory.scala | 18 +-
.../cache/StreamingCacheJsonClient.scala | 3 +-
.../cache/StreamingCacheOrcClient.scala | 3 +-
.../cache/StreamingCacheParquetClient.scala | 10 +-
.../cache/StreamingOffsetCacheable.scala | 18 +-
.../measure/datasource/cache/WithFanIn.scala | 8 +-
.../datasource/connector/DataConnector.scala | 15 +-
.../connector/DataConnectorFactory.scala | 35 ++-
.../batch/AvroBatchDataConnector.scala | 6 +-
.../batch/HiveBatchDataConnector.scala | 6 +-
.../batch/TextDirBatchDataConnector.scala | 18 +-
.../streaming/KafkaStreamingDataConnector.scala | 6 +-
.../KafkaStreamingStringDataConnector.scala | 12 +-
.../streaming/StreamingDataConnector.scala | 10 +-
.../measure/job/builder/DQJobBuilder.scala | 2 -
.../apache/griffin/measure/launch/DQApp.scala | 3 +-
.../measure/launch/batch/BatchDQApp.scala | 10 +-
.../launch/streaming/StreamingDQApp.scala | 19 +-
.../griffin/measure/sink/ConsoleSink.scala | 3 +-
.../measure/sink/ElasticSearchSink.scala | 15 +-
.../apache/griffin/measure/sink/HdfsSink.scala | 17 +-
.../apache/griffin/measure/sink/MongoSink.scala | 10 +-
.../org/apache/griffin/measure/sink/Sink.scala | 3 +-
.../griffin/measure/sink/SinkFactory.scala | 3 +-
.../griffin/measure/sink/SinkTaskRunner.scala | 18 +-
.../measure/step/builder/DQStepBuilder.scala | 10 +-
.../step/builder/GriffinDslDQStepBuilder.scala | 4 +-
.../step/builder/RuleParamStepBuilder.scala | 4 +-
.../builder/dsl/expr/ClauseExpression.scala | 14 +-
.../step/builder/dsl/expr/LogicalExpr.scala | 5 +-
.../step/builder/dsl/expr/SelectExpr.scala | 8 +-
.../step/builder/dsl/expr/TreeNode.scala | 18 +-
.../step/builder/dsl/parser/BasicParser.scala | 6 +-
.../builder/dsl/parser/GriffinDslParser.scala | 5 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 55 +++--
.../transform/CompletenessExpr2DQSteps.scala | 40 +--
.../transform/DistinctnessExpr2DQSteps.scala | 48 ++--
.../builder/dsl/transform/Expr2DQSteps.scala | 2 +-
.../dsl/transform/ProfilingExpr2DQSteps.scala | 16 +-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 46 ++--
.../dsl/transform/UniquenessExpr2DQSteps.scala | 34 ++-
.../transform/analyzer/AccuracyAnalyzer.scala | 6 +-
.../analyzer/CompletenessAnalyzer.scala | 3 +-
.../analyzer/DistinctnessAnalyzer.scala | 4 +-
.../transform/analyzer/ProfilingAnalyzer.scala | 3 +-
.../transform/analyzer/UniquenessAnalyzer.scala | 3 +-
.../builder/preproc/PreProcParamMaker.scala | 8 +-
.../griffin/measure/step/read/ReadStep.scala | 9 +-
.../measure/step/read/UnionReadStep.scala | 3 +-
.../measure/step/transform/DataFrameOps.scala | 16 +-
.../transform/DataFrameOpsTransformStep.scala | 7 +-
.../step/transform/SparkSqlTransformStep.scala | 3 +-
.../step/write/DataSourceUpdateWriteStep.scala | 19 +-
.../measure/step/write/MetricFlushStep.scala | 3 +-
.../measure/step/write/MetricWriteStep.scala | 17 +-
.../measure/step/write/RecordWriteStep.scala | 38 ++-
.../measure/step/write/SparkRowFormatter.scala | 6 +-
.../apache/griffin/measure/utils/FSUtil.scala | 14 +-
.../apache/griffin/measure/utils/HdfsUtil.scala | 30 +--
.../apache/griffin/measure/utils/HttpUtil.scala | 24 +-
.../apache/griffin/measure/utils/JsonUtil.scala | 5 +-
.../griffin/measure/utils/ParamUtil.scala | 8 +-
.../apache/griffin/measure/utils/TimeUtil.scala | 18 +-
scalastyle-config.xml | 246 +++++++++++++++++++
88 files changed, 887 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index e7df806..1bac17b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,15 +18,16 @@ under the License.
*/
package org.apache.griffin.measure
-import org.apache.griffin.measure.configuration.enums._
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.dqdefinition.{GriffinConfig, DQConfig, EnvConfig, Param}
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
/**
* application entrance
@@ -49,17 +50,15 @@ object Application extends Loggable {
// read param files
val envParam = readParamFile[EnvConfig](envParamFile) match {
case Success(p) => p
- case Failure(ex) => {
+ case Failure(ex) =>
error(ex.getMessage)
sys.exit(-2)
- }
}
val dqParam = readParamFile[DQConfig](dqParamFile) match {
case Success(p) => p
- case Failure(ex) => {
+ case Failure(ex) =>
error(ex.getMessage)
sys.exit(-2)
- }
}
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
@@ -68,32 +67,28 @@ object Application extends Loggable {
val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
- case _ => {
+ case _ =>
error(s"${procType} is unsupported process type!")
sys.exit(-4)
- }
}
startup
// dq app init
dqApp.init match {
- case Success(_) => {
+ case Success(_) =>
info("process init success")
- }
- case Failure(ex) => {
+ case Failure(ex) =>
error(s"process init error: ${ex.getMessage}")
shutdown
sys.exit(-5)
- }
}
// dq app run
dqApp.run match {
- case Success(_) => {
+ case Success(_) =>
info("process run success")
- }
- case Failure(ex) => {
+ case Failure(ex) =>
error(s"process run error: ${ex.getMessage}")
if (dqApp.retryable) {
@@ -102,19 +97,16 @@ object Application extends Loggable {
shutdown
sys.exit(-5)
}
- }
}
// dq app end
dqApp.close match {
- case Success(_) => {
+ case Success(_) =>
info("process end success")
- }
- case Failure(ex) => {
+ case Failure(ex) =>
error(s"process end error: ${ex.getMessage}")
shutdown
sys.exit(-5)
- }
}
shutdown
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 4943329..b281481 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition
-import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
+
import org.apache.griffin.measure.configuration.enums._
/**
@@ -46,7 +47,7 @@ case class DQConfig(@JsonProperty("name") private val name: String,
def getDataSources: Seq[DataSourceParam] = {
dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
val (seq, names) = ret
- if (!names.contains(ds.getName)){
+ if (!names.contains(ds.getName)) {
(seq :+ ds, names + ds.getName)
} else ret
}._1
@@ -133,8 +134,9 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
* rule param
* @param dslType dsl type of this rule (must)
* @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
- * @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
- * @param outDfName name of output dataframe of this rule, by default will be generated as data connector dataframe name with index suffix
+ * @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
+ * @param outDfName name of output dataframe of this rule, by default will be generated
+ * as data connector dataframe name with index suffix
* @param rule rule to define dq step calculation (must)
* @param details detail config of rule (optional)
* @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
@@ -206,4 +208,4 @@ case class RuleOutputParam( @JsonProperty("type") private val outputType: String
def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")
def validate(): Unit = {}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index bf77d13..2e227a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition
-import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
+
import org.apache.griffin.measure.configuration.enums._
/**
@@ -109,4 +110,4 @@ case class CheckpointParam(@JsonProperty("type") private val cpType: String,
def validate(): Unit = {
assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
index 4a4aeed..2739f74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
@@ -18,11 +18,13 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.griffin.measure.configuration.dqdefinition.Param
import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
-import scala.reflect.ClassTag
-import scala.util.Try
+
/**
* read params from config file path
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
index 063dc7e..ba51d5f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.griffin.measure.configuration.dqdefinition.Param
import org.apache.griffin.measure.utils.JsonUtil
-import scala.reflect.ClassTag
-import scala.util.Try
/**
* read params from json string directly
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
index 9a9a46c..5c914c6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.Param
-import scala.reflect.ClassTag
-import scala.util.Try
trait ParamReader extends Loggable with Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
index cee8d98..bbeb04f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -31,7 +31,13 @@ sealed trait DqType {
object DqType {
private val dqTypes: List[DqType] = List(
- AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType
+ AccuracyType,
+ ProfilingType,
+ UniquenessType,
+ DistinctnessType,
+ TimelinessType,
+ CompletenessType,
+ UnknownType
)
def apply(ptn: String): DqType = {
dqTypes.find(dqType => ptn match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
index 160ecaf..cb2fba1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -29,7 +29,13 @@ sealed trait FlattenType {
}
object FlattenType {
- private val flattenTypes: List[FlattenType] = List(DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType)
+ private val flattenTypes: List[FlattenType] = List(
+ DefaultFlattenType,
+ EntriesFlattenType,
+ ArrayFlattenType,
+ MapFlattenType
+ )
+
val default = DefaultFlattenType
def apply(ptn: String): FlattenType = {
flattenTypes.find(tp => ptn match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
index 5b1d261..8a80044 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -29,7 +29,13 @@ sealed trait OutputType {
}
object OutputType {
- private val outputTypes: List[OutputType] = List(MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType)
+ private val outputTypes: List[OutputType] = List(
+ MetricOutputType,
+ RecordOutputType,
+ DscUpdateOutputType,
+ UnknownOutputType
+ )
+
val default = UnknownOutputType
def apply(ptn: String): OutputType = {
outputTypes.find(tp => ptn match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
index 4f799ed..3cbc749 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -29,7 +29,11 @@ sealed trait ProcessType {
}
object ProcessType {
- private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
+ private val procTypes: List[ProcessType] = List(
+ BatchProcessType,
+ StreamingProcessType
+ )
+
def apply(ptn: String): ProcessType = {
procTypes.find(tp => ptn match {
case tp.idPattern() => true
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 5471e83..d9e5d2b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -30,15 +30,22 @@ sealed trait SinkType {
object SinkType {
private val sinkTypes: List[SinkType] = List(
- ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
+ ConsoleSinkType,
+ HdfsSinkType,
+ ElasticsearchSinkType,
+ MongoSinkType,
+ UnknownSinkType
)
+
def apply(ptn: String): SinkType = {
sinkTypes.find(tp => ptn match {
case tp.idPattern() => true
case _ => false
}).getOrElse(UnknownSinkType)
}
+
def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+
def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
@@ -48,7 +55,7 @@ object SinkType {
/**
* console sink, will sink metric in console
*/
- case object ConsoleSinkType extends SinkType {
+case object ConsoleSinkType extends SinkType {
val idPattern = "^(?i)console|log$".r
val desc = "console"
}
@@ -56,7 +63,7 @@ object SinkType {
/**
* hdfs sink, will sink metric and record in hdfs
*/
- case object HdfsSinkType extends SinkType {
+case object HdfsSinkType extends SinkType {
val idPattern = "^(?i)hdfs$".r
val desc = "hdfs"
}
@@ -64,7 +71,7 @@ object SinkType {
/**
* elasticsearch sink, will sink metric in elasticsearch
*/
- case object ElasticsearchSinkType extends SinkType {
+case object ElasticsearchSinkType extends SinkType {
val idPattern = "^(?i)es|elasticsearch|http$".r
val desc = "elasticsearch"
}
@@ -72,12 +79,12 @@ object SinkType {
/**
* mongo sink, will sink metric in mongo db
*/
- case object MongoSinkType extends SinkType {
+case object MongoSinkType extends SinkType {
val idPattern = "^(?i)mongo|mongodb$".r
val desc = "distinct"
}
- case object UnknownSinkType extends SinkType {
+case object UnknownSinkType extends SinkType {
val idPattern = "".r
val desc = "unknown"
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index a9f3da0..b0759c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.context
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+
import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.datasource._
import org.apache.griffin.measure.sink.{Sink, SinkFactory}
-import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
/**
* dq context: the context of each calculation
@@ -58,6 +59,7 @@ case class DQContext(contextId: ContextId,
}
}
dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+
def getDataSourceName(index: Int): String = {
if (dataSourceNames.size > index) dataSourceNames(index) else ""
}
@@ -66,20 +68,25 @@ case class DQContext(contextId: ContextId,
val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
val dataSourceTimeRanges = loadDataSources()
+
def loadDataSources(): Map[String, TimeRange] = {
dataSources.map { ds =>
(ds.name, ds.loadData(this))
}.toMap
}
+
printTimeRanges
private val sinkFactory = SinkFactory(sinkParams, name)
private val defaultSink: Sink = createSink(contextId.timestamp)
+
def getSink(timestamp: Long): Sink = {
if (timestamp == contextId.timestamp) getSink()
else createSink(timestamp)
}
+
def getSink(): Sink = defaultSink
+
private def createSink(t: Long): Sink = {
procType match {
case BatchProcessType => sinkFactory.getSinks(t, true)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
index 4e8f4df..0671565 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
@@ -18,11 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.context
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Map => MutableMap, MutableList}
+
import org.apache.spark.sql.DataFrame
-import scala.collection.concurrent.{Map => ConcMap}
-import scala.collection.mutable.{MutableList, Map => MutableMap}
+import org.apache.griffin.measure.Loggable
/**
* cache and unpersist dataframes
@@ -42,17 +42,15 @@ case class DataFrameCache() extends Loggable {
def cacheDataFrame(name: String, df: DataFrame): Unit = {
info(s"try to cache data frame ${name}")
dataFrames.get(name) match {
- case Some(odf) => {
+ case Some(odf) =>
trashDataFrame(odf)
dataFrames += (name -> df)
df.cache
info("cache after replace old df")
- }
- case _ => {
+ case _ =>
dataFrames += (name -> df)
df.cache
info("cache after replace no old df")
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index 8d86170..c4dda3b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -18,10 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.context
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Set => MutableSet}
+
import org.apache.spark.sql._
-import scala.collection.mutable.{Set => MutableSet}
+import org.apache.griffin.measure.Loggable
+
/**
* register table name
@@ -78,4 +80,4 @@ case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends Table
tables.clear
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
index b1cbe0f..d78aedf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
@@ -32,10 +32,9 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
mutex.acquire(-1, null)
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"lock error: ${e.getMessage}")
false
- }
}
}
@@ -44,9 +43,8 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
try {
if (mutex.isAcquiredInThisProcess) mutex.release
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"unlock error: ${e.getMessage}")
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
index 8acfbeb..48337d5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.context.streaming.checkpoint.lock.{CheckpointL
object OffsetCheckpointClient extends OffsetCheckpoint with OffsetOps {
var offsetCheckpoints: Seq[OffsetCheckpoint] = Nil
- def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) = {
+ def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) : Unit = {
val fac = OffsetCheckpointFactory(checkpointParams, metricName)
offsetCheckpoints = checkpointParams.flatMap(param => fac.getOffsetCheckpoint(param)).toList
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
index 5fe8e15..f19542a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.context.streaming.checkpoint.offset
+import scala.util.Try
+
import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
-import scala.util.Try
case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam], metricName: String
) extends Serializable {
@@ -36,4 +37,4 @@ case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam],
offsetCheckpointTry.toOption
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
index e051779..b575573 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
@@ -18,22 +18,24 @@ under the License.
*/
package org.apache.griffin.measure.context.streaming.checkpoint.offset
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
import org.apache.zookeeper.CreateMode
-
import scala.collection.JavaConverters._
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
+
+
/**
* leverage zookeeper for info cache
* @param config
* @param metricName
*/
-case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) extends OffsetCheckpoint with OffsetOps {
+case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String)
+ extends OffsetCheckpoint with OffsetOps {
val Hosts = "hosts"
val Namespace = "namespace"
@@ -67,7 +69,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
}
val lockPath = config.getOrElse(LockPath, "lock").toString
- private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+ private val cacheNamespace: String =
+ if (namespace.isEmpty) metricName else namespace + separator + metricName
+
private val builder = CuratorFrameworkFactory.builder()
.connectString(hosts)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
@@ -141,10 +145,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
try {
client.getChildren().forPath(path).asScala.toList
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"list ${path} warn: ${e.getMessage}")
Nil
- }
}
}
@@ -162,10 +165,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
.forPath(path, content.getBytes("utf-8"))
true
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
false
- }
}
}
@@ -174,10 +176,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
client.setData().forPath(path, content.getBytes("utf-8"))
true
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
false
- }
}
}
@@ -185,10 +186,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
try {
Some(new String(client.getData().forPath(path), "utf-8"))
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"read ${path} warn: ${e.getMessage}")
None
- }
}
}
@@ -204,10 +204,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
try {
client.checkExists().forPath(path) != null
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"check exists ${path} warn: ${e.getMessage}")
false
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
index e69716e..19dfb9e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
@@ -45,9 +45,11 @@ case class AccuracyMetric(miss: Long, total: Long) extends Metric {
(this.miss != other.miss) || (this.total != other.total)
}
- def getMiss = miss
- def getTotal = total
- def getMatch = total - miss
+ def getMiss: Long = miss
+
+ def getTotal: Long = total
+
+ def getMatch: Long = total - miss
def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
index cc8e772..6c99618 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.context.streaming.metric
+import scala.collection.mutable.{Map => MutableMap}
+
import org.apache.griffin.measure.Loggable
-import scala.collection.mutable.{Map => MutableMap}
/**
* in streaming mode, some metrics may update,
@@ -32,10 +33,9 @@ object CacheResults extends Loggable {
def olderThan(ut: Long): Boolean = updateTime < ut
def update(ut: Long, r: Metric): Option[Metric] = {
r match {
- case m: result.T if (olderThan(ut)) => {
+ case m: result.T if (olderThan(ut)) =>
val ur = result.update(m)
if (result.differsFrom(ur)) Some(ur) else None
- }
case _ => None
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 6bf6373..f2cd0ec 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -18,13 +18,14 @@ under the License.
*/
package org.apache.griffin.measure.datasource
+import org.apache.spark.sql._
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.context.{DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.DataConnector
import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.spark.sql._
/**
* data source
@@ -50,12 +51,10 @@ case class DataSource(name: String,
val timestamp = context.contextId.timestamp
val (dfOpt, timeRange) = data(timestamp)
dfOpt match {
- case Some(df) => {
+ case Some(df) =>
context.runTimeTableRegister.registerTable(name, df)
- }
- case None => {
+ case None =>
warn(s"load data source [${name}] fails")
- }
}
timeRange
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 7807dfd..28e616b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -18,14 +18,16 @@ under the License.
*/
package org.apache.griffin.measure.datasource
+import scala.util.Success
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-import scala.util.Success
object DataSourceFactory extends Loggable {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
index a305563..04a7f85 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
@@ -18,10 +18,9 @@ under the License.
*/
package org.apache.griffin.measure.datasource
-import org.apache.griffin.measure.Loggable
-
import scala.collection.mutable.{SortedSet => MutableSortedSet}
+import org.apache.griffin.measure.Loggable
/**
* tmst cache, CRUD of timestamps
*/
@@ -29,19 +28,19 @@ case class TimestampStorage() extends Loggable {
private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
- //-- insert tmst into tmst group --
- def insert(tmst: Long) = tmstGroup += tmst
- def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+ // -- insert tmst into tmst group --
+ def insert(tmst: Long) : MutableSortedSet[Long] = tmstGroup += tmst
+ def insert(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup ++= tmsts
- //-- remove tmst from tmst group --
- def remove(tmst: Long) = tmstGroup -= tmst
- def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+ // -- remove tmst from tmst group --
+ def remove(tmst: Long) : MutableSortedSet[Long] = tmstGroup -= tmst
+ def remove(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup --= tmsts
- //-- get subset of tmst group --
- def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
- def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
- def until(until: Long) = tmstGroup.until(until).toSet
- def from(from: Long) = tmstGroup.from(from).toSet
- def all = tmstGroup.toSet
+ // -- get subset of tmst group --
+ def fromUntil(from: Long, until: Long) : Set[Long] = tmstGroup.range(from, until).toSet
+ def afterTil(after: Long, til: Long) : Set[Long] = tmstGroup.range(after + 1, til + 1).toSet
+ def until(until: Long) : Set[Long] = tmstGroup.until(until).toSet
+ def from(from: Long) : Set[Long] = tmstGroup.from(from).toSet
+ def all : Set[Long] = tmstGroup.toSet
-}
\ No newline at end of file
+}
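The other recurring fix, visible above in TimestampStorage: public members get explicit result types instead of inferred ones, so a change to an implementation body cannot silently change the API. A minimal sketch, assuming only the standard library:

import scala.collection.mutable.{SortedSet => MutableSortedSet}

case class StorageSketch() {
  private val group: MutableSortedSet[Long] = MutableSortedSet.empty[Long]

  // inferred:  def insert(t: Long) = group += t
  //   silently leaks MutableSortedSet[Long] into the public signature
  // explicit:  the signature is a stable, reviewable contract
  def insert(t: Long): MutableSortedSet[Long] = group += t

  def all: Set[Long] = group.toSet
}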
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index b351f82..a03a468 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.datasource.cache
import java.util.concurrent.TimeUnit
+import scala.util.Random
+
+import org.apache.spark.sql._
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
import org.apache.griffin.measure.utils.DataFrameUtil._
import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
-import scala.util.Random
/**
* data source cache in streaming mode
@@ -38,7 +40,8 @@ import scala.util.Random
* read data frame from hdfs in calculate phase
* with update and clean actions for the cache data
*/
-trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
+trait StreamingCacheClient
+ extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
val sqlContext: SQLContext
val param: Map[String, Any]
@@ -46,7 +49,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val index: Int
val timestampStorage: TimestampStorage
- protected def fromUntilRangeTmsts(from: Long, until: Long) = timestampStorage.fromUntil(from, until)
+ protected def fromUntilRangeTmsts(from: Long, until: Long) =
+ timestampStorage.fromUntil(from, until)
+
protected def clearTmst(t: Long) = timestampStorage.remove(t)
protected def clearTmstsUntil(until: Long) = {
val outDateTmsts = timestampStorage.until(until)
@@ -67,17 +72,20 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val filePath: String = param.getString(_FilePath, defFilePath)
val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
- val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
- val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+ val readyTimeInterval: Long =
+ TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+
+ val readyTimeDelay: Long =
+ TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+
val deltaTimeRange: (Long, Long) = {
def negative(n: Long): Long = if (n <= 0) n else 0
param.get(_TimeRange) match {
- case Some(seq: Seq[String]) => {
+ case Some(seq: Seq[String]) =>
val nseq = seq.flatMap(TimeUtil.milliseconds(_))
val ns = negative(nseq.headOption.getOrElse(0))
val ne = negative(nseq.tail.headOption.getOrElse(0))
(ns, ne)
- }
case _ => (0, 0)
}
}
@@ -112,7 +120,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
if (!readOnly) {
dfOpt match {
- case Some(df) => {
+ case Some(df) =>
// cache df
df.cache
@@ -137,10 +145,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
// uncache df
df.unpersist
- }
- case _ => {
+ case _ =>
info("no data frame to save")
- }
}
// submit cache time and ready time
@@ -168,7 +174,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
} else {
info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
- s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+
+ s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} " +
+ s"AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
}
// new cache data
@@ -176,10 +184,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val dfr = sqlContext.read
readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"read data source cache warn: ${e.getMessage}")
None
- }
}
// old cache data
@@ -190,10 +197,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val dfr = sqlContext.read
readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"read old data source cache warn: ${e.getMessage}")
None
- }
}
}
@@ -228,12 +234,11 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
}
names.filter { name =>
name match {
- case regex(value) => {
+ case regex(value) =>
str2Long(value) match {
case Some(t) => func(t, bound)
case _ => false
}
- }
case _ => false
}
}.map(name => s"${path}/${name}")
@@ -258,7 +263,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
// new cache data
val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
newCacheCleanTime match {
- case Some(nct) => {
+ case Some(nct) =>
// clean calculated new cache data
val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
if (newCacheLocked) {
@@ -271,16 +276,15 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
newCacheLock.unlock()
}
}
- }
- case _ => {
+ case _ =>
// do nothing
- }
+ info("should not happen")
}
// old cache data
val oldCacheCleanTime = if (updatable) readCleanTime else None
oldCacheCleanTime match {
- case Some(oct) => {
+ case Some(oct) =>
val oldCacheIndexOpt = readOldCacheIndex
oldCacheIndexOpt.foreach { idx =>
val oldDfPath = s"${oldFilePath}/${idx}"
@@ -298,10 +302,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
}
}
}
- }
- case _ => {
+ case _ =>
// do nothing
- }
+ info("should not happen")
}
}
}
@@ -313,7 +316,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
def updateData(dfOpt: Option[DataFrame]): Unit = {
if (!readOnly && updatable) {
dfOpt match {
- case Some(df) => {
+ case Some(df) =>
// old cache lock
val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
if (oldCacheLocked) {
@@ -339,10 +342,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
oldCacheLock.unlock()
}
}
- }
- case _ => {
+ case _ =>
info("no data frame to update")
- }
}
}
}
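The file-name filter earlier in this file's diff (`case regex(value) =>`) uses Scala's regex extractor: a Regex with a capture group can serve directly as a match pattern, binding the group when the whole string matches. A self-contained sketch of the idiom (names hypothetical):

import scala.util.matching.Regex

object RegexExtractorSketch {
  val regex: Regex = """^(\d+)$""".r

  private def str2Long(s: String): Option[Long] =
    try Some(s.toLong) catch { case _: NumberFormatException => None }

  // keep only names that are pure digit strings, parsed as timestamps
  def timestampsOf(names: Seq[String]): Seq[Long] =
    names.flatMap {
      case regex(value) => str2Long(value)
      case _            => None
    }

  def main(args: Array[String]): Unit = {
    println(timestampsOf(Seq("1538000000", "_tmp", "1539000000")))
    // List(1538000000, 1539000000)
  }
}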
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index f991e2d..eeda8ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
+import org.apache.spark.sql.SQLContext
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
object StreamingCacheClientFactory extends Loggable {
@@ -50,17 +51,20 @@ object StreamingCacheClientFactory extends Loggable {
try {
val tp = param.getString(_type, "")
val dsCache = tp match {
- case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
- case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
- case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
- case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ case ParquetRegex() =>
+ StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ case JsonRegex() =>
+ StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+ case OrcRegex() =>
+ StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+ case _ =>
+ StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
}
Some(dsCache)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error("generate data source cache fails")
None
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index a12ef87..c81d4d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.spark.sql._
+import org.apache.griffin.measure.datasource.TimestampStorage
+
/**
* data source cache in json format
*/
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 63e7104..0649b74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.spark.sql._
+import org.apache.griffin.measure.datasource.TimestampStorage
+
/**
* data source cache in orc format
*/
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index c275227..9c369ee 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -18,14 +18,18 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.spark.sql._
+import org.apache.griffin.measure.datasource.TimestampStorage
+
/**
* data source cache in parquet format
*/
-case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, timestampStorage: TimestampStorage
+case class StreamingCacheParquetClient(sqlContext: SQLContext,
+ param: Map[String, Any],
+ dsName: String,
+ index: Int,
+ timestampStorage: TimestampStorage
) extends StreamingCacheClient {
sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
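Several hunks in this commit are pure re-wrapping to respect the project's line-length limit: one constructor parameter per line, as in StreamingCacheParquetClient above, and long interpolated strings split with `+`. A sketch of both, names hypothetical:

case class CacheClientSketch(name: String,
                             param: Map[String, Any],
                             index: Int,
                             readOnly: Boolean) {
  // a long interpolated SQL condition split to stay under the limit
  def filterExpr(col: String, lo: Long, hi: Long): String =
    s"`$col` > $lo " +
      s"AND `$col` <= $hi"
}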
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
index 7b7f506..e73a058 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -30,13 +30,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
val readyTimeInterval: Long
val readyTimeDelay: Long
- def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
+ def selfCacheInfoPath : String = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
- def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
- def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
- def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
- def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
- def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
+ def selfCacheTime : String = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
+ def selfLastProcTime : String = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
+ def selfReadyTime : String = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
+ def selfCleanTime : String = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
+ def selfOldCacheIndex : String = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
protected def submitCacheTime(ms: Long): Unit = {
val map = Map[String, String]((selfCacheTime -> ms.toString))
@@ -80,9 +80,11 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
try {
Some(v.toLong)
} catch {
- case _:Throwable => error("try to read not existing value from OffsetCacheClient::readSelfInfo");None
+ case _: Throwable =>
+ error("try to read not existing value from OffsetCacheClient::readSelfInfo")
+ None
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
index 413b5a2..675f2f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.datasource.cache
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+import scala.collection.concurrent.{Map => ConcMap, TrieMap}
/**
* fan in trait, for multiple input and one output
@@ -55,14 +55,12 @@ trait WithFanIn[T] {
private def fanInc(key: T): Unit = {
fanInCountMap.get(key) match {
- case Some(n) => {
+ case Some(n) =>
val suc = fanInCountMap.replace(key, n, n + 1)
if (!suc) fanInc(key)
- }
- case _ => {
+ case _ =>
val oldOpt = fanInCountMap.putIfAbsent(key, 1)
if (oldOpt.nonEmpty) fanInc(key)
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index 05d3c75..ae6a18d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -20,16 +20,17 @@ package org.apache.griffin.measure.datasource.connector
import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
trait DataConnector extends Loggable with Serializable {
@@ -64,7 +65,8 @@ trait DataConnector extends Loggable with Serializable {
saveTmst(timestamp) // save timestamp
dfOpt.flatMap { df =>
- val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
+ val (preProcRules, thisTable) =
+ PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
// init data
context.compileTableRegister.registerTable(thisTable)
@@ -89,10 +91,9 @@ trait DataConnector extends Loggable with Serializable {
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
None
- }
}
}
}
@@ -108,4 +109,4 @@ object DataConnectorIdGenerator {
private def increment: Long = {
counter.incrementAndGet()
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index f4911fc..b51d4fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -18,17 +18,18 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector
+import scala.util.Try
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-import scala.reflect.ClassTag
-import scala.util.Try
object DataConnectorFactory extends Loggable {
@@ -60,9 +61,8 @@ object DataConnectorFactory extends Loggable {
case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
- case KafkaRegex() => {
+ case KafkaRegex() =>
getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- }
case _ => throw new Exception("connector creation error!")
}
}
@@ -78,7 +78,8 @@ object DataConnectorFactory extends Loggable {
val conType = dcParam.getType
val version = dcParam.getVersion
conType match {
- case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+ case KafkaRegex() =>
+ getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case _ => throw new Exception("streaming connector creation error!")
}
}
@@ -88,7 +89,7 @@ object DataConnectorFactory extends Loggable {
dcParam: DataConnectorParam,
tmstCache: TimestampStorage,
streamingCacheClientOpt: Option[StreamingCacheClient]
- ): KafkaStreamingDataConnector = {
+ ): KafkaStreamingDataConnector = {
val KeyType = "key.type"
val ValueType = "value.type"
val config = dcParam.getConfig
@@ -96,22 +97,14 @@ object DataConnectorFactory extends Loggable {
val valueType = config.getOrElse(ValueType, "java.lang.String").toString
(keyType, valueType) match {
- case ("java.lang.String", "java.lang.String") => {
- KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- }
- case _ => {
+ case ("java.lang.String", "java.lang.String") =>
+ KafkaStreamingStringDataConnector(
+ sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+ case _ =>
throw new Exception("not supported type kafka data connector")
- }
}
}
-// def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
-// connectors.flatMap { dc =>
-// dc match {
-// case mdc: T => Some(mdc)
-// case _ => None
-// }
-// }
-// }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 7fa9080..bf71b2c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -58,10 +59,9 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"load avro file ${concreteFileFullPath} fails")
None
- }
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
index ab1e823..1df3bd7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -54,10 +55,9 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
None
- }
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 3dcab16..a7ab02e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -60,7 +61,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
if (validDataDirs.nonEmpty) {
- val df = sparkSession.read.text(validDataDirs: _*)
+ val df = sparkSession.read.text(validDataDirs: _*)
val dfOpt = Some(df)
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
@@ -68,16 +69,17 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
None
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"load text dir ${dirPath} fails: ${e.getMessage}")
None
- }
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
- private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
+ private def listSubDirs(paths: Seq[String],
+ depth: Int,
+ filteFunc: (String) => Boolean): Seq[String] = {
val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
if (depth <= 0) {
subDirs.filter(filteFunc)
@@ -90,7 +92,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
- private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+ private def touchDone(dir: String): Unit =
+ HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
private def emptyDir(dir: String): Boolean = {
HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
@@ -98,7 +101,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
// def metaData(): Try[Iterable[(String, String)]] = {
// Try {
-// val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+// val st = sqlContext.read.format("com.databricks.spark.avro").
+ // load(concreteFileFullPath).schema
// st.fields.map(f => (f.name, f.dataType.typeName))
// }
// }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 1475898..ec09ffc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.streaming
+import scala.util.{Failure, Success, Try}
+
import kafka.serializer.Decoder
import org.apache.spark.streaming.dstream.InputDStream
-import scala.util.{Failure, Success, Try}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -64,10 +65,9 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
// pre-process
preProcess(dfOpt, ms)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"streaming data connector error: ${e.getMessage}")
None
- }
}
// save data frame
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index 8477fcf..ee5e497 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -19,16 +19,17 @@ under the License.
package org.apache.griffin.measure.datasource.connector.streaming
import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+
/**
* streaming data connector for kafka with string format key and value
*/
@@ -60,10 +61,9 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi
val df = sparkSession.createDataFrame(rowRdd, schema)
Some(df)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error("streaming data transform fails")
None
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
index 5c2170c..323b0ac 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
@@ -18,14 +18,16 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.streaming
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.datasource.connector.DataConnector
+import scala.util.Try
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream.InputDStream
-import scala.util.Try
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnector
+
trait StreamingDataConnector extends DataConnector {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
index 249d6d2..0917919 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -18,11 +18,9 @@ under the License.
*/
package org.apache.griffin.measure.job.builder
-import org.apache.griffin.measure.configuration.enums.DslType
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.job._
-import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.DQStepBuilder
import org.apache.griffin.measure.step.write.MetricFlushStep
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index b6cca98..92480d8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.launch
+import scala.util.Try
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
-import scala.util.Try
/**
* dq application process
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index ba1f389..e2dbc8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.launch.batch
import java.util.Date
-import org.apache.griffin.measure.configuration.enums._
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import scala.util.Try
case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index ceecb78..eb31a5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -21,22 +21,24 @@ package org.apache.griffin.measure.launch.streaming
import java.util.{Date, Timer, TimerTask}
import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
-import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import scala.util.Try
case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
@@ -82,10 +84,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
try {
createStreamingContext
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"create streaming context error: ${e.getMessage}")
throw e
- }
}
})
@@ -118,7 +119,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
ssc.start()
ssc.awaitTermination()
- ssc.stop(stopSparkContext=true, stopGracefully=true)
+ ssc.stop(stopSparkContext = true, stopGracefully = true)
// clean context
globalContext.clean()
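The `ssc.stop` change above only adds spaces around `=` in named arguments, but the named-argument style itself is what makes boolean flags readable at the call site. A minimal sketch:

object NamedArgsSketch {
  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit =
    println(s"stopSparkContext=$stopSparkContext, stopGracefully=$stopGracefully")

  def main(args: Array[String]): Unit = {
    // stop(true, true)  -- unclear which flag is which at the call site
    stop(stopSparkContext = true, stopGracefully = true)
  }
}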
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 306befe..feebd91 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.sink
+import org.apache.spark.rdd.RDD
+
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.rdd.RDD
/**
* sink metric and record to console, for debug
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index e5f72d1..3c20a0e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -18,11 +18,13 @@ under the License.
*/
package org.apache.griffin.measure.sink
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import scala.concurrent.Future
+
import org.apache.spark.rdd.RDD
-import scala.concurrent.Future
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+
/**
* sink metric and record through http request
@@ -38,7 +40,10 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String,
val api = config.getString(Api, "")
val method = config.getString(Method, "post")
- val connectionTimeout = TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+
+ val connectionTimeout =
+ TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+
val retry = config.getInt(Retry, 10)
val _Value = "value"
@@ -55,7 +60,7 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String,
val data = JsonUtil.toJson(dataMap)
// http request
val params = Map[String, Object]()
- val header = Map[String, Object](("Content-Type","application/json"))
+ val header = Map[String, Object](("Content-Type", "application/json"))
def func(): (Long, Future[Boolean]) = {
import scala.concurrent.ExecutionContext.Implicits.global
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index 718f1c1..588fabf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -20,10 +20,11 @@ package org.apache.griffin.measure.sink
import java.util.Date
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
import org.apache.spark.rdd.RDD
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+
/**
* sink metric and record to hdfs
*/
@@ -83,6 +84,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
case e: Throwable => error(e.getMessage)
}
}
+
def finish(): Unit = {
try {
HdfsUtil.createEmptyFile(FinishFile)
@@ -103,6 +105,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
private def getHdfsPath(path: String, groupId: Int): String = {
HdfsUtil.getHdfsFilePath(path, s"${groupId}")
}
+
private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
}
@@ -116,7 +119,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
clearOldRecords(path)
try {
val recordCount = records.count
- val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
+ val count =
+ if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
if (count > 0) {
val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
if (groupCount <= 1) {
@@ -145,7 +151,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
clearOldRecords(path)
try {
val recordCount = records.size
- val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
+ val count =
+ if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
if (count > 0) {
val groupCount = (count - 1) / maxLinesPerFile + 1
if (groupCount <= 1) {
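
The repeated count/groupCount computation above clamps how many records get persisted and then does a ceiling division to decide how many HDFS files to write. A standalone sketch of that arithmetic, with invented values (GroupCountExample is not part of the commit):

object GroupCountExample {
  // Clamp to maxPersistLines (negative means unlimited), then split into
  // files of at most maxLinesPerFile lines: ceiling division via (n - 1) / d + 1.
  def groupCount(recordCount: Long, maxPersistLines: Long, maxLinesPerFile: Long): Long = {
    val count =
      if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
    if (count > 0) (count - 1) / maxLinesPerFile + 1 else 0L
  }

  def main(args: Array[String]): Unit = {
    println(groupCount(25000L, -1L, 10000L))    // 3 files
    println(groupCount(25000L, 10000L, 10000L)) // clamped to 10000 lines -> 1 file
  }
}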
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index 206f187..ab59e59 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -18,14 +18,16 @@ under the License.
*/
package org.apache.griffin.measure.sink
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.TimeUtil
+import scala.concurrent.Future
+
import org.apache.spark.rdd.RDD
import org.mongodb.scala._
import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
import org.mongodb.scala.result.UpdateResult
-import scala.concurrent.Future
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
+
/**
* sink metric and record to mongo
@@ -98,7 +100,7 @@ object MongoConnection {
var dataConf: MongoConf = _
private var dataCollection: MongoCollection[Document] = _
- def getDataCollection = dataCollection
+ def getDataCollection : MongoCollection[Document] = dataCollection
def init(config: Map[String, Any]): Unit = {
if (!initialed) {
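
The getDataCollection change adds an explicit result type, following the scalastyle convention that public members should not leak inferred types. The same idea in isolation (Registry is an invented example, not Griffin code):

object Registry {
  private var store: Map[String, String] = Map()

  // Explicit return type: callers see a stable public signature,
  // independent of how the body happens to be written.
  def get(key: String): Option[String] = store.get(key)

  def put(key: String, value: String): Unit = { store += key -> value }
}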
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
index 0c03bd8..9d598fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.sink
-import org.apache.griffin.measure.Loggable
import org.apache.spark.rdd.RDD
+import org.apache.griffin.measure.Loggable
+
/**
* sink metric and record
*/
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 26b0178..49818f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.sink
+import scala.util.{Success, Try}
+
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
import org.apache.griffin.measure.configuration.enums._
-import scala.util.{Success, Try}
case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
index 1cc3f3e..ca38629 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
@@ -21,12 +21,13 @@ package org.apache.griffin.measure.sink
import java.util.Date
import java.util.concurrent.TimeUnit
-import org.apache.griffin.measure.Loggable
-
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success}
+import org.apache.griffin.measure.Loggable
+
+
/**
* sink task runner, to sink metrics in block or non-block mode
*/
@@ -52,11 +53,11 @@ object SinkTaskRunner extends Loggable {
val st = new Date().getTime
val (t, res) = func()
res.onComplete {
- case Success(value) => {
+ case Success(value) =>
val et = new Date().getTime
info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
- }
- case Failure(e) => {
+
+ case Failure(e) =>
val et = new Date().getTime
warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
if (nextRetry >= 0) {
@@ -65,11 +66,11 @@ object SinkTaskRunner extends Loggable {
} else {
error(s"task fails: task ${t} retry ends but fails")
}
- }
}
}
- private def blockExecute(func: () => (Long, Future[_]), retry: Int, waitDuration: Duration): Unit = {
+ private def blockExecute(func: () => (Long, Future[_]),
+ retry: Int, waitDuration: Duration): Unit = {
val nextRetry = nextRetryCount(retry)
val st = new Date().getTime
val (t, res) = func()
@@ -78,7 +79,7 @@ object SinkTaskRunner extends Loggable {
val et = new Date().getTime
info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
} catch {
- case e: Throwable => {
+ case e: Throwable =>
val et = new Date().getTime
warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
if (nextRetry >= 0) {
@@ -87,7 +88,6 @@ object SinkTaskRunner extends Loggable {
} else {
error(s"task fails: task ${t} retry ends but fails")
}
- }
}
}
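
The onComplete rewrite shows the commit's titular fix: a Scala case clause already delimits its own block, running until the next case or the closing brace, so wrapping the body in { } is redundant. A self-contained version of the same shape (the task and messages are illustrative):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object CaseClauseExample {
  def main(args: Array[String]): Unit = {
    val res: Future[Int] = Future(21 * 2)
    res.onComplete {
      case Success(value) =>          // no braces: the clause body
        val doubled = value * 2       // extends to the next case
        println(s"task success with ($doubled)")
      case Failure(e) =>
        println(s"task fails : ${e.getMessage}")
    }
    Thread.sleep(200) // crude wait so the callback runs before the JVM exits
  }
}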
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index c37b5a3..5ce2b14 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -19,9 +19,10 @@ under the License.
package org.apache.griffin.measure.step.builder
import org.apache.commons.lang.StringUtils
+
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.{DataSourceParam, Param, RuleParam}
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step._
@@ -49,7 +50,8 @@ object DQStepBuilder {
.flatMap(_.buildDQStep(context, dsParam))
}
- private def getDataSourceParamStepBuilder(procType: ProcessType): Option[DataSourceParamStepBuilder] = {
+ private def getDataSourceParamStepBuilder(procType: ProcessType)
+ : Option[DataSourceParamStepBuilder] = {
procType match {
case BatchProcessType => Some(BatchDataSourceStepBuilder())
case StreamingProcessType => Some(StreamingDataSourceStepBuilder())
@@ -64,7 +66,9 @@ object DQStepBuilder {
val funcNames = context.functionNames
val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)
.flatMap(_.buildDQStep(context, ruleParam))
- dqStepOpt.toSeq.flatMap(_.getNames).foreach(name => context.compileTableRegister.registerTable(name))
+ dqStepOpt.toSeq.flatMap(_.getNames).foreach(name =>
+ context.compileTableRegister.registerTable(name)
+ )
dqStepOpt
}
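
getDataSourceParamStepBuilder above maps a process type to the builder that can handle it, returning Option so callers can flatMap straight into buildDQStep. A reduced sketch of the same factory shape (all names invented):

object BuilderFactoryExample {
  sealed trait ProcessType
  case object Batch extends ProcessType
  case object Streaming extends ProcessType

  trait StepBuilder { def kind: String }
  case object BatchBuilder extends StepBuilder { val kind = "batch" }
  case object StreamingBuilder extends StepBuilder { val kind = "streaming" }

  // Returning Option lets callers flatMap into the next build stage.
  def builderFor(procType: ProcessType): Option[StepBuilder] = procType match {
    case Batch => Some(BatchBuilder)
    case Streaming => Some(StreamingBuilder)
  }

  def main(args: Array[String]): Unit =
    println(builderFor(Streaming).map(_.kind)) // Some(streaming)
}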
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
index d3c0e41..aa43cf6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
@@ -24,7 +24,6 @@ import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.dsl.parser.GriffinDslParser
import org.apache.griffin.measure.step.builder.dsl.transform.Expr2DQSteps
-import scala.util.{Failure, Success}
case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
functionNames: Seq[String]
@@ -50,10 +49,9 @@ case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
Nil
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"generate rule plan ${name} fails: ${e.getMessage}")
Nil
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 5a04c11..4af3ceb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -18,11 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.step.builder
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
/**
* build dq step by rule param
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
index 17e678e..1c04e75 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
@@ -35,7 +35,8 @@ case class SelectClause(exprs: Seq[Expr], extraConditionOpt: Option[ExtraConditi
def coalesceDesc: String = desc
override def map(func: (Expr) => Expr): SelectClause = {
- SelectClause(exprs.map(func(_)), extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]))
+ SelectClause(exprs.map(func(_)),
+ extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]))
}
}
@@ -81,11 +82,10 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extend
def merge(other: GroupbyClause): GroupbyClause = {
val newHavingClauseOpt = (havingClauseOpt, other.havingClauseOpt) match {
- case (Some(hc), Some(ohc)) => {
+ case (Some(hc), Some(ohc)) =>
val logical1 = LogicalFactorExpr(hc, false, None)
val logical2 = LogicalFactorExpr(ohc, false, None)
Some(BinaryLogicalExpr(logical1, ("AND", logical2) :: Nil))
- }
case (a @ Some(_), _) => a
case (_, b @ Some(_)) => b
case (_, _) => None
@@ -250,7 +250,8 @@ case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression {
def desc: String = exprs.map(_.desc).mkString(", ")
def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
- override def map(func: (Expr) => Expr): DistinctnessClause = DistinctnessClause(exprs.map(func(_)))
+ override def map(func: (Expr) => Expr) : DistinctnessClause =
+ DistinctnessClause(exprs.map(func(_)))
}
case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
@@ -266,5 +267,6 @@ case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression {
def desc: String = exprs.map(_.desc).mkString(", ")
def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
- override def map(func: (Expr) => Expr): CompletenessClause = CompletenessClause(exprs.map(func(_)))
-}
\ No newline at end of file
+ override def map(func: (Expr) => Expr): CompletenessClause =
+ CompletenessClause(exprs.map(func(_)))
+}
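
GroupbyClause.merge above combines two optional HAVING clauses: when both sides are defined they are conjoined with AND, otherwise whichever side exists wins. The same Option pattern reduced to strings (MergeHavingExample is invented; the real code builds BinaryLogicalExpr nodes):

object MergeHavingExample {
  def merge(a: Option[String], b: Option[String]): Option[String] =
    (a, b) match {
      case (Some(x), Some(y)) => Some(s"($x) AND ($y)") // both defined: conjoin
      case (x @ Some(_), _) => x                        // otherwise keep the
      case (_, y @ Some(_)) => y                        // defined side, if any
      case (_, _) => None
    }

  def main(args: Array[String]): Unit = {
    println(merge(Some("cnt > 1"), Some("total < 10")))
    println(merge(None, Some("total < 10")))
  }
}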
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
index 13317bb..426dbbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
@@ -185,7 +185,8 @@ case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends Logi
}
}
-case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr {
+case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)])
+ extends LogicalExpr {
addChildren(factor +: tails.map(_._2))
@@ -220,4 +221,4 @@ case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExp
(pair._1, func(pair._2).asInstanceOf[LogicalExpr])
})
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
index b64dae2..c4f80d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
@@ -96,17 +96,17 @@ case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends Sel
// -------------
-case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr {
+case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String])
+ extends SelectExpr {
addChildren(head +: selectors)
def desc: String = {
selectors.foldLeft(head.desc) { (hd, sel) =>
sel match {
- case FunctionSelectExpr(funcName, args) => {
+ case FunctionSelectExpr(funcName, args) =>
val nargs = hd +: args.map(_.desc)
s"${funcName}(${nargs.mkString(", ")})"
- }
case _ => s"${hd}${sel.desc}"
}
}
@@ -129,4 +129,4 @@ case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: O
SelectionExpr(func(head).asInstanceOf[HeadExpr],
selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt)
}
-}
\ No newline at end of file
+}
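
SelectionExpr.desc above folds the selector chain over the head expression's description, so a function selector wraps everything accumulated so far as its first argument. The fold in isolation (invented names; String stands in for the expression descriptions):

object SelectorDescExample {
  sealed trait Selector
  case class Field(name: String) extends Selector
  case class Func(name: String, args: Seq[String]) extends Selector

  def desc(head: String, selectors: Seq[Selector]): String =
    selectors.foldLeft(head) { (hd, sel) =>
      sel match {
        case Func(funcName, args) =>
          val nargs = hd +: args          // accumulated desc becomes argument 0
          s"$funcName(${nargs.mkString(", ")})"
        case Field(name) => s"$hd.$name"
      }
    }

  def main(args: Array[String]): Unit =
    println(desc("source", Seq(Field("age"), Func("max", Seq())))) // max(source.age)
}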
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
index 4ce0fe6..b10fd57 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
@@ -24,13 +24,15 @@ trait TreeNode extends Serializable {
var children = Seq[TreeNode]()
- def addChild(expr: TreeNode) = { children :+= expr }
- def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }
+ def addChild(expr: TreeNode) : Unit = { children :+= expr }
+ def addChildren(exprs: Seq[TreeNode]) : Unit = { children ++= exprs }
- def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+ def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)
+ (seqOp: (A, T) => T, combOp: (T, T) => T)
+ (implicit tag: ClassTag[A]): T = {
val clazz = tag.runtimeClass
- if(clazz.isAssignableFrom(this.getClass)){
+ if(clazz.isAssignableFrom(this.getClass)) {
val tv = seqOp(this.asInstanceOf[A], z)
children.foldLeft(combOp(z, tv)) { (ov, tn) =>
combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
@@ -41,16 +43,18 @@ trait TreeNode extends Serializable {
}
}
- def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+ def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)
+ (seqOp: (A, T) => T, combOp: (T, T) => T)
+ (implicit tag: ClassTag[A]): T = {
val clazz = tag.runtimeClass
- if(clazz.isAssignableFrom(this.getClass)){
+ if(clazz.isAssignableFrom(this.getClass)) {
val cv = children.foldLeft(z) { (ov, tn) =>
combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
}
combOp(z, seqOp(this.asInstanceOf[A], cv))
}
- else{
+ else {
z
}
}
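
Both traversals above fold over the tree depth-first, applying seqOp only at nodes whose runtime class is compatible with the ClassTag, and combOp to merge results across children. A reduced sketch of the mechanism (Node, Leaf, Branch and countOf are invented stand-ins):

import scala.reflect.ClassTag

object TraverseExample {
  trait Node { var children: Seq[Node] = Seq() }
  class Leaf extends Node
  class Branch extends Node

  // Pre-order count of all nodes matching type A, mirroring the
  // runtimeClass.isAssignableFrom check in preOrderTraverseDepthFirst.
  def countOf[A <: Node](root: Node)(implicit tag: ClassTag[A]): Int = {
    val self = if (tag.runtimeClass.isAssignableFrom(root.getClass)) 1 else 0
    root.children.foldLeft(self)((acc, child) => acc + countOf[A](child))
  }

  def main(args: Array[String]): Unit = {
    val root = new Branch
    root.children = Seq(new Leaf, new Leaf, new Branch)
    println(countOf[Leaf](root))   // 2
    println(countOf[Branch](root)) // 2 (the root and one child)
  }
}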
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
index 18f7754..fe6678d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.parser
+import scala.util.parsing.combinator.JavaTokenParsers
+
import org.apache.griffin.measure.step.builder.dsl.expr._
-import scala.util.parsing.combinator.JavaTokenParsers
/**
* basic parser for sql like syntax
@@ -388,10 +389,9 @@ trait BasicParser extends JavaTokenParsers with Serializable {
def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
- case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+ case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt =>
val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt)
CombinedClause(sel, fromOpt, tails)
- }
}
}
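
combinedClause above chains combinators: ~ sequences parsers, opt makes a clause optional, and ^^ maps the parse result, whose parts destructure with the same ~ pattern, so this case clause is braceless too. A toy parser in the same style, assuming scala-parser-combinators on the classpath as BasicParser itself already requires (the grammar is invented):

import scala.util.parsing.combinator.JavaTokenParsers

object MiniParser extends JavaTokenParsers {
  case class Query(col: String, table: String)

  def query: Parser[Query] = "SELECT" ~> ident ~ ("FROM" ~> ident) ^^ {
    case col ~ table =>
      Query(col, table)
  }

  def main(args: Array[String]): Unit =
    println(parseAll(query, "SELECT name FROM users")) // parsed: Query(name,users)
}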
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
index 2cd638c..77ed987 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
@@ -37,11 +37,10 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
- case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+ case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt =>
val preClauses = Seq(whereOpt).flatMap(opt => opt)
val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt)
ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses)
- }
}
/**
@@ -59,7 +58,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
* <distinctness-clauses> = <distExpr> [, <distExpr>]+
*/
def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ {
- case expr => { expr.tag = "[]"; expr}
+ case expr => expr.tag = "[]"; expr
}
def distExpr: Parser[Expr] = expression | sqbrExpr
def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index b97a1a0..3bf7d04 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -18,15 +18,15 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
-import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
+import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._
@@ -77,30 +77,37 @@ case class AccuracyExpr2DQSteps(context: DQContext,
s"${sel.desc} IS NULL"
}.mkString(" AND ")
val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
- s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+ s"SELECT ${selClause} FROM `${sourceName}` " +
+ s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
}
- val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+ val missRecordsTransStep =
+ SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+
val missRecordsWriteSteps = procType match {
- case BatchProcessType => {
- val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+ case BatchProcessType =>
+ val rwName =
+ ruleParam.getOutputOpt(RecordOutputType).
+ flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName) :: Nil
- }
case StreamingProcessType => Nil
}
val missRecordsUpdateWriteSteps = procType match {
case BatchProcessType => Nil
- case StreamingProcessType => {
- val dsName = ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
+ case StreamingProcessType =>
+ val dsName =
+ ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
- }
}
// 2. miss count
val missCountTableName = "__missCount"
val missColName = details.getStringOrKey(_miss)
val missCountSql = procType match {
- case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
- case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
+ case BatchProcessType =>
+ s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
+ case StreamingProcessType =>
+ s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `${missColName}` " +
+ s"FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
}
val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
@@ -109,7 +116,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val totalColName = details.getStringOrKey(_total)
val totalCountSql = procType match {
case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
- case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
+ case StreamingProcessType =>
+ s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
+ s"FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
}
val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
@@ -117,15 +126,14 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyTableName = ruleParam.getOutDfName()
val matchedColName = details.getStringOrKey(_matched)
val accuracyMetricSql = procType match {
- case BatchProcessType => {
+ case BatchProcessType =>
s"""
|SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
|(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
""".stripMargin
- }
- case StreamingProcessType => {
+ case StreamingProcessType =>
s"""
|SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
|`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
@@ -134,27 +142,26 @@ case class AccuracyExpr2DQSteps(context: DQContext,
|FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
|ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}`
""".stripMargin
- }
}
val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
val accuracyMetricWriteSteps = procType match {
- case BatchProcessType => {
+ case BatchProcessType =>
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
- }
case StreamingProcessType => Nil
}
// accuracy current steps
val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
- val writeSteps1 = accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
+ val writeSteps1 =
+ accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
// streaming extra steps
val (transSteps2, writeSteps2) = procType match {
case BatchProcessType => (Nil, Nil)
- case StreamingProcessType => {
+ case StreamingProcessType =>
// 5. accuracy metric merge
val accuracyMetricTableName = "__accuracy"
val accuracyMetricRule = DataFrameOps._accuracy
@@ -183,14 +190,16 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyRecordTransStep = SparkSqlTransformStep(
accuracyRecordTableName, accuracyRecordSql, emptyMap)
val accuracyRecordWriteStep = {
- val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+ val rwName =
+ ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+ .getOrElse(missRecordsTableName)
+
RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
}
// extra steps
(accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
- }
}
// full steps
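
Behind the generated SQL above, the accuracy metric is plain arithmetic: matched = total - coalesce(miss, 0), with the LEFT JOIN supplying a possibly-absent miss count. The same computation in isolation (AccuracyMetricExample and the sample values are invented):

object AccuracyMetricExample {
  // Option models the LEFT JOIN: a missing miss-count row coalesces to 0.
  def matched(total: Long, missOpt: Option[Long]): Long =
    total - missOpt.getOrElse(0L)

  def main(args: Array[String]): Unit = {
    println(matched(1000L, Some(37L))) // 963 source records matched the target
    println(matched(1000L, None))      // no miss row joined: all 1000 matched
  }
}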
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 4852a5b..87cfa86 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -80,16 +80,23 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val sourceAliasSql = {
s"SELECT ${selClause} FROM `${sourceName}`"
}
- val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+ val sourceAliasTransStep =
+ SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
// 2. incomplete record
val incompleteRecordsTableName = "__incompleteRecords"
val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
val incompleteWhereClause = s"NOT (${completeWhereClause})"
- val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
- val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
+
+ val incompleteRecordsSql =
+ s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
+
+ val incompleteRecordTransStep =
+ SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
val incompleteRecordWriteStep = {
- val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
+ val rwName =
+ ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+ .getOrElse(incompleteRecordsTableName)
RecordWriteStep(rwName, incompleteRecordsTableName)
}
@@ -97,17 +104,24 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val incompleteCountTableName = "__incompleteCount"
val incompleteColName = details.getStringOrKey(_incomplete)
val incompleteCountSql = procType match {
- case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
- case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
+ case BatchProcessType =>
+ s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
+ case StreamingProcessType =>
+ s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` " +
+ s"FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
}
- val incompleteCountTransStep = SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
+ val incompleteCountTransStep =
+ SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
// 4. total count
val totalCountTableName = "__totalCount"
val totalColName = details.getStringOrKey(_total)
val totalCountSql = procType match {
- case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
- case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
+ case BatchProcessType =>
+ s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
+ case StreamingProcessType =>
+ s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
+ s"FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
}
val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
@@ -115,15 +129,14 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val completeTableName = ruleParam.getOutDfName()
val completeColName = details.getStringOrKey(_complete)
val completeMetricSql = procType match {
- case BatchProcessType => {
+ case BatchProcessType =>
s"""
|SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
|coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
|(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
|FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
""".stripMargin
- }
- case StreamingProcessType => {
+ case StreamingProcessType =>
s"""
|SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
|`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
@@ -132,7 +145,6 @@ case class CompletenessExpr2DQSteps(context: DQContext,
|FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
|ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}`
""".stripMargin
- }
}
val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
val completeWriteStep = {
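
The completeness rule above treats a record as complete only when every selected alias is non-NULL; the incomplete set is the negation of the AND-ed IS NOT NULL terms. A sketch of that predicate construction (CompletenessSqlExample and the column names are invented):

object CompletenessSqlExample {
  def incompleteWhere(aliases: Seq[String]): String = {
    val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
    s"NOT (${completeWhereClause})"
  }

  def main(args: Array[String]): Unit =
    println(incompleteWhere(Seq("id", "email")))
    // NOT (`id` IS NOT NULL AND `email` IS NOT NULL)
}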
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 6c56a77..70fee6c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -101,7 +101,8 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
val sourceAliasSql = {
s"SELECT ${selClause} FROM `${sourceName}`"
}
- val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+ val sourceAliasTransStep =
+ SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
// 2. total metric
val totalTableName = "__totalMetric"
@@ -125,13 +126,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
""".stripMargin
}
- val selfGroupTransStep = SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+ val selfGroupTransStep =
+ SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil
val writeSteps1 = totalMetricWriteStep :: Nil
val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
- case StreamingProcessType if (withOlderTable) => {
+ case StreamingProcessType if (withOlderTable) =>
// 4.0 update old data
val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)
@@ -200,14 +202,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${groupTableName}`
""".stripMargin
}
- val finalDupCountTransStep = SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+ val finalDupCountTransStep =
+ SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
- ((olderAliasTransStep :: joinedTransStep :: groupTransStep :: finalDupCountTransStep :: Nil,
+ ((olderAliasTransStep :: joinedTransStep
+ :: groupTransStep :: finalDupCountTransStep :: Nil,
targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
- }
- case _ => {
+ case _ =>
((Nil, Nil), selfGroupTableName)
- }
}
// 8. distinct metric
@@ -285,9 +287,13 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
""".stripMargin
}
- val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+ val groupDupMetricTransStep =
+ SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
val groupDupMetricWriteStep = {
- MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayFlattenType, writeTimestampOpt)
+ MetricWriteStep(duplicationArrayName,
+ groupDupMetricTableName,
+ ArrayFlattenType,
+ writeTimestampOpt)
}
val msteps = {
@@ -306,7 +312,9 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
// 9. duplicate record
val dupRecordTableName = "__dupRecords"
val dupRecordSelClause = procType match {
- case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+ case StreamingProcessType if (withOlderTable) =>
+ s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+
case _ => s"${distAliasesClause}, `${dupColName}`"
}
val dupRecordSql = {
@@ -315,9 +323,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
|FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
""".stripMargin
}
- val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+ val dupRecordTransStep =
+ SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+
val dupRecordWriteStep = {
- val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+ val rwName =
+ ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+ .getOrElse(dupRecordTableName)
+
RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
}
@@ -332,7 +345,12 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
val dupMetricWriteStep = {
- MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType, writeTimestampOpt)
+ MetricWriteStep(
+ duplicationArrayName,
+ dupMetricTableName,
+ ArrayFlattenType,
+ writeTimestampOpt
+ )
}
val msteps = {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index d986b56..492f4fd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -19,8 +19,8 @@ under the License.
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.dsl.expr.Expr
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index ecc115c..af493af 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -19,8 +19,9 @@ under the License.
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
+
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -63,7 +64,9 @@ case class ProfilingExpr2DQSteps(context: DQContext,
val analyzer = ProfilingAnalyzer(profilingExpr, sourceName)
val selExprDescs = analyzer.selectionExprs.map { sel =>
val alias = sel match {
- case s: AliasableExpr => s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("")
+ case s: AliasableExpr =>
+ s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("")
+
case _ => ""
}
s"${sel.desc}${alias}"
@@ -76,21 +79,22 @@ case class ProfilingExpr2DQSteps(context: DQContext,
val groupByClauseOpt = analyzer.groupbyExprOpt
val groupbyClause = procType match {
case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
- case StreamingProcessType => {
- val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
+ case StreamingProcessType =>
+ val tmstGroupbyClause =
+ GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
case Some(gbc) => gbc
case _ => GroupbyClause(Nil, None)
})
mergedGroubbyClause.desc
- }
}
val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
// 1. select statement
val profilingSql = {
- s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+ s"SELECT ${selCondition} ${selClause} " +
+ s"${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
}
val profilingName = ruleParam.getOutDfName()
val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 8a3924b..3731da9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -79,19 +79,17 @@ case class TimelinessExpr2DQSteps(context: DQContext,
// 1. in time
val inTimeTableName = "__inTime"
val inTimeSql = etsSelOpt match {
- case Some(etsSel) => {
+ case Some(etsSel) =>
s"""
|SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`,
|(${etsSel}) AS `${ConstantColumns.endTs}`
|FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
""".stripMargin
- }
- case _ => {
+ case _ =>
s"""
|SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`
|FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
""".stripMargin
- }
}
val inTimeTransStep = SparkSqlTransformStep(inTimeTableName, inTimeSql, emptyMap)
@@ -103,7 +101,8 @@ case class TimelinessExpr2DQSteps(context: DQContext,
case _ => ConstantColumns.tmst
}
val latencySql = {
- s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
+ s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` " +
+ s"FROM `${inTimeTableName}`"
}
val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
@@ -112,14 +111,15 @@ case class TimelinessExpr2DQSteps(context: DQContext,
val totalColName = details.getStringOrKey(_total)
val avgColName = details.getStringOrKey(_avg)
val metricSql = procType match {
- case BatchProcessType => {
+
+ case BatchProcessType =>
s"""
|SELECT COUNT(*) AS `${totalColName}`,
|CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
|FROM `${latencyTableName}`
""".stripMargin
- }
- case StreamingProcessType => {
+
+ case StreamingProcessType =>
s"""
|SELECT `${ConstantColumns.tmst}`,
|COUNT(*) AS `${totalColName}`,
@@ -127,7 +127,6 @@ case class TimelinessExpr2DQSteps(context: DQContext,
|FROM `${latencyTableName}`
|GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
- }
}
val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
val metricWriteStep = {
@@ -143,24 +142,26 @@ case class TimelinessExpr2DQSteps(context: DQContext,
// 4. timeliness record
val (transSteps2, writeSteps2) = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
- case Some(tsh) => {
+ case Some(tsh) =>
val recordTableName = "__lateRecords"
val recordSql = {
s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
}
val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
val recordWriteStep = {
- val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(recordTableName)
+ val rwName =
+ ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+ .getOrElse(recordTableName)
+
RecordWriteStep(rwName, recordTableName, None)
}
(recordTransStep :: Nil, recordWriteStep :: Nil)
- }
case _ => (Nil, Nil)
}
// 5. ranges
val (transSteps3, writeSteps3) = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
- case Some(stepSize) => {
+ case Some(stepSize) =>
// 5.1 range
val rangeTableName = "__range"
val stepColName = details.getStringOrKey(_step)
@@ -176,26 +177,24 @@ case class TimelinessExpr2DQSteps(context: DQContext,
val rangeMetricTableName = "__rangeMetric"
val countColName = details.getStringOrKey(_count)
val rangeMetricSql = procType match {
- case BatchProcessType => {
+ case BatchProcessType =>
s"""
|SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
|FROM `${rangeTableName}` GROUP BY `${stepColName}`
""".stripMargin
- }
- case StreamingProcessType => {
+ case StreamingProcessType =>
s"""
|SELECT `${ConstantColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
|FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}`
""".stripMargin
- }
}
- val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+ val rangeMetricTransStep =
+ SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
val rangeMetricWriteStep = {
MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
}
(rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
- }
case _ => (Nil, Nil)
}
@@ -206,7 +205,8 @@ case class TimelinessExpr2DQSteps(context: DQContext,
val percentileColName = details.getStringOrKey(_percentileColPrefix)
val percentileCols = percentiles.map { pct =>
val pctName = (pct * 100).toInt.toString
- s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`"
+ s"floor(percentile_approx(${latencyColName}, ${pct})) " +
+ s"AS `${percentileColName}_${pctName}`"
}.mkString(", ")
val percentileSql = {
s"""
@@ -214,7 +214,9 @@ case class TimelinessExpr2DQSteps(context: DQContext,
|FROM `${latencyTableName}`
""".stripMargin
}
- val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
+ val percentileTransStep =
+ SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
+
val percentileWriteStep = {
MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
}
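
The percentile step above builds one SELECT column per requested quantile, naming each by its percentage; percentile_approx is the Spark SQL aggregate these generated queries call. The column-building loop in isolation (PercentileColsExample is invented):

object PercentileColsExample {
  def percentileCols(latencyCol: String, prefix: String, percentiles: Seq[Double]): String =
    percentiles.map { pct =>
      val pctName = (pct * 100).toInt.toString
      s"floor(percentile_approx(${latencyCol}, ${pct})) AS `${prefix}_${pctName}`"
    }.mkString(", ")

  def main(args: Array[String]): Unit =
    println(percentileCols("latency", "percentile", Seq(0.5, 0.95)))
  // floor(percentile_approx(latency, 0.5)) AS `percentile_50`,
  // floor(percentile_approx(latency, 0.95)) AS `percentile_95`
}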
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 443239c..28e9d48 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -112,7 +112,8 @@ case class UniquenessExpr2DQSteps(context: DQContext,
}.mkString(", ")
val dupColName = details.getStringOrKey(_dup)
val groupSql = {
- s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
+ s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " +
+ s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
}
val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
@@ -121,12 +122,11 @@ case class UniquenessExpr2DQSteps(context: DQContext,
val totalColName = details.getStringOrKey(_total)
val totalSql = procType match {
case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
- case StreamingProcessType => {
+ case StreamingProcessType =>
s"""
|SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}`
|FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
- }
}
val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
@@ -136,22 +136,25 @@ case class UniquenessExpr2DQSteps(context: DQContext,
val uniqueRecordSql = {
s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
}
- val uniqueRecordTransStep = SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+ val uniqueRecordTransStep =
+ SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
// 7. unique metric
val uniqueTableName = "__uniqueMetric"
val uniqueColName = details.getStringOrKey(_unique)
val uniqueSql = procType match {
- case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
- case StreamingProcessType => {
+ case BatchProcessType =>
+ s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
+ case StreamingProcessType =>
s"""
|SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
|FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}`
""".stripMargin
- }
}
val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
- val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
+
+ val uniqueMetricWriteStep =
+ MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
@@ -164,9 +167,14 @@ case class UniquenessExpr2DQSteps(context: DQContext,
val dupRecordSql = {
s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
}
- val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+ val dupRecordTransStep =
+ SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+
val dupRecordWriteStep = {
- val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+ val rwName =
+ ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+ .getOrElse(dupRecordTableName)
+
RecordWriteStep(rwName, dupRecordTableName)
}
@@ -175,7 +183,9 @@ case class UniquenessExpr2DQSteps(context: DQContext,
val numColName = details.getStringOrKey(_num)
val dupMetricSelClause = procType match {
case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
- case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
+
+ case StreamingProcessType =>
+ s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
}
val dupMetricGroupbyClause = procType match {
case BatchProcessType => s"`${dupColName}`"
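
Uniqueness rests on one trick visible above: GROUP BY the key columns and compute (COUNT(*) - 1) AS dup, so dup = 0 marks unique records and dup > 0 marks duplicates, which the later steps then count per dup value. The same logic over an in-memory collection (DupCountExample and the data are invented):

object DupCountExample {
  def main(args: Array[String]): Unit = {
    val rows = Seq("a", "b", "b", "c", "c", "c")
    // dup = occurrences - 1, mirroring (COUNT(*) - 1) AS `dup`
    val dups = rows.groupBy(identity).map { case (k, vs) => (k, vs.size - 1) }
    println(dups.filter(_._2 == 0).keys.toSeq.sorted) // unique keys: List(a)
    println(dups.filter(_._2 > 0))                    // duplicated keys and dup counts
  }
}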
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
index b2a95ce..d57bf23 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
@@ -21,9 +21,11 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
import org.apache.griffin.measure.step.builder.dsl.expr._
-case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String)
+ extends BasicAnalyzer {
- val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+ val dataSourceNames =
+ expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
val sourceSelectionExprs = {
val seq = seqSelectionExprs(sourceName)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
index f8b872a..5a73bce 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
import org.apache.griffin.measure.step.builder.dsl.expr._
-case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer {
+case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String)
+ extends BasicAnalyzer {
val seqAlias = (expr: Expr, v: Seq[String]) => {
expr match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
index 9cee8ab..b1f4229 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
@@ -21,8 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
import org.apache.griffin.measure.step.builder.dsl.expr._
-//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
-case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer {
+case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String)
+ extends BasicAnalyzer {
val seqAlias = (expr: Expr, v: Seq[String]) => {
expr match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
index 049e6fd..f66d482 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
@@ -23,7 +23,8 @@ import org.apache.griffin.measure.step.builder.dsl.expr._
case class ProfilingAnalyzer(expr: ProfilingClause, sourceName: String) extends BasicAnalyzer {
- val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+ val dataSourceNames =
+ expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
val selectionExprs: Seq[Expr] = {
expr.selectClause.exprs.map(_.extractSelf).flatMap { expr =>
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
index 0976d6c..8a05d17 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
import org.apache.griffin.measure.step.builder.dsl.expr.{AliasableExpr, _}
-case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String)
+ extends BasicAnalyzer {
val seqAlias = (expr: Expr, v: Seq[String]) => {
expr match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
index eac3b2b..713dc55 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -26,9 +26,10 @@ import org.apache.griffin.measure.configuration.enums._
*/
object PreProcParamMaker {
- case class StringAnyMap(values:Map[String,Any])
+ case class StringAnyMap(values: Map[String, Any])
- def makePreProcRules(rules: Seq[RuleParam], suffix: String, dfName: String): (Seq[RuleParam], String) = {
+ def makePreProcRules(rules: Seq[RuleParam],
+ suffix: String, dfName: String): (Seq[RuleParam], String) = {
val len = rules.size
val (newRules, _) = rules.zipWithIndex.foldLeft((Nil: Seq[RuleParam], dfName)) { (ret, pair) =>
val (rls, prevOutDfName) = ret
@@ -47,10 +48,9 @@ object PreProcParamMaker {
val rpRule = rule.replaceInOutDfName(newInDfName, newOutDfName)
rule.getDslType match {
case DataFrameOpsType => rpRule
- case _ => {
+ case _ =>
val newRule = replaceDfNameSuffix(rule.getRule, rule.getInDfName(), suffix)
rpRule.replaceRule(newRule)
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
index 8b1df82..9194da1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.step.read
+import org.apache.spark.sql._
+
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
-import org.apache.spark.sql._
trait ReadStep extends DQStep {
@@ -31,15 +32,13 @@ trait ReadStep extends DQStep {
def execute(context: DQContext): Boolean = {
info(s"read data source [${name}]")
read(context) match {
- case Some(df) => {
+ case Some(df) =>
// if (needCache) context.dataFrameCache.cacheDataFrame(name, df)
context.runTimeTableRegister.registerTable(name, df)
true
- }
- case _ => {
+ case _ =>
warn(s"read data source [${name}] fails")
false
- }
}
}
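The recurring change throughout this commit drops the redundant braces around `case` bodies: in Scala, a match arm runs until the next `case` or the closing brace of the match, so the braces add nothing. A minimal standalone sketch of the braceless style (the `register` function, its `Option[String]` input, and the log strings are illustrative only, not Griffin code):

    // hypothetical example of the braceless case-clause style
    def register(readOpt: Option[String]): Boolean = readOpt match {
      case Some(df) =>
        println(s"registering table $df") // a multi-line body needs no braces
        true
      case _ =>
        println("read failed")
        false
    }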
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
index 6dae1cb..88a39cf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
@@ -18,8 +18,9 @@ under the License.
*/
package org.apache.griffin.measure.step.read
-import org.apache.griffin.measure.context.DQContext
import org.apache.spark.sql._
+
+import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.utils.DataFrameUtil._
case class UnionReadStep(name: String,
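Several hunks, like the one above, also regroup imports: platform imports first, third-party packages (such as org.apache.spark) next, and project-internal org.apache.griffin packages last, with a blank line between groups. A sketch of the resulting file header, assuming the same three groups (imports shown are taken from this commit's own diffs):

    // illustrative import grouping: platform, third-party, project,
    // separated by blank lines
    import java.util.Date

    import org.apache.spark.sql._

    import org.apache.griffin.measure.context.DQContext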
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index 86b367c..088f328 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -20,13 +20,14 @@ package org.apache.griffin.measure.step.transform
import java.util.Date
+import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
+
import org.apache.griffin.measure.context.ContextId
-import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
import org.apache.griffin.measure.context.streaming.metric._
+import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
-import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
/**
* pre-defined data frame operations
@@ -44,7 +45,9 @@ object DataFrameOps {
val _matched = "matched"
}
- def fromJson(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
+ def fromJson(sqlContext: SQLContext,
+ inputDfName: String,
+ details: Map[String, Any]): DataFrame = {
val _colName = "col.name"
val colNameOpt = details.get(_colName).map(_.toString)
@@ -58,7 +61,10 @@ object DataFrameOps {
sqlContext.read.json(rdd) // slow process
}
- def accuracy(sqlContext: SQLContext, inputDfName: String, contextId: ContextId, details: Map[String, Any]): DataFrame = {
+ def accuracy(sqlContext: SQLContext,
+ inputDfName: String,
+ contextId: ContextId,
+ details: Map[String, Any]): DataFrame = {
import AccuracyOprKeys._
val miss = details.getStringOrKey(_miss)
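Long signatures such as `accuracy` above are rewrapped with one parameter per line, aligned under the first parameter, so each source line stays within the style checker's length limit. A hedged sketch with placeholder names and types (`reportLike` is not a Griffin method):

    // illustrative only: one parameter per line, aligned under the first
    def reportLike(inputDfName: String,
                   timestamp: Long,
                   details: Map[String, Any]): String =
      s"$inputDfName@$timestamp with ${details.size} detail entries"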
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 5f99ed2..a2bf46e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -35,7 +35,9 @@ case class DataFrameOpsTransformStep(name: String,
try {
val df = rule match {
case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details)
- case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+ case DataFrameOps._accuracy =>
+ DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+
case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details)
case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
}
@@ -43,10 +45,9 @@ case class DataFrameOpsTransformStep(name: String,
context.runTimeTableRegister.registerTable(name, df)
true
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}")
false
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index ead7344..ca03f79 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -37,10 +37,9 @@ case class SparkSqlTransformStep(name: String,
context.runTimeTableRegister.registerTable(name, df)
true
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
false
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index 9415998..f34e003 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -19,9 +19,10 @@ under the License.
package org.apache.griffin.measure.step.write
import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.context.DQContext
import org.apache.spark.sql.DataFrame
+import org.apache.griffin.measure.context.DQContext
+
/**
* update data source streaming cache
*/
@@ -34,12 +35,12 @@ case class DataSourceUpdateWriteStep(dsName: String,
def execute(context: DQContext): Boolean = {
getDataSourceCacheUpdateDf(context) match {
- case Some(df) => {
- context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
- }
- case _ => {
+ case Some(df) =>
+ context.dataSources
+ .find(ds => StringUtils.equals(ds.name, dsName))
+ .foreach(_.updateData(df))
+ case _ =>
warn(s"update ${dsName} from ${inputName} fails")
- }
}
true
}
@@ -49,13 +50,13 @@ case class DataSourceUpdateWriteStep(dsName: String,
val df = context.sqlContext.table(s"`${name}`")
Some(df)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"get data frame ${name} fails")
None
- }
}
}
- private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+ private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame]
+ = getDataFrame(context, inputName)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 8f7d01c..e787d96 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -36,10 +36,9 @@ case class MetricFlushStep() extends WriteStep {
context.getSink(t).sinkMetrics(metric)
true
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"flush metrics error: ${e.getMessage}")
false
- }
}
ret && pr
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index 4771891..bc721f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -45,11 +45,12 @@ case class MetricWriteStep(name: String,
// get timestamp and normalize metric
val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match {
- case SimpleMode => {
+
+ case SimpleMode =>
val metrics: Map[String, Any] = flattenMetric(metricMaps, name, flattenType)
emptyMetricMap + (timestamp -> metrics)
- }
- case TimestampMode => {
+
+ case TimestampMode =>
val tmstMetrics = metricMaps.map { metric =>
val tmst = metric.getLong(ConstantColumns.tmst, timestamp)
val pureMetric = metric.removeKeys(ConstantColumns.columns)
@@ -61,7 +62,6 @@ case class MetricWriteStep(name: String,
val mtc = flattenMetric(maps, name, flattenType)
(k, mtc)
}
- }
}
// write to metric wrapper
@@ -88,10 +88,9 @@ case class MetricWriteStep(name: String,
}.toSeq
} else Nil
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"get metric ${name} fails")
Nil
- }
}
}
@@ -100,14 +99,12 @@ case class MetricWriteStep(name: String,
flattenType match {
case EntriesFlattenType => metrics.headOption.getOrElse(emptyMap)
case ArrayFlattenType => Map[String, Any]((name -> metrics))
- case MapFlattenType => {
+ case MapFlattenType =>
val v = metrics.headOption.getOrElse(emptyMap)
Map[String, Any]((name -> v))
- }
- case _ => {
+ case _ =>
if (metrics.size > 1) Map[String, Any]((name -> metrics))
else metrics.headOption.getOrElse(emptyMap)
- }
}
}