You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/06/13 07:28:11 UTC
incubator-griffin git commit: add validation of parameters
Repository: incubator-griffin
Updated Branches:
refs/heads/master 77aad4955 -> 060fc28b8
add validation of parameters
Author: Lionel Liu <bh...@163.com>
Author: dodobel <12...@qq.com>
Closes #297 from bhlx3lyx7/spark2.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/060fc28b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/060fc28b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/060fc28b
Branch: refs/heads/master
Commit: 060fc28b81320ebb44604e967f7f47076384b2e0
Parents: 77aad49
Author: Lionel Liu <bh...@163.com>
Authored: Wed Jun 13 15:28:06 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Wed Jun 13 15:28:06 2018 +0800
----------------------------------------------------------------------
.../apache/griffin/measure/Application.scala | 23 +-
.../configuration/enums/NormalizeType.scala | 3 +-
.../measure/configuration/params/AllParam.scala | 36 ---
.../measure/configuration/params/DQConfig.scala | 217 +++++++++++++++++++
.../measure/configuration/params/DQParam.scala | 183 ----------------
.../configuration/params/EnvConfig.scala | 112 ++++++++++
.../measure/configuration/params/EnvParam.scala | 97 ---------
.../configuration/params/GriffinConfig.scala | 42 ++++
.../measure/configuration/params/Param.scala | 3 +-
.../params/reader/ParamFileReader.scala | 2 +-
.../params/reader/ParamJsonReader.scala | 2 +-
.../params/reader/ParamReader.scala | 10 +
.../validator/ParamValidator.scala | 38 ----
.../context/datasource/DataSourceFactory.scala | 8 +-
.../cache/StreamingCacheClientFactory.scala | 8 +-
.../datasource/connector/DataConnector.scala | 4 +-
.../connector/DataConnectorFactory.scala | 8 +-
.../batch/AvroBatchDataConnector.scala | 2 +-
.../batch/HiveBatchDataConnector.scala | 2 +-
.../batch/TextDirBatchDataConnector.scala | 2 +-
.../streaming/KafkaStreamingDataConnector.scala | 2 +-
.../streaming/info/InfoCacheFactory.scala | 4 +-
.../measure/context/writer/PersistFactory.scala | 4 +-
.../measure/job/builder/DQJobBuilder.scala | 8 +-
.../apache/griffin/measure/launch/DQApp.scala | 6 +-
.../measure/launch/batch/BatchDQApp.scala | 16 +-
.../launch/streaming/StreamingDQApp.scala | 26 +--
.../measure/step/builder/DQStepBuilder.scala | 4 +-
.../builder/DataSourceParamStepBuilder.scala | 2 +-
.../step/builder/RuleParamStepBuilder.scala | 12 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 16 +-
.../transform/CompletenessExpr2DQSteps.scala | 8 +-
.../transform/DistinctnessExpr2DQSteps.scala | 4 +-
.../dsl/transform/ProfilingExpr2DQSteps.scala | 5 +-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 7 +-
.../dsl/transform/UniquenessExpr2DQSteps.scala | 2 +-
.../preproc/PreProcRuleParamGenerator.scala | 6 +-
37 files changed, 475 insertions(+), 459 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index dbbe970..566e8df 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -20,9 +20,7 @@ package org.apache.griffin.measure
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.params.{AllParam, DQParam, EnvParam, Param}
-import org.apache.griffin.measure.configuration.validator.ParamValidator
-import org.apache.griffin.measure.context.writer.PersistTaskRunner
+import org.apache.griffin.measure.configuration.params.{GriffinConfig, DQConfig, EnvConfig, Param}
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -48,35 +46,24 @@ object Application extends Loggable {
info(dqParamFile)
// read param files
- val envParam = readParamFile[EnvParam](envParamFile) match {
+ val envParam = readParamFile[EnvConfig](envParamFile) match {
case Success(p) => p
case Failure(ex) => {
error(ex.getMessage)
sys.exit(-2)
}
}
- val dqParam = readParamFile[DQParam](dqParamFile) match {
+ val dqParam = readParamFile[DQConfig](dqParamFile) match {
case Success(p) => p
case Failure(ex) => {
error(ex.getMessage)
sys.exit(-2)
}
}
- val allParam: AllParam = AllParam(envParam, dqParam)
-
- // validate param files
- ParamValidator.validate(allParam) match {
- case Failure(ex) => {
- error(ex.getMessage)
- sys.exit(-3)
- }
- case _ => {
- info("params validation pass")
- }
- }
+ val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
// choose process
- val procType = ProcessType(allParam.dqParam.procType)
+ val procType = ProcessType(allParam.getDqConfig.procType)
val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
index b353cbc..61bf27c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
@@ -30,11 +30,12 @@ sealed trait NormalizeType {
object NormalizeType {
private val normalizeTypes: List[NormalizeType] = List(DefaultNormalizeType, EntriesNormalizeType, ArrayNormalizeType, MapNormalizeType)
+ val default = DefaultNormalizeType
def apply(ptn: String): NormalizeType = {
normalizeTypes.find(tp => ptn match {
case tp.idPattern() => true
case _ => false
- }).getOrElse(DefaultNormalizeType)
+ }).getOrElse(default)
}
def unapply(pt: NormalizeType): Option[String] = Some(pt.desc)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
deleted file mode 100644
index 4ba1a15..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.params
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-
-/**
- * full set of griffin configuration
- * @param envParam environment configuration (must)
- * @param dqParam dq measurement configuration (must)
- */
-@JsonInclude(Include.NON_NULL)
-case class AllParam( @JsonProperty("env") envParam: EnvParam,
- @JsonProperty("dq") dqParam: DQParam
- ) extends Param {
- override def validate(): Boolean = {
- envParam != null && dqParam != null
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
new file mode 100644
index 0000000..d07ab51
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
@@ -0,0 +1,217 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.configuration.enums._
+
+/**
+ * dq param
+ * @param name name of dq measurement (must)
+ * @param timestamp default timestamp of measure in batch mode (optional)
+ * @param procType batch mode or streaming mode (must)
+ * @param dataSources data sources (must)
+ * @param evaluateRule dq measurement (must)
+ */
+@JsonInclude(Include.NON_NULL)
+case class DQConfig(@JsonProperty("name") name: String,
+ @JsonProperty("timestamp") timestamp: Long,
+ @JsonProperty("process.type") procType: String,
+ @JsonProperty("data.sources") dataSources: List[DataSourceParam],
+ @JsonProperty("evaluate.rule") evaluateRule: EvaluateRuleParam
+ ) extends Param {
+ def getName: String = name
+ def getTimestamp: Long = timestamp
+ def getProcType: String = procType
+ def getDataSources: Seq[DataSourceParam] = {
+ dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
+ val (seq, names) = ret
+ if (!names.contains(ds.getName)){
+ (seq :+ ds, names + ds.getName)
+ } else ret
+ }._1
+ }
+ def getEvaluateRule: EvaluateRuleParam = evaluateRule
+
+ def validate(): Unit = {
+ assert(StringUtils.isNotBlank(name), "dq config name should not be blank")
+ assert(StringUtils.isNotBlank(procType), "process.type should not be blank")
+ assert((dataSources != null), "data.sources should not be null")
+ assert((evaluateRule != null), "evaluate.rule should not be null")
+ getDataSources.foreach(_.validate)
+ evaluateRule.validate
+ }
+}
+
+/**
+ * data source param
+ * @param name data source name (must)
+ * @param connectors data connectors (optional)
+ * @param cache data source cache configuration (must in streaming mode with streaming connectors)
+ */
+@JsonInclude(Include.NON_NULL)
+case class DataSourceParam( @JsonProperty("name") name: String,
+ @JsonProperty("connectors") connectors: List[DataConnectorParam],
+ @JsonProperty("cache") cache: Map[String, Any]
+ ) extends Param {
+ def getName: String = name
+ def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
+ def getCacheOpt: Option[Map[String, Any]] = if (cache != null) Some(cache) else None
+
+ def validate(): Unit = {
+ assert(StringUtils.isNotBlank(name), "data source name should not be empty")
+ getConnectors.foreach(_.validate)
+ }
+}
+
+/**
+ * data connector param
+ * @param conType data connector type, e.g.: hive, avro, kafka (must)
+ * @param version data connector type version (optional)
+ * @param config detail configuration of data connector (must)
+ * @param preProc pre-process rules after load data (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class DataConnectorParam( @JsonProperty("type") conType: String,
+ @JsonProperty("version") version: String,
+ @JsonProperty("config") config: Map[String, Any],
+ @JsonProperty("pre.proc") preProc: List[RuleParam]
+ ) extends Param {
+ def getType: String = conType
+ def getVersion: String = version
+ def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+ def getPreProcRules: Seq[RuleParam] = if (preProc != null) preProc else Nil
+
+ def validate(): Unit = {
+ assert(StringUtils.isNotBlank(conType), "data connector type should not be empty")
+ getPreProcRules.foreach(_.validate)
+ }
+}
+
+/**
+ * evaluate rule param
+ * @param rules rules to define dq measurement (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
+ ) extends Param {
+ def getRules: Seq[RuleParam] = if (rules != null) rules else Nil
+
+ def validate(): Unit = {
+ getRules.foreach(_.validate)
+ }
+}
+
+/**
+ * rule param
+ * @param dslType dsl type of this rule (must)
+ * @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
+ * @param name name of result calculated by this rule (must if for later usage)
+ * @param rule rule to define dq step calculation (must)
+ * @param details detail config of rule (optional)
+ * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode)
+ * @param metric config for metric output (optional)
+ * @param record config for record output (optional)
+ * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleParam( @JsonProperty("dsl.type") dslType: String,
+ @JsonProperty("dq.type") dqType: String,
+ @JsonProperty("name") name: String,
+ @JsonProperty("rule") rule: String,
+ @JsonProperty("details") details: Map[String, Any],
+ @JsonProperty("cache") cache: Boolean,
+ @JsonProperty("metric") metric: RuleMetricParam,
+ @JsonProperty("record") record: RuleRecordParam,
+ @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam
+ ) extends Param {
+ def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
+ def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
+ def getCache: Boolean = if (cache != null) cache else false
+
+ def getName: String = if (name != null) name else ""
+ def getRule: String = if (rule != null) rule else ""
+ def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]()
+
+ def getMetricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None
+ def getRecordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None
+ def getDsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None
+
+ def replaceName(newName: String): RuleParam = {
+ if (StringUtils.equals(newName, name)) this
+ else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate)
+ }
+ def replaceRule(newRule: String): RuleParam = {
+ if (StringUtils.equals(newRule, rule)) this
+ else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, dsCacheUpdate)
+ }
+ def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
+ RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate)
+ }
+
+ def validate(): Unit = {
+ assert(!(getDslType.equals(GriffinDslType) && getDqType.equals(UnknownType)),
+ "unknown dq type for griffin dsl")
+
+ getMetricOpt.foreach(_.validate)
+ getRecordOpt.foreach(_.validate)
+ getDsCacheUpdateOpt.foreach(_.validate)
+ }
+}
+
+/**
+ * metric param of rule
+ * @param name name of metric to output (optional)
+ * @param collectType the normalize strategy to collect metric (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleMetricParam( @JsonProperty("name") name: String,
+ @JsonProperty("collect.type") collectType: String
+ ) extends Param {
+ def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
+ def getCollectType: NormalizeType = if (StringUtils.isNotBlank(collectType)) NormalizeType(collectType) else NormalizeType("")
+
+ def validate(): Unit = {}
+}
+
+/**
+ * record param of rule
+ * @param name name of record to output (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleRecordParam( @JsonProperty("name") name: String
+ ) extends Param {
+ def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
+
+ def validate(): Unit = {}
+}
+
+/**
+ * data source cache update param of rule
+ * @param dsName name of data source to be updated by this rule result (must)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
+ ) extends Param {
+ def getDsNameOpt: Option[String] = if (StringUtils.isNotBlank(dsName)) Some(dsName) else None
+
+ def validate(): Unit = {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
deleted file mode 100644
index 8d3c354..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.params
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{DqType, DslType}
-
-/**
- * dq param
- * @param name name of dq measurement (must)
- * @param timestamp default timestamp of measure in batch mode (optional)
- * @param procType batch mode or streaming mode (must)
- * @param dataSourceParams data sources (optional)
- * @param evaluateRuleParam dq measurement (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class DQParam( @JsonProperty("name") name: String,
- @JsonProperty("timestamp") timestamp: Long,
- @JsonProperty("process.type") procType: String,
- @JsonProperty("data.sources") dataSourceParams: List[DataSourceParam],
- @JsonProperty("evaluate.rule") evaluateRuleParam: EvaluateRuleParam
- ) extends Param {
- val dataSources = {
- val (validDsParams, _) = dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) =>
- val (seq, names) = ret
- if (dsParam.hasName && !names.contains(dsParam.name)) {
- (seq :+ dsParam, names + dsParam.name)
- } else ret
- }
- validDsParams
- }
- val evaluateRule: EvaluateRuleParam = {
- if (evaluateRuleParam != null) evaluateRuleParam
- else EvaluateRuleParam("", Nil)
- }
-
- override def validate(): Boolean = {
- dataSources.nonEmpty
- }
-}
-
-/**
- * data source param
- * @param name data source name (must)
- * @param connectors data connectors (optional)
- * @param cache data source cache configuration (must in streaming mode with streaming connectors)
- */
-@JsonInclude(Include.NON_NULL)
-case class DataSourceParam( @JsonProperty("name") name: String,
- @JsonProperty("connectors") connectors: List[DataConnectorParam],
- @JsonProperty("cache") cache: Map[String, Any]
- ) extends Param {
- def hasName: Boolean = StringUtils.isNotBlank(name)
- def getConnectors: List[DataConnectorParam] = if (connectors != null) connectors else Nil
- def hasCache: Boolean = (cache != null)
-
- override def validate(): Boolean = hasName
-}
-
-/**
- * data connector param
- * @param conType data connector type, e.g.: hive, avro, kafka (must)
- * @param version data connector type version (optional)
- * @param config detail configuration of data connector (must)
- * @param preProc pre-process rules after load data (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class DataConnectorParam( @JsonProperty("type") conType: String,
- @JsonProperty("version") version: String,
- @JsonProperty("config") config: Map[String, Any],
- @JsonProperty("pre.proc") preProc: List[RuleParam]
- ) extends Param {
- override def validate(): Boolean = {
- StringUtils.isNotBlank(conType)
- }
-}
-
-/**
- * evaluate rule param
- * @param dslType default dsl type for all rules (optional)
- * @param rules rules to define dq measurement (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String,
- @JsonProperty("rules") rules: List[RuleParam]
- ) extends Param {
- def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
- def getRules: List[RuleParam] = if (rules != null) rules else Nil
-}
-
-/**
- * rule param
- * @param dslType dsl type of this rule (must if default dsl type not set)
- * @param dqType dq type of this rule (valid for "griffin-dsl")
- * @param name name of result calculated by this rule (must if for later usage)
- * @param rule rule to define dq step calculation (must)
- * @param details detail config of rule (optional)
- * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode)
- * @param metric config for metric output (optional)
- * @param record config for record output (optional)
- * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode)
- */
-@JsonInclude(Include.NON_NULL)
-case class RuleParam( @JsonProperty("dsl.type") dslType: String,
- @JsonProperty("dq.type") dqType: String,
- @JsonProperty("name") name: String,
- @JsonProperty("rule") rule: String,
- @JsonProperty("details") details: Map[String, Any],
- @JsonProperty("cache") cache: Boolean,
- @JsonProperty("metric") metric: RuleMetricParam,
- @JsonProperty("record") record: RuleRecordParam,
- @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam
- ) extends Param {
- def getDslType(defaultDslType: DslType): DslType = if (dslType != null) DslType(dslType) else defaultDslType
- def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
- def getName: String = if (name != null) name else ""
- def getRule: String = if (rule != null) rule else ""
- def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]()
- def getCache: Boolean = if (cache != null) cache else false
-
- def metricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None
- def recordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None
- def dsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None
-
- def replaceName(newName: String): RuleParam = {
- if (StringUtils.equals(newName, name)) this
- else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate)
- }
- def replaceRule(newRule: String): RuleParam = {
- if (StringUtils.equals(newRule, rule)) this
- else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, dsCacheUpdate)
- }
- def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
- RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate)
- }
-}
-
-/**
- * metric param of rule
- * @param name name of metric to output (optional)
- * @param collectType the normalize strategy to collect metric (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class RuleMetricParam( @JsonProperty("name") name: String,
- @JsonProperty("collect.type") collectType: String
- ) extends Param {
-}
-
-/**
- * record param of rule
- * @param name name of record to output (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class RuleRecordParam( @JsonProperty("name") name: String
- ) extends Param {
-}
-
-/**
- * data source cache update param of rule
- * @param dsName name of data source to be updated by this rule result (must)
- */
-@JsonInclude(Include.NON_NULL)
-case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
- ) extends Param {
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
new file mode 100644
index 0000000..bc6d50f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.utils.TimeUtil
+
+/**
+ * environment param
+ * @param sparkParam config of spark environment (must)
+ * @param persistParams config of persist ways (optional)
+ * @param infoCacheParams config of information cache ways (required in streaming mode)
+ */
+@JsonInclude(Include.NON_NULL)
+case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
+ @JsonProperty("persist") persistParams: List[PersistParam],
+ @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam]
+ ) extends Param {
+ def getSparkParam: SparkParam = sparkParam
+ def getPersistParams: Seq[PersistParam] = if (persistParams != null) persistParams else Nil
+ def getInfoCacheParams: Seq[InfoCacheParam] = if (infoCacheParams != null) infoCacheParams else Nil
+
+ def validate(): Unit = {
+ assert((sparkParam != null), "spark param should not be null")
+ sparkParam.validate
+ getPersistParams.foreach(_.validate)
+ getInfoCacheParams.foreach(_.validate)
+ }
+}
+
+/**
+ * spark param
+ * @param logLevel log level of spark application (optional)
+ * @param cpDir checkpoint directory for spark streaming (required in streaming mode)
+ * @param batchInterval batch interval for spark streaming (required in streaming mode)
+ * @param processInterval process interval for streaming dq calculation (required in streaming mode)
+ * @param config extra config for spark environment (optional)
+ * @param initClear clear checkpoint directory or not when initial (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class SparkParam( @JsonProperty("log.level") logLevel: String,
+ @JsonProperty("checkpoint.dir") cpDir: String,
+ @JsonProperty("batch.interval") batchInterval: String,
+ @JsonProperty("process.interval") processInterval: String,
+ @JsonProperty("config") config: Map[String, String],
+ @JsonProperty("init.clear") initClear: Boolean
+ ) extends Param {
+ def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
+ def getCpDir: String = if (cpDir != null) cpDir else ""
+ def getBatchInterval: String = if (batchInterval != null) batchInterval else ""
+ def getProcessInterval: String = if (processInterval != null) processInterval else ""
+ def getConfig: Map[String, String] = if (config != null) config else Map[String, String]()
+ def needInitClear: Boolean = if (initClear != null) initClear else false
+
+ def validate(): Unit = {
+// assert(StringUtils.isNotBlank(cpDir), "checkpoint.dir should not be empty")
+// assert(TimeUtil.milliseconds(getBatchInterval).nonEmpty, "batch.interval should be valid time string")
+// assert(TimeUtil.milliseconds(getProcessInterval).nonEmpty, "process.interval should be valid time string")
+ }
+}
+
+/**
+ * persist param
+ * @param persistType persist type, e.g.: log, hdfs, http, mongo (must)
+ * @param config config of persist way (must)
+ */
+@JsonInclude(Include.NON_NULL)
+case class PersistParam( @JsonProperty("type") persistType: String,
+ @JsonProperty("config") config: Map[String, Any]
+ ) extends Param {
+ def getType: String = persistType
+ def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+
+ def validate(): Unit = {
+ assert(StringUtils.isNotBlank(persistType), "persist type should not be empty")
+ }
+}
+
+/**
+ * info cache param
+ * @param cacheType information cache type, e.g.: zookeeper (must)
+ * @param config config of cache way
+ */
+@JsonInclude(Include.NON_NULL)
+case class InfoCacheParam( @JsonProperty("type") cacheType: String,
+ @JsonProperty("config") config: Map[String, Any]
+ ) extends Param {
+ def getType: String = cacheType
+ def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+
+ def validate(): Unit = {
+ assert(StringUtils.isNotBlank(cacheType), "info cache type should not be empty")
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
deleted file mode 100644
index 5ee0610..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.params
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import org.apache.commons.lang.StringUtils
-
-/**
- * environment param
- * @param sparkParam config of spark environment (must)
- * @param persistParams config of persist ways (optional)
- * @param infoCacheParams config of information cache ways (must in streaming mode)
- * @param cleanerParam config of cleaner (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam,
- @JsonProperty("persist") persistParams: List[PersistParam],
- @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam],
- @JsonProperty("cleaner") cleanerParam: CleanerParam
- ) extends Param {
-}
-
-/**
- * spark param
- * @param logLevel log level of spark application (optional)
- * @param cpDir checkpoint directory for spark streaming (must in streaming mode)
- * @param batchInterval batch interval for spark streaming (must in streaming mode)
- * @param processInterval process interval for streaming dq calculation (must in streaming mode)
- * @param config extra config for spark environment (optional)
- * @param initClear clear checkpoint directory or not when initial (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class SparkParam( @JsonProperty("log.level") logLevel: String,
- @JsonProperty("checkpoint.dir") cpDir: String,
- @JsonProperty("batch.interval") batchInterval: String,
- @JsonProperty("process.interval") processInterval: String,
- @JsonProperty("config") config: Map[String, String],
- @JsonProperty("init.clear") initClear: Boolean
- ) extends Param {
- def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
- def needInitClear: Boolean = if (initClear != null) initClear else false
-}
-
-/**
- * persist param
- * @param persistType persist type, e.g.: log, hdfs, http, mongo (must)
- * @param config config of persist way (must)
- */
-@JsonInclude(Include.NON_NULL)
-case class PersistParam( @JsonProperty("type") persistType: String,
- @JsonProperty("config") config: Map[String, Any]
- ) extends Param {
- override def validate(): Boolean = {
- StringUtils.isNotBlank(persistType)
- }
-}
-
-/**
- * info cache param
- * @param cacheType information cache type, e.g.: zookeeper (must)
- * @param config config of cache way
- */
-@JsonInclude(Include.NON_NULL)
-case class InfoCacheParam( @JsonProperty("type") cacheType: String,
- @JsonProperty("config") config: Map[String, Any]
- ) extends Param {
- override def validate(): Boolean = {
- StringUtils.isNotBlank(cacheType)
- }
-}
-
-/**
- * cleaner param, invalid at current
- * @param cleanInterval clean interval (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String
- ) extends Param {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala
new file mode 100644
index 0000000..8debe48
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+/**
+ * full set of griffin configuration
+ * @param envConfig environment configuration (must)
+ * @param dqConfig dq measurement configuration (must)
+ */
+@JsonInclude(Include.NON_NULL)
+case class GriffinConfig(@JsonProperty("env") envConfig: EnvConfig,
+ @JsonProperty("dq") dqConfig: DQConfig
+ ) extends Param {
+ def getEnvConfig: EnvConfig = envConfig
+ def getDqConfig: DQConfig = dqConfig
+
+ def validate(): Unit = {
+ assert((envConfig != null), "environment config should not be null")
+ assert((dqConfig != null), "dq config should not be null")
+ envConfig.validate
+ dqConfig.validate
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
index 87ad246..6116cdf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
@@ -22,8 +22,7 @@ trait Param extends Serializable {
/**
* validate param internally
- * @return
*/
- def validate(): Boolean = true
+ def validate(): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
index 0ff5b06..a528127 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
@@ -36,7 +36,7 @@ case class ParamFileReader(filePath: String) extends ParamReader {
val source = HdfsUtil.openFile(filePath)
val param = JsonUtil.fromJson[T](source)
source.close
- param
+ validate(param)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
index 7a7eaf6..91ffa9a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
@@ -32,7 +32,7 @@ case class ParamJsonReader(jsonString: String) extends ParamReader {
def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
Try {
val param = JsonUtil.fromJson[T](jsonString)
- param
+ validate(param)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
index 77134ea..21ebccd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
@@ -32,4 +32,14 @@ trait ParamReader extends Loggable with Serializable {
*/
def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
+ /**
+ * validate config param
+ * @param param param to be validated
+ * @return param itself
+ */
+ protected def validate[T <: Param](param: T): T = {
+ param.validate()
+ param
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
deleted file mode 100644
index 4c9a4d6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.validator
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params._
-
-import scala.util.Try
-
-object ParamValidator extends Loggable with Serializable {
-
- /**
- * validate param
- * @param param param to be validated
- * @tparam T type of param
- * @return param valid or not
- */
- def validate[T <: Param](param: T): Try[Boolean] = Try {
- param.validate
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
index 95c8de7..edd88b6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
@@ -32,9 +32,9 @@ object DataSourceFactory extends Loggable {
def getDataSources(sparkSession: SparkSession,
ssc: StreamingContext,
- dataSourceParams: Seq[DataSourceParam]
+ dataSources: Seq[DataSourceParam]
): Seq[DataSource] = {
- dataSourceParams.zipWithIndex.flatMap { pair =>
+ dataSources.zipWithIndex.flatMap { pair =>
val (param, index) = pair
getDataSource(sparkSession, ssc, param, index)
}
@@ -45,13 +45,13 @@ object DataSourceFactory extends Loggable {
dataSourceParam: DataSourceParam,
index: Int
): Option[DataSource] = {
- val name = dataSourceParam.name
+ val name = dataSourceParam.getName
val connectorParams = dataSourceParam.getConnectors
val tmstCache = TmstCache()
// for streaming data cache
val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
- sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache)
+ sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, tmstCache)
val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
index 529b07a..fd9d231 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
@@ -37,16 +37,16 @@ object StreamingCacheClientFactory extends Loggable {
/**
* create streaming cache client
* @param sqlContext sqlContext in spark environment
- * @param param data source cache config
+ * @param cacheOpt data source cache config option
* @param name data source name
* @param index data source index
* @param tmstCache the same tmstCache instance inside a data source
* @return streaming cache client option
*/
- def getClientOpt(sqlContext: SQLContext, param: Map[String, Any],
+ def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
name: String, index: Int, tmstCache: TmstCache
): Option[StreamingCacheClient] = {
- if (param != null) {
+ cacheOpt.flatMap { param =>
try {
val tp = param.getString(_type, "")
val dsCache = tp match {
@@ -62,7 +62,7 @@ object StreamingCacheClientFactory extends Loggable {
None
}
}
- } else None
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
index 6dc17d1..a4c1995 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
@@ -65,14 +65,14 @@ trait DataConnector extends Loggable with Serializable {
saveTmst(timestamp) // save timestamp
dfOpt.flatMap { df =>
- val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.preProc, suffix)
+ val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
// init data
context.compileTableRegister.registerTable(thisTable)
context.runTimeTableRegister.registerTable(thisTable, df)
// build job
- val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules, SparkSqlType)
+ val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
// job execute
preprocJob.execute(context)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
index 30352a9..4538fbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
@@ -53,8 +53,8 @@ object DataConnectorFactory extends Loggable {
tmstCache: TmstCache,
streamingCacheClientOpt: Option[StreamingCacheClient]
): Try[DataConnector] = {
- val conType = dcParam.conType
- val version = dcParam.version
+ val conType = dcParam.getType
+ val version = dcParam.getVersion
Try {
conType match {
case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
@@ -75,8 +75,8 @@ object DataConnectorFactory extends Loggable {
streamingCacheClientOpt: Option[StreamingCacheClient]
): StreamingDataConnector = {
if (ssc == null) throw new Exception("streaming context is null!")
- val conType = dcParam.conType
- val version = dcParam.version
+ val conType = dcParam.getType
+ val version = dcParam.getVersion
conType match {
case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case _ => throw new Exception("streaming connector creation error!")
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
index 1ee6e78..a906246 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -33,7 +33,7 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
tmstCache: TmstCache
) extends BatchDataConnector {
- val config = dcParam.config
+ val config = dcParam.getConfig
val FilePath = "file.path"
val FileName = "file.name"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
index 85cd120..331f469 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -32,7 +32,7 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
tmstCache: TmstCache
) extends BatchDataConnector {
- val config = dcParam.config
+ val config = dcParam.getConfig
val Database = "database"
val TableName = "table.name"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
index ca5b7b5..bc76f9d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -33,7 +33,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
tmstCache: TmstCache
) extends BatchDataConnector {
- val config = dcParam.config
+ val config = dcParam.getConfig
val DirPath = "dir.path"
val DataDirDepth = "data.dir.depth"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index de2822b..0f30b7f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -33,7 +33,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
type VD <: Decoder[V]
type OUT = (K, V)
- val config = dcParam.config
+ val config = dcParam.getConfig
val KafkaConfig = "kafka.config"
val Topics = "topics"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
index 85106b4..28ade3b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
@@ -27,8 +27,8 @@ case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricNam
val ZK_REGEX = """^(?i)zk|zookeeper$""".r
def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
- val config = infoCacheParam.config
- val infoCacheTry = infoCacheParam.cacheType match {
+ val config = infoCacheParam.getConfig
+ val infoCacheTry = infoCacheParam.getType match {
case ZK_REGEX() => Try(ZKInfoCache(config, metricName))
case _ => throw new Exception("not supported info cache type")
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
index 9314876..0a2649e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
@@ -41,8 +41,8 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str
}
private def getPersist(timeStamp: Long, persistParam: PersistParam, block: Boolean): Option[Persist] = {
- val config = persistParam.config
- val persistTry = persistParam.persistType match {
+ val config = persistParam.getConfig
+ val persistTry = persistParam.getType match {
case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp, block))
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
index a8e5b26..074a74e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -38,26 +38,24 @@ object DQJobBuilder {
* @return dq job
*/
def buildDQJob(context: DQContext, evaluateRuleParam: EvaluateRuleParam): DQJob = {
- val defaultDslType = evaluateRuleParam.getDslType
val ruleParams = evaluateRuleParam.getRules
- buildDQJob(context, ruleParams, defaultDslType)
+ buildDQJob(context, ruleParams)
}
/**
* build dq job with rules in evaluate rule param or pre-proc param
* @param context dq context
* @param ruleParams rule params
- * @param defaultDslType default dsl type in evaluate rule param
* @return dq job
*/
- def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam], defaultDslType: DslType): DQJob = {
+ def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = {
// build steps by datasources
val dsSteps = context.dataSources.flatMap { dataSource =>
DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
}
// build steps by rules
val ruleSteps = ruleParams.flatMap { ruleParam =>
- DQStepBuilder.buildStepOptByRuleParam(context, ruleParam, defaultDslType)
+ DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
}
// metric flush step
val metricFlushStep = MetricFlushStep()
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 9ec1641..79cef33 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -19,7 +19,7 @@ under the License.
package org.apache.griffin.measure.launch
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.{DQParam, EnvParam}
+import org.apache.griffin.measure.configuration.params.{DQConfig, EnvConfig}
import scala.util.Try
@@ -28,8 +28,8 @@ import scala.util.Try
*/
trait DQApp extends Loggable with Serializable {
- val envParam: EnvParam
- val dqParam: DQParam
+ val envParam: EnvConfig
+ val dqParam: DQConfig
def init: Try[_]
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 1aa5039..06892a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -32,15 +32,15 @@ import org.apache.spark.sql.{SQLContext, SparkSession}
import scala.util.Try
-case class BatchDQApp(allParam: AllParam) extends DQApp {
+case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
- val envParam: EnvParam = allParam.envParam
- val dqParam: DQParam = allParam.dqParam
+ val envParam: EnvConfig = allParam.getEnvConfig
+ val dqParam: DQConfig = allParam.getDqConfig
val sparkParam = envParam.sparkParam
val metricName = dqParam.name
- val dataSourceParams = dqParam.dataSources
- val dataSourceNames = dataSourceParams.map(_.name)
+// val dataSourceParams = dqParam.dataSources
+// val dataSourceNames = dataSourceParams.map(_.name)
val persistParams = envParam.persistParams
var sqlContext: SQLContext = _
@@ -52,10 +52,10 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
def init: Try[_] = Try {
// build spark 2.0+ application context
val conf = new SparkConf().setAppName(metricName)
- conf.setAll(sparkParam.config)
+ conf.setAll(sparkParam.getConfig)
conf.set("spark.sql.crossJoin.enabled", "true")
sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
- sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+ sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
sqlContext = sparkSession.sqlContext
// register udf
@@ -70,7 +70,7 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
val contextId = ContextId(measureTime)
// get data sources
- val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources)
+ val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
dataSources.foreach(_.init)
// create dq context
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index d89b7e8..f990c8e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -38,15 +38,15 @@ import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import scala.util.Try
-case class StreamingDQApp(allParam: AllParam) extends DQApp {
+case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
- val envParam: EnvParam = allParam.envParam
- val dqParam: DQParam = allParam.dqParam
+ val envParam: EnvConfig = allParam.getEnvConfig
+ val dqParam: DQConfig = allParam.getDqConfig
val sparkParam = envParam.sparkParam
val metricName = dqParam.name
- val dataSourceParams = dqParam.dataSources
- val dataSourceNames = dataSourceParams.map(_.name)
+// val dataSourceParams = dqParam.dataSources
+// val dataSourceNames = dataSourceParams.map(_.name)
val persistParams = envParam.persistParams
var sqlContext: SQLContext = _
@@ -58,10 +58,10 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
def init: Try[_] = Try {
// build spark 2.0+ application context
val conf = new SparkConf().setAppName(metricName)
- conf.setAll(sparkParam.config)
+ conf.setAll(sparkParam.getConfig)
conf.set("spark.sql.crossJoin.enabled", "true")
sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
- sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+ sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
sqlContext = sparkSession.sqlContext
// clear checkpoint directory
@@ -78,7 +78,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
def run: Try[_] = Try {
// streaming context
- val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
+ val ssc = StreamingContext.getOrCreate(sparkParam.getCpDir, () => {
try {
createStreamingContext
} catch {
@@ -94,7 +94,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
val contextId = ContextId(measureTime)
// generate data sources
- val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
+ val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.getDataSources)
dataSources.foreach(_.init)
// create dq context
@@ -109,7 +109,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
// process thread
val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
- val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
+ val processInterval = TimeUtil.milliseconds(sparkParam.getProcessInterval) match {
case Some(interval) => interval
case _ => throw new Exception("invalid batch interval")
}
@@ -135,19 +135,19 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
def createStreamingContext: StreamingContext = {
- val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+ val batchInterval = TimeUtil.milliseconds(sparkParam.getBatchInterval) match {
case Some(interval) => Milliseconds(interval)
case _ => throw new Exception("invalid batch interval")
}
val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
- ssc.checkpoint(sparkParam.cpDir)
+ ssc.checkpoint(sparkParam.getCpDir)
ssc
}
private def clearCpDir: Unit = {
if (sparkParam.needInitClear) {
- val cpDir = sparkParam.cpDir
+ val cpDir = sparkParam.getCpDir
info(s"clear checkpoint directory ${cpDir}")
HdfsUtil.deleteHdfsPath(cpDir)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index f5b69ad..2ee308d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -57,9 +57,9 @@ object DQStepBuilder {
}
}
- def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam, defaultDslType: DslType
+ def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam
): Option[DQStep] = {
- val dslType = ruleParam.getDslType(defaultDslType)
+ val dslType = ruleParam.getDslType
val dsNames = context.dataSourceNames
val funcNames = context.functionNames
val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
index b941211..333615d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
@@ -31,7 +31,7 @@ trait DataSourceParamStepBuilder extends DQStepBuilder {
type ParamType = DataSourceParam
def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
- val name = getStepName(param.name)
+ val name = getStepName(param.getName)
val steps = param.getConnectors.flatMap { dc =>
buildReadSteps(context, dc)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index fa9e38b..2a43b34 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -43,16 +43,16 @@ trait RuleParamStepBuilder extends DQStepBuilder {
protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = {
val name = getStepName(ruleParam.getName)
// metric writer
- val metricSteps = ruleParam.metricOpt.map { metric =>
- MetricWriteStep(metric.name, name, NormalizeType(metric.collectType))
+ val metricSteps = ruleParam.getMetricOpt.map { metric =>
+ MetricWriteStep(metric.getNameOpt.getOrElse(name), name, NormalizeType(metric.collectType))
}.toSeq
// record writer
- val recordSteps = ruleParam.recordOpt.map { record =>
- RecordWriteStep(record.name, name)
+ val recordSteps = ruleParam.getRecordOpt.map { record =>
+ RecordWriteStep(record.getNameOpt.getOrElse(name), name)
}.toSeq
// update writer
- val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.map { dsCacheUpdate =>
- DataSourceUpdateWriteStep(dsCacheUpdate.dsName, name)
+ val dsCacheUpdateSteps = ruleParam.getDsCacheUpdateOpt.map { dsCacheUpdate =>
+ DataSourceUpdateWriteStep(dsCacheUpdate.getDsNameOpt.getOrElse(""), name)
}.toSeq
metricSteps ++ recordSteps ++ dsCacheUpdateSteps
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 9c14325..0416dbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -82,7 +82,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
val missRecordsWriteSteps = procType match {
case BatchProcessType => {
- val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
+ val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName) :: Nil
}
case StreamingProcessType => Nil
@@ -90,7 +90,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val missRecordsUpdateWriteSteps = procType match {
case BatchProcessType => Nil
case StreamingProcessType => {
- val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName)
+ val dsName = ruleParam.getDsCacheUpdateOpt.flatMap(_.getDsNameOpt).getOrElse(sourceName)
DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
}
}
@@ -139,8 +139,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
val accuracyMetricWriteSteps = procType match {
case BatchProcessType => {
- val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
- val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+ val metricOpt = ruleParam.getMetricOpt
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+ val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil
}
case StreamingProcessType => Nil
@@ -166,8 +167,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
accuracyMetricRule, accuracyMetricDetails)
val accuracyMetricWriteStep = {
- val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
- val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+ val metricOpt = ruleParam.getMetricOpt
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+ val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
MetricWriteStep(mwName, accuracyMetricTableName, collectType)
}
@@ -182,7 +184,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyRecordTransStep = SparkSqlTransformStep(
accuracyRecordTableName, accuracyRecordSql, emptyMap)
val accuracyRecordWriteStep = {
- val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
+ val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index fcb576c..bd03c7a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -89,7 +89,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
val incompleteRecordWriteStep = {
- val rwName = ruleParam.recordOpt.map(_.name).getOrElse(incompleteRecordsTableName)
+ val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
RecordWriteStep(rwName, incompleteRecordsTableName)
}
@@ -136,8 +136,10 @@ case class CompletenessExpr2DQSteps(context: DQContext,
}
val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
val completeWriteStep = {
- val mwName = ruleParam.metricOpt.map(_.name).getOrElse(completeTableName)
- MetricWriteStep(mwName, completeTableName, DefaultNormalizeType)
+ val metricOpt = ruleParam.getMetricOpt
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
+ val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
+ MetricWriteStep(mwName, completeTableName, collectType)
}
val transSteps = {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 1cf94e0..cf886e3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -271,7 +271,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
val dupItemsWriteStep = {
- val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupItemsTableName)
+ val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
}
@@ -317,7 +317,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
val dupRecordWriteStep = {
- val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
+ val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index b4da7eb..bc7f620 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -94,8 +94,9 @@ case class ProfilingExpr2DQSteps(context: DQContext,
val profilingName = ruleParam.name
val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
val profilingMetricWriteStep = {
- val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
- val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+ val metricOpt = ruleParam.getMetricOpt
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+ val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
MetricWriteStep(mwName, profilingName, collectType)
}
profilingTransStep :: profilingMetricWriteStep :: Nil
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index a56937c..9fa58f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -131,8 +131,9 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
val metricWriteStep = {
- val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
- val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+ val metricOpt = ruleParam.getMetricOpt
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+ val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
MetricWriteStep(mwName, metricTableName, collectType)
}
@@ -149,7 +150,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
val recordWriteStep = {
- val rwName = ruleParam.recordOpt.map(_.name).getOrElse(recordTableName)
+ val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(recordTableName)
RecordWriteStep(rwName, recordTableName, None)
}
(recordTransStep :: Nil, recordWriteStep :: Nil)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 9fecb6d..77a79d4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -166,7 +166,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
}
val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
val dupRecordWriteStep = {
- val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
+ val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
RecordWriteStep(rwName, dupRecordTableName)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
index f1543be..07b13ea 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
@@ -28,10 +28,8 @@ object PreProcRuleParamGenerator {
val _name = "name"
def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = {
- if (rules == null) Nil else {
- rules.map { rule =>
- getNewPreProcRule(rule, suffix)
- }
+ rules.map { rule =>
+ getNewPreProcRule(rule, suffix)
}
}