You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/27 02:05:49 UTC
incubator-griffin git commit: set the fields in configuration classes private
Repository: incubator-griffin
Updated Branches:
refs/heads/master 5b2c7959a -> 75b426fab
set the fields in configuration classes private
In the configuration case classes, get*** functions are already defined to access the fields, so, for consistency, make the fields themselves private.
Author: grant-xuexu <gr...@gmail.com>
Closes #396 from grant-xuexu/configuration.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/75b426fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/75b426fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/75b426fa
Branch: refs/heads/master
Commit: 75b426fab82d404f552fbdfb602958ebb47850d9
Parents: 5b2c795
Author: grant-xuexu <gr...@gmail.com>
Authored: Mon Aug 27 10:05:44 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Aug 27 10:05:44 2018 +0800
----------------------------------------------------------------------
.../apache/griffin/measure/Application.scala | 2 +-
.../configuration/dqdefinition/DQConfig.scala | 56 ++++++++++----------
.../configuration/dqdefinition/EnvConfig.scala | 26 ++++-----
.../dqdefinition/GriffinConfig.scala | 4 +-
.../datasource/connector/DataConnector.scala | 2 +-
.../connector/DataConnectorFactory.scala | 16 ++----
.../apache/griffin/measure/launch/DQApp.scala | 2 +-
.../measure/launch/batch/BatchDQApp.scala | 6 +--
.../launch/streaming/StreamingDQApp.scala | 8 +--
.../step/builder/RuleParamStepBuilder.scala | 2 +-
.../step/builder/dsl/expr/TreeNode.scala | 25 ++++++---
.../dsl/transform/AccuracyExpr2DQSteps.scala | 6 +--
.../transform/CompletenessExpr2DQSteps.scala | 2 +-
.../dsl/transform/ProfilingExpr2DQSteps.scala | 4 +-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 4 +-
.../reader/ParamFileReaderSpec.scala | 4 +-
.../reader/ParamJsonReaderSpec.scala | 4 +-
17 files changed, 89 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 278214d..e7df806 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -64,7 +64,7 @@ object Application extends Loggable {
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
// choose process
- val procType = ProcessType(allParam.getDqConfig.procType)
+ val procType = ProcessType(allParam.getDqConfig.getProcType)
val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 79a43c1..4943329 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -33,12 +33,12 @@ import org.apache.griffin.measure.configuration.enums._
* @param sinks sink types (optional, by default will be elasticsearch)
*/
@JsonInclude(Include.NON_NULL)
-case class DQConfig(@JsonProperty("name") name: String,
- @JsonProperty("timestamp") timestamp: Long,
- @JsonProperty("process.type") procType: String,
- @JsonProperty("data.sources") dataSources: List[DataSourceParam],
- @JsonProperty("evaluate.rule") evaluateRule: EvaluateRuleParam,
- @JsonProperty("sinks") sinks: List[String]
+case class DQConfig(@JsonProperty("name") private val name: String,
+ @JsonProperty("timestamp") private val timestamp: Long,
+ @JsonProperty("process.type") private val procType: String,
+ @JsonProperty("data.sources") private val dataSources: List[DataSourceParam],
+ @JsonProperty("evaluate.rule") private val evaluateRule: EvaluateRuleParam,
+ @JsonProperty("sinks") private val sinks: List[String]
) extends Param {
def getName: String = name
def getTimestampOpt: Option[Long] = if (timestamp != 0) Some(timestamp) else None
@@ -72,13 +72,13 @@ case class DQConfig(@JsonProperty("name") name: String,
* @param checkpoint data source checkpoint configuration (must in streaming mode with streaming connectors)
*/
@JsonInclude(Include.NON_NULL)
-case class DataSourceParam( @JsonProperty("name") name: String,
- @JsonProperty("baseline") baseline: Boolean,
- @JsonProperty("connectors") connectors: List[DataConnectorParam],
- @JsonProperty("checkpoint") checkpoint: Map[String, Any]
+case class DataSourceParam( @JsonProperty("name") private val name: String,
+ @JsonProperty("baseline") private val baseline: Boolean,
+ @JsonProperty("connectors") private val connectors: List[DataConnectorParam],
+ @JsonProperty("checkpoint") private val checkpoint: Map[String, Any]
) extends Param {
def getName: String = name
- def isBaseline: Boolean = if (baseline != null) baseline else false
+ def isBaseline: Boolean = if (!baseline.equals(null)) baseline else false
def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
def getCheckpointOpt: Option[Map[String, Any]] = if (checkpoint != null) Some(checkpoint) else None
@@ -97,11 +97,11 @@ case class DataSourceParam( @JsonProperty("name") name: String,
* @param preProc pre-process rules after load data (optional)
*/
@JsonInclude(Include.NON_NULL)
-case class DataConnectorParam( @JsonProperty("type") conType: String,
- @JsonProperty("version") version: String,
- @JsonProperty("dataframe.name") dataFrameName: String,
- @JsonProperty("config") config: Map[String, Any],
- @JsonProperty("pre.proc") preProc: List[RuleParam]
+case class DataConnectorParam( @JsonProperty("type") private val conType: String,
+ @JsonProperty("version") private val version: String,
+ @JsonProperty("dataframe.name") private val dataFrameName: String,
+ @JsonProperty("config") private val config: Map[String, Any],
+ @JsonProperty("pre.proc") private val preProc: List[RuleParam]
) extends Param {
def getType: String = conType
def getVersion: String = if (version != null) version else ""
@@ -120,7 +120,7 @@ case class DataConnectorParam( @JsonProperty("type") conType: String,
* @param rules rules to define dq measurement (optional)
*/
@JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
+case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[RuleParam]
) extends Param {
def getRules: Seq[RuleParam] = if (rules != null) rules else Nil
@@ -144,14 +144,14 @@ case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
// * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode)
*/
@JsonInclude(Include.NON_NULL)
-case class RuleParam(@JsonProperty("dsl.type") dslType: String,
- @JsonProperty("dq.type") dqType: String,
- @JsonProperty("in.dataframe.name") inDfName: String,
- @JsonProperty("out.dataframe.name") outDfName: String,
- @JsonProperty("rule") rule: String,
- @JsonProperty("details") details: Map[String, Any],
- @JsonProperty("cache") cache: Boolean,
- @JsonProperty("out") outputs: List[RuleOutputParam]
+case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
+ @JsonProperty("dq.type") private val dqType: String,
+ @JsonProperty("in.dataframe.name") private val inDfName: String,
+ @JsonProperty("out.dataframe.name") private val outDfName: String,
+ @JsonProperty("rule") private val rule: String,
+ @JsonProperty("details") private val details: Map[String, Any],
+ @JsonProperty("cache") private val cache: Boolean,
+ @JsonProperty("out") private val outputs: List[RuleOutputParam]
) extends Param {
def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
@@ -197,9 +197,9 @@ case class RuleParam(@JsonProperty("dsl.type") dslType: String,
* @param flatten flatten type of output metric (optional, available in output metric type)
*/
@JsonInclude(Include.NON_NULL)
-case class RuleOutputParam( @JsonProperty("type") outputType: String,
- @JsonProperty("name") name: String,
- @JsonProperty("flatten") flatten: String
+case class RuleOutputParam( @JsonProperty("type") private val outputType: String,
+ @JsonProperty("name") private val name: String,
+ @JsonProperty("flatten") private val flatten: String
) extends Param {
def getOutputType: OutputType = if (outputType != null) OutputType(outputType) else OutputType("")
def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index 2ad2837..bf77d13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -30,9 +30,9 @@ import org.apache.griffin.measure.configuration.enums._
* @param checkpointParams config of checkpoint locations (required in streaming mode)
*/
@JsonInclude(Include.NON_NULL)
-case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
- @JsonProperty("sinks") sinkParams: List[SinkParam],
- @JsonProperty("griffin.checkpoint") checkpointParams: List[CheckpointParam]
+case class EnvConfig(@JsonProperty("spark") private val sparkParam: SparkParam,
+ @JsonProperty("sinks") private val sinkParams: List[SinkParam],
+ @JsonProperty("griffin.checkpoint") private val checkpointParams: List[CheckpointParam]
) extends Param {
def getSparkParam: SparkParam = sparkParam
def getSinkParams: Seq[SinkParam] = if (sinkParams != null) sinkParams else Nil
@@ -56,12 +56,12 @@ case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
* @param initClear clear checkpoint directory or not when initial (optional)
*/
@JsonInclude(Include.NON_NULL)
-case class SparkParam( @JsonProperty("log.level") logLevel: String,
- @JsonProperty("checkpoint.dir") cpDir: String,
- @JsonProperty("batch.interval") batchInterval: String,
- @JsonProperty("process.interval") processInterval: String,
- @JsonProperty("config") config: Map[String, String],
- @JsonProperty("init.clear") initClear: Boolean
+case class SparkParam( @JsonProperty("log.level") private val logLevel: String,
+ @JsonProperty("checkpoint.dir") private val cpDir: String,
+ @JsonProperty("batch.interval") private val batchInterval: String,
+ @JsonProperty("process.interval") private val processInterval: String,
+ @JsonProperty("config") private val config: Map[String, String],
+ @JsonProperty("init.clear") private val initClear: Boolean
) extends Param {
def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
def getCpDir: String = if (cpDir != null) cpDir else ""
@@ -83,8 +83,8 @@ case class SparkParam( @JsonProperty("log.level") logLevel: String,
* @param config config of sink way (must)
*/
@JsonInclude(Include.NON_NULL)
-case class SinkParam(@JsonProperty("type") sinkType: String,
- @JsonProperty("config") config: Map[String, Any]
+case class SinkParam(@JsonProperty("type") private val sinkType: String,
+ @JsonProperty("config") private val config: Map[String, Any]
) extends Param {
def getType: SinkType = SinkType(sinkType)
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
@@ -100,8 +100,8 @@ case class SinkParam(@JsonProperty("type") sinkType: String,
* @param config config of checkpoint location
*/
@JsonInclude(Include.NON_NULL)
-case class CheckpointParam(@JsonProperty("type") cpType: String,
- @JsonProperty("config") config: Map[String, Any]
+case class CheckpointParam(@JsonProperty("type") private val cpType: String,
+ @JsonProperty("config") private val config: Map[String, Any]
) extends Param {
def getType: String = cpType
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
index 4527d4b..98a694f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/GriffinConfig.scala
@@ -27,8 +27,8 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
* @param dqConfig dq measurement configuration (must)
*/
@JsonInclude(Include.NON_NULL)
-case class GriffinConfig(@JsonProperty("env") envConfig: EnvConfig,
- @JsonProperty("dq") dqConfig: DQConfig
+case class GriffinConfig(@JsonProperty("env") private val envConfig: EnvConfig,
+ @JsonProperty("dq") private val dqConfig: DQConfig
) extends Param {
def getEnvConfig: EnvConfig = envConfig
def getDqConfig: DQConfig = dqConfig
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index caea078..05d3c75 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.functions._
trait DataConnector extends Loggable with Serializable {
- @transient val sparkSession: SparkSession
+ val sparkSession: SparkSession
val dcParam: DataConnectorParam
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 104b05c..f4911fc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -91,11 +91,12 @@ object DataConnectorFactory extends Loggable {
): KafkaStreamingDataConnector = {
val KeyType = "key.type"
val ValueType = "value.type"
- val config = dcParam.config
+ val config = dcParam.getConfig
val keyType = config.getOrElse(KeyType, "java.lang.String").toString
val valueType = config.getOrElse(ValueType, "java.lang.String").toString
- (getClassTag(keyType), getClassTag(valueType)) match {
- case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
+
+ (keyType, valueType) match {
+ case ("java.lang.String", "java.lang.String") => {
KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
}
case _ => {
@@ -104,15 +105,6 @@ object DataConnectorFactory extends Loggable {
}
}
- private def getClassTag(tp: String): ClassTag[_] = {
- try {
- val clazz = Class.forName(tp)
- ClassTag(clazz)
- } catch {
- case e: Throwable => throw e
- }
- }
-
// def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
// connectors.flatMap { dc =>
// dc match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 3b19892..b6cca98 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -49,7 +49,7 @@ trait DQApp extends Loggable with Serializable {
*/
protected def getMeasureTime: Long = {
dqParam.getTimestampOpt match {
- case Some(t) if t > 0 => dqParam.timestamp
+ case Some(t) if t > 0 => t
case _ => System.currentTimeMillis
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 8733789..ba1f389 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -37,8 +37,8 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
val envParam: EnvConfig = allParam.getEnvConfig
val dqParam: DQConfig = allParam.getDqConfig
- val sparkParam = envParam.sparkParam
- val metricName = dqParam.name
+ val sparkParam = envParam.getSparkParam
+ val metricName = dqParam.getName
// val dataSourceParams = dqParam.dataSources
// val dataSourceNames = dataSourceParams.map(_.name)
val sinkParams = getSinkParams
@@ -83,7 +83,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
dqContext.getSink().start(applicationId)
// build job
- val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.evaluateRule)
+ val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
// dq job execute
dqJob.execute(dqContext)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index 1768ae2..ceecb78 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -43,8 +43,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
val envParam: EnvConfig = allParam.getEnvConfig
val dqParam: DQConfig = allParam.getDqConfig
- val sparkParam = envParam.sparkParam
- val metricName = dqParam.name
+ val sparkParam = envParam.getSparkParam
+ val metricName = dqParam.getName
// val dataSourceParams = dqParam.dataSources
// val dataSourceNames = dataSourceParams.map(_.name)
val sinkParams = getSinkParams
@@ -68,7 +68,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
clearCpDir
// init info cache instance
- OffsetCheckpointClient.initClient(envParam.checkpointParams, metricName)
+ OffsetCheckpointClient.initClient(envParam.getCheckpointParams, metricName)
OffsetCheckpointClient.init
// register udf
@@ -107,7 +107,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
globalContext.getSink().start(applicationId)
// process thread
- val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
+ val dqCalculator = StreamingDQCalculator(globalContext, dqParam.getEvaluateRule)
val processInterval = TimeUtil.milliseconds(sparkParam.getProcessInterval) match {
case Some(interval) => interval
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 176aed6..5a04c11 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -44,7 +44,7 @@ trait RuleParamStepBuilder extends DQStepBuilder {
val name = getStepName(ruleParam.getOutDfName())
// metric writer
val metricSteps = ruleParam.getOutputOpt(MetricOutputType).map { metric =>
- MetricWriteStep(metric.getNameOpt.getOrElse(name), name, FlattenType(metric.flatten))
+ MetricWriteStep(metric.getNameOpt.getOrElse(name), name, metric.getFlatten)
}.toSeq
// record writer
val recordSteps = ruleParam.getOutputOpt(RecordOutputType).map { record =>
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
index 32330cd..4ce0fe6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
@@ -18,6 +18,8 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.expr
+import scala.reflect.ClassTag
+
trait TreeNode extends Serializable {
var children = Seq[TreeNode]()
@@ -25,21 +27,32 @@ trait TreeNode extends Serializable {
def addChild(expr: TreeNode) = { children :+= expr }
def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }
- def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
- if (this.isInstanceOf[A]) {
+ def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+
+ val clazz = tag.runtimeClass
+ if(clazz.isAssignableFrom(this.getClass)){
val tv = seqOp(this.asInstanceOf[A], z)
children.foldLeft(combOp(z, tv)) { (ov, tn) =>
combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
}
- } else z
+ }
+ else {
+ z
+ }
+
}
- def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
- if (this.isInstanceOf[A]) {
+ def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+
+ val clazz = tag.runtimeClass
+ if(clazz.isAssignableFrom(this.getClass)){
val cv = children.foldLeft(z) { (ov, tn) =>
combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
}
combOp(z, seqOp(this.asInstanceOf[A], cv))
- } else z
+ }
+ else{
+ z
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 7c84d38..b97a1a0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -114,7 +114,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
// 4. accuracy metric
- val accuracyTableName = ruleParam.outDfName
+ val accuracyTableName = ruleParam.getOutDfName()
val matchedColName = details.getStringOrKey(_matched)
val accuracyMetricSql = procType match {
case BatchProcessType => {
@@ -140,7 +140,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyMetricWriteSteps = procType match {
case BatchProcessType => {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
}
@@ -167,7 +167,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
val accuracyMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 347fabd..4852a5b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -112,7 +112,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
// 5. complete metric
- val completeTableName = ruleParam.outDfName
+ val completeTableName = ruleParam.getOutDfName()
val completeColName = details.getStringOrKey(_complete)
val completeMetricSql = procType match {
case BatchProcessType => {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index 33d44c5..ecc115c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -92,11 +92,11 @@ case class ProfilingExpr2DQSteps(context: DQContext,
val profilingSql = {
s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
}
- val profilingName = ruleParam.outDfName
+ val profilingName = ruleParam.getOutDfName()
val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
val profilingMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, profilingName, flattenType)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 71e9f4b..8a3924b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -108,7 +108,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
// 3. timeliness metric
- val metricTableName = ruleParam.outDfName
+ val metricTableName = ruleParam.getOutDfName()
val totalColName = details.getStringOrKey(_total)
val avgColName = details.getStringOrKey(_avg)
val metricSql = procType match {
@@ -132,7 +132,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
val metricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
MetricWriteStep(mwName, metricTableName, flattenType)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index bb75cec..dfa2598 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -32,8 +32,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
val params = reader.readConfig[DQConfig]
params match {
case Success(v) =>
- v.evaluateRule.getRules(0).dslType should === ("griffin-dsl")
- v.evaluateRule.getRules(0).outDfName should === ("accu")
+ v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+ v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
case Failure(_) =>
fail("it should not happen")
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/75b426fa/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 1e9f3b0..f4b31fa 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -36,8 +36,8 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
val params = reader.readConfig[DQConfig]
params match {
case Success(v) =>
- v.evaluateRule.getRules(0).dslType should === ("griffin-dsl")
- v.evaluateRule.getRules(0).outDfName should === ("accu")
+ v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+ v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
case Failure(_) =>
fail("it should not happen")
}