You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:20 UTC
[05/11] incubator-griffin git commit: Dsl modify
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
deleted file mode 100644
index 661e8f4..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.expr
-
-import org.apache.griffin.measure.rule.CalculationUtil._
-import org.apache.griffin.measure.rule.DataTypeCalculationUtil._
-import org.apache.spark.sql.types.DataType
-
-trait MathExpr extends Expr {
-
-}
-
-case class MathFactorExpr(self: Expr) extends MathExpr {
- def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values)
- val desc: String = self.desc
- val dataSources: Set[String] = self.dataSources
- override def getSubCacheExprs(ds: String): Iterable[Expr] = {
- self.getCacheExprs(ds)
- }
- override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
- self.getFinalCacheExprs(ds)
- }
- override def getSubPersistExprs(ds: String): Iterable[Expr] = {
- self.getPersistExprs(ds)
- }
-}
-
-case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr {
- private val (posOpr, negOpr) = ("+", "-")
- def calculateOnly(values: Map[String, Any]): Option[Any] = {
- val fv = factor.calculate(values)
- oprList.foldRight(fv) { (opr, v) =>
- opr match {
- case this.posOpr => v
- case this.negOpr => -v
- case _ => None
- }
- }
- }
- val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
- val dataSources: Set[String] = factor.dataSources
- override def cacheUnit: Boolean = true
- override def getSubCacheExprs(ds: String): Iterable[Expr] = {
- factor.getCacheExprs(ds)
- }
- override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
- factor.getFinalCacheExprs(ds)
- }
- override def getSubPersistExprs(ds: String): Iterable[Expr] = {
- factor.getPersistExprs(ds)
- }
-}
-
-case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr {
- private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%")
- def calculateOnly(values: Map[String, Any]): Option[Any] = {
- val fv = first.calculate(values)
- others.foldLeft(fv) { (v, pair) =>
- val (opr, next) = pair
- val nv = next.calculate(values)
- opr match {
- case this.addOpr => v + nv
- case this.subOpr => v - nv
- case this.mulOpr => v * nv
- case this.divOpr => v / nv
- case this.modOpr => v % nv
- case _ => None
- }
- }
- }
- val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
- val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
- override def cacheUnit: Boolean = true
- override def getSubCacheExprs(ds: String): Iterable[Expr] = {
- first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
- }
- override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
- first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
- }
- override def getSubPersistExprs(ds: String): Iterable[Expr] = {
- first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
deleted file mode 100644
index 5b7f1b0..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.expr
-
-import org.apache.spark.sql.types.DataType
-import org.apache.griffin.measure.rule.CalculationUtil._
-
-trait SelectExpr extends Expr {
- def calculateOnly(values: Map[String, Any]): Option[Any] = None
-}
-
-case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr {
- val desc: String = s"[${fields.map(_.desc).mkString(", ")}]"
- val dataSources: Set[String] = Set.empty[String]
-}
-
-case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr {
- val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})"
- val dataSources: Set[String] = args.flatMap(_.dataSources).toSet
- override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds))
- override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds))
- override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds))
-}
-
-case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr {
- val desc: String = s"[${field.desc} ${compare} ${value.desc}]"
- val dataSources: Set[String] = value.dataSources
- override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds)
- override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds)
- override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds)
- private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=")
- override def calculateOnly(values: Map[String, Any]): Option[Any] = {
- val (lv, rv) = (values.get(fieldKey), value.calculate(values))
- compare match {
- case this.eqOpr() => lv === rv
- case this.neqOpr() => lv =!= rv
- case this.btOpr => lv > rv
- case this.bteOpr => lv >= rv
- case this.ltOpr => lv < rv
- case this.lteOpr => lv <= rv
- case _ => None
- }
- }
- def fieldKey: String = s"__${field.field}"
-}
-
-// -- selection --
-case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr {
- def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id)
-
- val desc: String = {
- val argsString = selectors.map(_.desc).mkString("")
- s"${head.desc}${argsString}"
- }
- val dataSources: Set[String] = {
- val selectorDataSources = selectors.flatMap(_.dataSources).toSet
- selectorDataSources + head.head
- }
-
- override def cacheUnit: Boolean = true
- override def getSubCacheExprs(ds: String): Iterable[Expr] = {
- selectors.flatMap(_.getCacheExprs(ds))
- }
- override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
- selectors.flatMap(_.getFinalCacheExprs(ds))
- }
-
- override def persistUnit: Boolean = true
- override def getSubPersistExprs(ds: String): Iterable[Expr] = {
- selectors.flatMap(_.getPersistExprs(ds))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
deleted file mode 100644
index 15161c3..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.func
-
-import org.apache.griffin.measure.utils.JsonUtil
-
-class DefaultFunctionDefine extends FunctionDefine {
-
- def json(strOpt: Option[_]): Map[String, Any] = {
- try {
- strOpt match {
- case Some(str: String) => JsonUtil.toAnyMap(str)
- case _ => throw new Exception("json function param should be string")
- }
- } catch {
- case e: Throwable => throw e
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
deleted file mode 100644
index d23fc7a..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.func
-
-trait FunctionDefine extends Serializable {
-
-}
-
-class UnKnown {}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
deleted file mode 100644
index 57e934d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.func
-
-import java.lang.reflect.Method
-
-import org.apache.griffin.measure.log.Loggable
-
-import scala.collection.mutable.{Map => MutableMap}
-
-object FunctionUtil extends Loggable {
-
- val functionDefines: MutableMap[String, FunctionDefine] = MutableMap[String, FunctionDefine]()
-
- registerFunctionDefine(Array(classOf[DefaultFunctionDefine].getCanonicalName))
-
- def registerFunctionDefine(classes: Iterable[String]): Unit = {
- for (cls <- classes) {
- try {
- val clz: Class[_] = Class.forName(cls)
- if (classOf[FunctionDefine].isAssignableFrom(clz)) {
- functionDefines += (cls -> clz.newInstance.asInstanceOf[FunctionDefine])
- } else {
- warn(s"${cls} register fails: ${cls} is not sub class of ${classOf[FunctionDefine].getCanonicalName}")
- }
- } catch {
- case e: Throwable => warn(s"${cls} register fails: ${e.getMessage}")
- }
- }
- }
-
- def invoke(methodName: String, params: Array[Option[Any]]): Seq[Option[Any]] = {
-// val paramTypes = params.map { param =>
-// try {
-// param match {
-// case Some(v) => v.getClass
-// case _ => classOf[UnKnown]
-// }
-// } catch {
-// case e: Throwable => classOf[UnKnown]
-// }
-// }
- val paramTypes = params.map(a => classOf[Option[_]])
-
- functionDefines.values.foldLeft(Nil: Seq[Option[Any]]) { (res, funcDef) =>
- if (res.isEmpty) {
- val clz = funcDef.getClass
- try {
- val method = clz.getMethod(methodName, paramTypes: _*)
- Seq(Some(method.invoke(funcDef, params: _*)))
- } catch {
- case e: Throwable => res
- }
- } else res
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
new file mode 100644
index 0000000..22d64d8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.preproc
+
+/**
+ * Rewrites pre-process rule definitions: in every string value, each `${...}`
+ * placeholder is replaced by the placeholder's content followed by `_<suffix>`
+ * (for example `${src}` with suffix `0` becomes `src_0`).
+ * Nested maps and sequences are rewritten recursively; other values are kept as they are.
+ */
+object PreProcRuleGenerator {
+
+  val _name = "name"
+
+  // Non-greedy on purpose: each `${...}` must be rewritten on its own. A greedy `.*`
+  // spans from the first `${` to the last `}` when a string holds several placeholders.
+  private val PlaceholderRegex = """\$\{(.*?)\}"""
+
+  /**
+   * Applies the placeholder rewrite to every rule.
+   *
+   * @param rules  rule definitions; `null` is treated as no rules
+   * @param suffix text appended, after an underscore, to each placeholder's content
+   * @return the rewritten rules, in the original order
+   */
+  def genPreProcRules(rules: Seq[Map[String, Any]], suffix: String): Seq[Map[String, Any]] = {
+    Option(rules).map(_.map(genPreProcRule(_, suffix))).getOrElse(Nil)
+  }
+
+  /**
+   * Collects the value stored under the key `"name"` of each rule.
+   * Rules without that key, or whose value is not a String, are skipped.
+   *
+   * @param rules rule definitions; `null` is treated as no rules
+   */
+  def getRuleNames(rules: Seq[Map[String, Any]]): Seq[String] = {
+    Option(rules).map(_.flatMap(_.get(_name).collect { case s: String => s })).getOrElse(Nil)
+  }
+
+  // Rewrites every value of a map, keeping the keys.
+  private def genPreProcRule(param: Map[String, Any], suffix: String): Map[String, Any] = {
+    param.map { case (key, value) => key -> genValue(value, suffix) }
+  }
+
+  // Rewrites every element of a sequence, keeping the order.
+  private def genPreProcRule(paramArr: Seq[Any], suffix: String): Seq[Any] = {
+    paramArr.map(genValue(_, suffix))
+  }
+
+  // Dispatches on the runtime shape of a single value.
+  private def genValue(value: Any, suffix: String): Any = value match {
+    case s: String => genNewString(s, suffix)
+    // key/value types are erased at runtime, hence @unchecked; only the Map shape is tested
+    case subMap: Map[String, Any] @unchecked => genPreProcRule(subMap, suffix)
+    case arr: Seq[_] => genPreProcRule(arr, suffix)
+    case other => other
+  }
+
+  private def genNewString(str: String, suffix: String): String = {
+    // quoteReplacement keeps `$` and `\` in the suffix literal; replaceAll would otherwise
+    // read them as group references / escapes
+    val replacement = "$1_" + java.util.regex.Matcher.quoteReplacement(suffix)
+    str.replaceAll(PlaceholderRegex, replacement)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
new file mode 100644
index 0000000..4b3a4d4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl._
+
+/**
+ * A [[RuleStep]] that additionally declares a `persistType` and an optional
+ * `updateDataSource` name.
+ */
+trait ConcreteRuleStep extends RuleStep {
+
+ // NOTE(review): presumably selects how this step's result is persisted -- confirm against PersistType
+ val persistType: PersistType
+
+ // NOTE(review): presumably names a data source to refresh with this step's result; confirm with callers
+ val updateDataSource: Option[String]
+
+// Disabled helper: would read the boolean "group.metric" entry of `details`, defaulting to false.
+// def isGroupMetric: Boolean = {
+// val _GroupMetric = "group.metric"
+// details.get(_GroupMetric) match {
+// case Some(b: Boolean) => b
+// case _ => false
+// }
+// }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
new file mode 100644
index 0000000..86f0bf3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl._
+
+/**
+ * [[ConcreteRuleStep]] whose `dslType` is fixed to `DfOprType`.
+ * The constructor parameters supply the remaining abstract members of the step
+ * (`name`, `rule`, `details`, `persistType`, `updateDataSource`).
+ */
+case class DfOprStep(name: String, rule: String, details: Map[String, Any],
+ persistType: PersistType, updateDataSource: Option[String]
+ ) extends ConcreteRuleStep {
+
+ // constant for every instance of this step kind
+ val dslType: DslType = DfOprType
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
new file mode 100644
index 0000000..21db8cf
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl._
+
+/**
+ * [[RuleStep]] whose `dslType` is fixed to `GriffinDslType`.
+ * Besides the `name`, `rule` and `details` members of the trait it carries a `dqType`.
+ * Unlike the other step case classes in this package it extends [[RuleStep]] directly,
+ * so it has no `persistType` / `updateDataSource`.
+ */
+case class GriffinDslStep(name: String, rule: String, dqType: DqType, details: Map[String, Any]
+ ) extends RuleStep {
+
+ // constant for every instance of this step kind
+ val dslType: DslType = GriffinDslType
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
new file mode 100644
index 0000000..4675ffe
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.rule.dsl.{DslType, PersistType}
+
+/**
+ * Base description of one rule step. Serializable.
+ * Implementations provide the DSL type plus a name, a rule string and a map of details.
+ */
+trait RuleStep extends Serializable {
+
+ // which DSL this step is expressed in
+ val dslType: DslType
+
+ val name: String
+ // NOTE(review): presumably the rule text in the syntax selected by dslType -- confirm with the adaptors
+ val rule: String
+ // free-form extra settings; the recognised keys are not visible in this file
+ val details: Map[String, Any]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
new file mode 100644
index 0000000..62c3c35
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.step
+
+import org.apache.griffin.measure.persist._
+import org.apache.griffin.measure.rule.dsl._
+
+/**
+ * [[ConcreteRuleStep]] whose `dslType` is fixed to `SparkSqlType`.
+ * The constructor parameters supply the remaining abstract members of the step
+ * (`name`, `rule`, `details`, `persistType`, `updateDataSource`).
+ */
+case class SparkSqlStep(name: String, rule: String, details: Map[String, Any],
+ persistType: PersistType, updateDataSource: Option[String]
+ ) extends ConcreteRuleStep {
+
+ // constant for every instance of this step kind
+ val dslType: DslType = SparkSqlType
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
new file mode 100644
index 0000000..11e8c8f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.udf
+
+import org.apache.spark.sql.SQLContext
+
+/**
+ * User-defined SQL functions registered by Griffin.
+ */
+object GriffinUdfs {
+
+  /**
+   * Registers the functions of this object on `sqlContext`.
+   * Currently that is `index_of(arr, v)`.
+   */
+  def register(sqlContext: SQLContext): Unit = {
+    sqlContext.udf.register("index_of", indexOf)
+  }
+
+  // Position of the first element of `arr` equal to `v`, or -1 when there is none.
+  // A null array is reported as -1 as well, instead of raising a NullPointerException
+  // inside the UDF.
+  private val indexOf = (arr: Seq[String], v: String) => {
+    Option(arr).fold(-1)(_.indexOf(v))
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
index 8a608ff..416f567 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
@@ -68,7 +68,7 @@ object HdfsFileDumpUtil {
def remove(path: String, filename: String, withSuffix: Boolean): Unit = {
if (withSuffix) {
- val files = HdfsUtil.listSubPaths(path, "file")
+ val files = HdfsUtil.listSubPathsByType(path, "file")
val patternFiles = files.filter(samePattern(_, filename))
patternFiles.foreach { f =>
val rmPath = HdfsUtil.getHdfsFilePath(path, f)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index 6dd54b7..9fa6bcf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.utils
+import org.apache.griffin.measure.log.Loggable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
-object HdfsUtil {
+object HdfsUtil extends Loggable {
private val seprator = "/"
@@ -32,8 +33,17 @@ object HdfsUtil {
private val dfs = FileSystem.get(conf)
def existPath(filePath: String): Boolean = {
- val path = new Path(filePath)
- dfs.exists(path)
+ try {
+ val path = new Path(filePath)
+ dfs.exists(path)
+ } catch {
+ case e: Throwable => false
+ }
+ }
+
+ /**
+  * Tells whether `fileName` exists under `dirPath`.
+  * The full path is built with getHdfsFilePath and then checked with existPath.
+  */
+ def existFileInDir(dirPath: String, fileName: String): Boolean = {
+ val filePath = getHdfsFilePath(dirPath, fileName)
+ existPath(filePath)
}
def createFile(filePath: String): FSDataOutputStream = {
@@ -75,8 +85,12 @@ object HdfsUtil {
}
def deleteHdfsPath(dirPath: String): Unit = {
- val path = new Path(dirPath)
- if (dfs.exists(path)) dfs.delete(path, true)
+ try {
+ val path = new Path(dirPath)
+ if (dfs.exists(path)) dfs.delete(path, true)
+ } catch {
+ case e: Throwable => error(s"delete path [${dirPath}] error: ${e.getMessage}")
+ }
}
// def listPathFiles(dirPath: String): Iterable[String] = {
@@ -96,25 +110,38 @@ object HdfsUtil {
// }
// }
- def listSubPaths(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = {
- val path = new Path(dirPath)
- try {
- val fileStatusArray = dfs.listStatus(path)
- fileStatusArray.filter { fileStatus =>
- subType match {
- case "dir" => fileStatus.isDirectory
- case "file" => fileStatus.isFile
- case _ => true
+ def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = {
+ if (existPath(dirPath)) {
+ try {
+ val path = new Path(dirPath)
+ val fileStatusArray = dfs.listStatus(path)
+ fileStatusArray.filter { fileStatus =>
+ subType match {
+ case "dir" => fileStatus.isDirectory
+ case "file" => fileStatus.isFile
+ case _ => true
+ }
+ }.map { fileStatus =>
+ val fname = fileStatus.getPath.getName
+ if (fullPath) getHdfsFilePath(dirPath, fname) else fname
+ }
+ } catch {
+ case e: Throwable => {
+ warn(s"list path [${dirPath}] warn: ${e.getMessage}")
+ Nil
}
- }.map { fileStatus =>
- val fname = fileStatus.getPath.getName
- if (fullPath) getHdfsFilePath(dirPath, fname) else fname
- }
- } catch {
- case e: Throwable => {
- println(s"list path files error: ${e.getMessage}")
- Nil
}
+ } else Nil
+ }
+
+ /**
+  * Runs listSubPathsByType once per entry of `subTypes` and concatenates the results,
+  * in the iteration order of `subTypes`. Results are not de-duplicated, so an entry may
+  * appear more than once when the given types overlap.
+  */
+ def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false): Iterable[String] = {
+ subTypes.flatMap { subType =>
+ listSubPathsByType(dirPath, subType, fullPath)
}
}
+
+ /** Returns what `Path.getName` yields for a Hadoop `Path` built from `filePath`. */
+ def fileNameFromPath(filePath: String): String = new Path(filePath).getName
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
new file mode 100644
index 0000000..7954b6d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -0,0 +1,164 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+/**
+ * Utilities for reading typed values out of loosely typed `Map[String, Any]` parameter maps.
+ *
+ * Bring the getters into scope with `import ParamUtil._`.
+ */
+object ParamUtil {
+
+  import scala.util.control.NonFatal
+
+  /**
+   * Enriches a parameter map with typed getters. Each getter falls back to the supplied
+   * `defValue` instead of throwing when the key is missing or the stored value cannot be used.
+   */
+  implicit class ParamMap(params: Map[String, Any]) {
+
+    /**
+     * Shared lookup used by the converting getters.
+     *
+     * @param key      key to look up in `params`
+     * @param defValue result when the key is absent, when `convert` is not defined for the
+     *                 stored value, or when `convert` throws a non-fatal exception
+     *                 (e.g. `NumberFormatException` for an unparsable string)
+     * @param convert  conversion from the stored value to `T`
+     */
+    private def convertOrElse[T](key: String, defValue: T)(convert: PartialFunction[Any, T]): T = {
+      try {
+        params.get(key).collect(convert).getOrElse(defValue)
+      } catch {
+        // NonFatal lets VM errors such as OutOfMemoryError propagate instead of being swallowed
+        case NonFatal(_) => defValue
+      }
+    }
+
+    /** Returns the raw value stored under `key`, or `defValue` when the key is absent. */
+    def getAny(key: String, defValue: Any): Any = params.getOrElse(key, defValue)
+
+    /**
+     * Returns the value stored under `key` when its runtime class matches `T`, else `defValue`.
+     *
+     * The check goes through the implicit `Manifest`, so only the outer class is tested;
+     * type arguments of `T` are erased and are not verified.
+     */
+    def getAnyRef[T](key: String, defValue: T)(implicit m: Manifest[T]): T = {
+      params.get(key) match {
+        case Some(v: T) => v
+        case _ => defValue
+      }
+    }
+
+    /** Returns `toString` of the stored value, or `defValue` when absent or null. */
+    def getString(key: String, defValue: String): String = {
+      convertOrElse(key, defValue) {
+        // a stored null makes toString throw; convertOrElse turns that into defValue
+        case v => v.toString
+      }
+    }
+
+    /** Reads a `Byte` from a numeric value or a parsable string; narrowing follows `toByte`. */
+    def getByte(key: String, defValue: Byte): Byte = {
+      convertOrElse(key, defValue) {
+        case v: String => v.toByte
+        case v: Byte => v.toByte
+        case v: Short => v.toByte
+        case v: Int => v.toByte
+        case v: Long => v.toByte
+        case v: Float => v.toByte
+        case v: Double => v.toByte
+      }
+    }
+
+    /** Reads a `Short` from a numeric value or a parsable string; narrowing follows `toShort`. */
+    def getShort(key: String, defValue: Short): Short = {
+      convertOrElse(key, defValue) {
+        case v: String => v.toShort
+        case v: Byte => v.toShort
+        case v: Short => v.toShort
+        case v: Int => v.toShort
+        case v: Long => v.toShort
+        case v: Float => v.toShort
+        case v: Double => v.toShort
+      }
+    }
+
+    /** Reads an `Int` from a numeric value or a parsable string; narrowing follows `toInt`. */
+    def getInt(key: String, defValue: Int): Int = {
+      convertOrElse(key, defValue) {
+        case v: String => v.toInt
+        case v: Byte => v.toInt
+        case v: Short => v.toInt
+        case v: Int => v.toInt
+        case v: Long => v.toInt
+        case v: Float => v.toInt
+        case v: Double => v.toInt
+      }
+    }
+
+    /** Reads a `Long` from a numeric value or a parsable string; narrowing follows `toLong`. */
+    def getLong(key: String, defValue: Long): Long = {
+      convertOrElse(key, defValue) {
+        case v: String => v.toLong
+        case v: Byte => v.toLong
+        case v: Short => v.toLong
+        case v: Int => v.toLong
+        case v: Long => v.toLong
+        case v: Float => v.toLong
+        case v: Double => v.toLong
+      }
+    }
+
+    /** Reads a `Float` from a numeric value or a parsable string. */
+    def getFloat(key: String, defValue: Float): Float = {
+      convertOrElse(key, defValue) {
+        case v: String => v.toFloat
+        case v: Byte => v.toFloat
+        case v: Short => v.toFloat
+        case v: Int => v.toFloat
+        case v: Long => v.toFloat
+        case v: Float => v.toFloat
+        case v: Double => v.toFloat
+      }
+    }
+
+    /** Reads a `Double` from a numeric value or a parsable string. */
+    def getDouble(key: String, defValue: Double): Double = {
+      convertOrElse(key, defValue) {
+        case v: String => v.toDouble
+        case v: Byte => v.toDouble
+        case v: Short => v.toDouble
+        case v: Int => v.toDouble
+        case v: Long => v.toDouble
+        case v: Float => v.toDouble
+        case v: Double => v.toDouble
+      }
+    }
+
+    /**
+     * Reads a `Boolean` from either a real boolean value or a string accepted by
+     * `String.toBoolean` ("true"/"false", case-insensitive). Anything else yields `defValue`.
+     */
+    def getBoolean(key: String, defValue: Boolean): Boolean = {
+      convertOrElse(key, defValue) {
+        case v: String => v.toBoolean
+        // values parsed from JSON arrive as booleans, not strings
+        case v: Boolean => v
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index 0079d10..fe721d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -22,8 +22,8 @@ import scala.util.{Failure, Success, Try}
object TimeUtil {
- final val TimeRegex = """([+\-]?\d+)(d|h|m|s|ms)""".r
- final val PureTimeRegex = """([+\-]?\d+)""".r
+ final val TimeRegex = """^([+\-]?\d+)(d|h|m|s|ms)$""".r
+ final val PureTimeRegex = """^([+\-]?\d+)$""".r
def milliseconds(timeString: String): Option[Long] = {
val value: Option[Long] = {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy-streaming-multids.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-accuracy-streaming-multids.json b/measure/src/test/resources/config-test-accuracy-streaming-multids.json
new file mode 100644
index 0000000..18532b0
--- /dev/null
+++ b/measure/src/test/resources/config-test-accuracy-streaming-multids.json
@@ -0,0 +1,144 @@
+{
+ "name": "accu_streaming",
+
+ "process.type": "streaming",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "kafka",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
+ },
+ "topics": "sss",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${s1}",
+ "rule": "from_json",
+ "persist.type": "cache",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${s1}"
+ }
+ ]
+ },
+ {
+ "type": "text-dir",
+ "config": {
+ "dir.path": "hdfs://localhost/griffin/text",
+ "data.dir.depth": 0,
+ "success.file": "_SUCCESS",
+ "done.file": "_DONE"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${s1}",
+ "rule": "from_json",
+ "persist.type": "cache",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${s1}"
+ }
+ ]
+ }
+ ],
+ "cache": {
+ "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+ "info.path": "source",
+ "ready.time.interval": "10s",
+ "ready.time.delay": "0",
+ "time.range": ["-2m", "0"]
+ }
+ }, {
+ "name": "target",
+ "connectors": [
+ {
+ "type": "kafka",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
+ },
+ "topics": "ttt",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${t1}",
+ "rule": "from_json",
+ "persist.type": "cache",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${t1}"
+ }
+ ]
+ }
+ ],
+ "cache": {
+ "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+ "info.path": "target",
+ "ready.time.interval": "10s",
+ "ready.time.delay": "0",
+ "time.range": ["-2m", "0"]
+ }
+ }
+ ],
+
+ "evaluateRule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "accuracy",
+ "rule": "source.name = target.name and source.age = target.age",
+ "details": {
+ "source": "source",
+ "target": "target",
+ "miss.records": {
+ "name": "miss.records",
+ "persist.type": "record",
+ "update.data.source": "source"
+ },
+ "accuracy": {
+ "name": "accu",
+ "persist.type": "metric"
+ },
+ "miss": "miss_count",
+ "total": "total_count",
+ "matched": "matched_count"
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-accuracy-streaming.json b/measure/src/test/resources/config-test-accuracy-streaming.json
new file mode 100644
index 0000000..276f8dd
--- /dev/null
+++ b/measure/src/test/resources/config-test-accuracy-streaming.json
@@ -0,0 +1,119 @@
+{
+ "name": "accu_streaming",
+
+ "process.type": "streaming",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "kafka",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
+ },
+ "topics": "sss",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${s1}",
+ "rule": "from_json",
+ "persist.type": "cache",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${s1}"
+ }
+ ]
+ }
+ ],
+ "cache": {
+ "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+ "info.path": "source",
+ "ready.time.interval": "10s",
+ "ready.time.delay": "0",
+ "time.range": ["-2m", "0"]
+ }
+ }, {
+ "name": "target",
+ "connectors": [
+ {
+ "type": "kafka",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
+ },
+ "topics": "ttt",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${t1}",
+ "rule": "from_json",
+ "persist.type": "cache",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${t1}"
+ }
+ ]
+ }
+ ],
+ "cache": {
+ "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+ "info.path": "target",
+ "ready.time.interval": "10s",
+ "ready.time.delay": "0",
+ "time.range": ["-2m", "0"]
+ }
+ }
+ ],
+
+ "evaluateRule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "accuracy",
+ "rule": "source.name = target.name and source.age = target.age",
+ "details": {
+ "source": "source",
+ "target": "target",
+ "miss.records": {
+ "name": "miss.records",
+ "persist.type": "record",
+ "update.data.source": "source"
+ },
+ "accuracy": {
+ "name": "accu",
+ "persist.type": "metric"
+ },
+ "miss": "miss_count",
+ "total": "total_count",
+ "matched": "matched_count"
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-accuracy.json b/measure/src/test/resources/config-test-accuracy.json
new file mode 100644
index 0000000..ecbdaaa
--- /dev/null
+++ b/measure/src/test/resources/config-test-accuracy.json
@@ -0,0 +1,56 @@
+{
+ "name": "accu_batch_test",
+
+ "process.type": "batch",
+
+ "data.sources": [
+ {
+ "name": "src",
+ "connectors": [
+ {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ }
+ }
+ ]
+ }, {
+ "name": "tgt",
+ "connectors": [
+ {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_target.avro"
+ }
+ }
+ ]
+ }
+ ],
+
+ "evaluateRule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "accuracy",
+ "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name AND src.address = tgt.address AND src.email = tgt.email AND src.phone = tgt.phone AND src.post_code = tgt.post_code",
+ "details": {
+ "source": "src",
+ "target": "tgt",
+ "miss.records": {
+ "name": "miss.records",
+ "persist.type": "record"
+ },
+ "accuracy": {
+ "name": "accu",
+ "persist.type": "metric"
+ },
+ "miss": "miss_count",
+ "total": "total_count",
+ "matched": "matched_count"
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-profiling-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-profiling-streaming.json b/measure/src/test/resources/config-test-profiling-streaming.json
new file mode 100644
index 0000000..b2a74b8
--- /dev/null
+++ b/measure/src/test/resources/config-test-profiling-streaming.json
@@ -0,0 +1,68 @@
+{
+ "name": "prof_streaming",
+
+ "process.type": "streaming",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "kafka",
+ "version": "0.8",
+ "config": {
+ "kafka.config": {
+ "bootstrap.servers": "10.149.247.156:9092",
+ "group.id": "group1",
+ "auto.offset.reset": "smallest",
+ "auto.commit.enable": "false"
+ },
+ "topics": "sss",
+ "key.type": "java.lang.String",
+ "value.type": "java.lang.String"
+ },
+ "pre.proc": [
+ {
+ "dsl.type": "df-opr",
+ "name": "${s1}",
+ "rule": "from_json",
+ "persist.type": "cache",
+ "details": {
+ "df.name": "${this}"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "${this}",
+ "rule": "select name, age from ${s1}"
+ }
+ ]
+ }
+ ],
+ "cache": {
+ "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+ "info.path": "source",
+ "ready.time.interval": "10s",
+ "ready.time.delay": "0",
+ "time.range": ["0", "0"]
+ }
+ }
+ ],
+
+ "evaluateRule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "profiling",
+ "rule": "source.name.count(), source.age.avg(), source.age.max(), source.age.min() group by source.name",
+ "details": {
+ "source": "source",
+ "profiling": {
+ "name": "prof",
+ "persist.type": "metric"
+ }
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-profiling.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-profiling.json b/measure/src/test/resources/config-test-profiling.json
new file mode 100644
index 0000000..187e88a
--- /dev/null
+++ b/measure/src/test/resources/config-test-profiling.json
@@ -0,0 +1,37 @@
+{
+ "name": "prof_batch_test",
+
+ "process.type": "batch",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ }
+ }
+ ]
+ }
+ ],
+
+ "evaluateRule": {
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "profiling",
+ "rule": "user_id as id, user_id.approx_count_distinct() as cnt group by user_id order by cnt desc, id desc limit 3",
+ "details": {
+ "source": "source",
+ "profiling": {
+ "name": "count",
+ "persist.type": "metric"
+ }
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test.json b/measure/src/test/resources/config-test.json
new file mode 100644
index 0000000..23eb5ff
--- /dev/null
+++ b/measure/src/test/resources/config-test.json
@@ -0,0 +1,55 @@
+{
+ "name": "accu batch test",
+
+ "process.type": "batch",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_src.avro"
+ }
+ }
+ ]
+ }, {
+ "name": "target",
+ "connectors": [
+ {
+ "type": "avro",
+ "version": "1.7",
+ "config": {
+ "file.name": "src/test/resources/users_info_target.avro"
+ }
+ }
+ ]
+ }
+ ],
+
+ "evaluateRule": {
+ "rules": [
+ {
+ "dsl.type": "spark-sql",
+ "name": "miss.records",
+ "rule": "SELECT source.user_id, source.first_name, source.last_name, source.address, source.email, source.phone, source.post_code FROM source LEFT JOIN target ON coalesce(source.user_id, 'null') = coalesce(target.user_id, 'null') AND coalesce(source.first_name, 'null') = coalesce(target.first_name, 'null') AND coalesce(source.last_name, 'null') = coalesce(target.last_name, 'null') AND coalesce(source.address, 'null') = coalesce(target.address, 'null') AND coalesce(source.email, 'null') = coalesce(target.email, 'null') AND coalesce(source.phone, 'null') = coalesce(target.phone, 'null') AND coalesce(source.post_code, 'null') = coalesce(target.post_code, 'null') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.last_name IS NULL AND source.address IS NULL AND source.email IS NULL AND source.phone IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.last_name IS NULL AND target.address IS NULL AND target.email IS NULL AND target.phone IS NULL AND target.post_code IS NULL)",
+ "persist.type": "record"
+ }, {
+ "dsl.type": "spark-sql",
+ "name": "miss",
+ "rule": "SELECT COUNT(*) AS `miss` FROM `miss.records`"
+ }, {
+ "dsl.type": "spark-sql",
+ "name": "total",
+ "rule": "SELECT COUNT(*) AS `total` FROM source"
+ }, {
+ "dsl.type": "spark-sql",
+ "name": "accuracy",
+ "rule": "SELECT `total`.`total` AS `total`, `miss`.`miss` AS `miss`, (`total`.`total` - `miss`.`miss`) AS `matched` FROM total JOIN miss",
+ "persist.type": "metric"
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test1.json b/measure/src/test/resources/config-test1.json
new file mode 100644
index 0000000..53a8765
--- /dev/null
+++ b/measure/src/test/resources/config-test1.json
@@ -0,0 +1,96 @@
+{
+ "name": "accu batch test",
+
+ "process.type": "batch",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "src"
+ }
+ }
+ ]
+ }, {
+ "name": "target",
+ "connectors": [
+ {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "tgt"
+ }
+ }
+ ]
+ }
+ ],
+
+ "evaluateRule": {
+ "rules": [
+ {
+ "dsl.type": "df-opr",
+ "name": "source",
+ "rule": "from_json",
+ "details": {
+ "df.name": "source"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "seeds",
+ "rule": "SELECT explode(seeds) as seed FROM source"
+ },
+ {
+ "dsl.type": "df-opr",
+ "name": "seeds",
+ "rule": "from_json",
+ "details": {
+ "df.name": "seeds",
+ "col.name": "seed"
+ }
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "source",
+ "rule": "SELECT url, get_json_object(metadata, '$.tracker.crawlRequestCreateTS') AS ts FROM seeds"
+ },
+ {
+ "dsl.type": "spark-opr",
+ "name": "target",
+ "rule": "from_json(target.value)"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "attrs",
+ "rule": "SELECT groups[0].attrsList AS attrs FROM target"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "target",
+ "rule": "SELECT attrs.values[index_of(attrs.name, 'URL')][0] AS url, get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') AS ts FROM df2"
+ },
+ {
+ "dsl.type": "spark-sql",
+ "name": "miss.record",
+ "rule": "SELECT source.url, source.ts FROM source LEFT JOIN target ON coalesce(source.url, '') = coalesce(target.url, '') AND coalesce(source.ts, '') = coalesce(target.ts, '') WHERE (NOT (source.url IS NULL AND source.ts IS NULL)) AND (target.url IS NULL AND target.ts IS NULL)",
+ "persist.type": "record"
+ }, {
+ "dsl.type": "spark-sql",
+ "name": "miss.count",
+ "rule": "SELECT COUNT(*) AS `miss.count` FROM `miss.record`",
+ "persist.type": "metric"
+ }, {
+ "dsl.type": "spark-sql",
+ "name": "total.count",
+ "rule": "SELECT COUNT(*) AS `total.count` FROM source",
+ "persist.type": "metric"
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config.json b/measure/src/test/resources/config.json
index 08a6021..0a17474 100644
--- a/measure/src/test/resources/config.json
+++ b/measure/src/test/resources/config.json
@@ -22,6 +22,6 @@
"evaluateRule": {
"sampleRatio": 1,
- "rules": "$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code WHEN $source.user_id > 10015"
+ "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json
index 42b4aa9..a01348f 100644
--- a/measure/src/test/resources/env-streaming.json
+++ b/measure/src/test/resources/env-streaming.json
@@ -5,6 +5,7 @@
"batch.interval": "2s",
"process.interval": "10s",
"config": {
+ "spark.master": "local[*]",
"spark.task.maxFailures": 5,
"spark.streaming.kafkaMaxRatePerPartition": 1000,
"spark.streaming.concurrentJobs": 4,
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/env-test.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-test.json b/measure/src/test/resources/env-test.json
new file mode 100644
index 0000000..603fad8
--- /dev/null
+++ b/measure/src/test/resources/env-test.json
@@ -0,0 +1,38 @@
+{
+ "spark": {
+ "log.level": "WARN",
+ "checkpoint.dir": "hdfs:///griffin/batch/cp",
+ "batch.interval": "10s",
+ "process.interval": "10m",
+ "config": {
+ "spark.master": "local[*]"
+ }
+ },
+
+ "persist": [
+ {
+ "type": "log",
+ "config": {
+ "max.log.lines": 100
+ }
+ }
+ ],
+
+ "info.cache": [
+ {
+ "type": "zk",
+ "config": {
+ "hosts": "localhost:2181",
+ "namespace": "griffin/infocache",
+ "lock.path": "lock",
+ "mode": "persist",
+ "init.clear": true,
+ "close.clear": false
+ }
+ }
+ ],
+
+ "cleaner": {
+
+ }
+}
\ No newline at end of file