Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:26 UTC
[11/11] incubator-griffin git commit: Dsl modify
Dsl modify
dsl modified in measure module, with document in griffin-doc/dsl-guide.md
Author: Lionel Liu <bh...@163.com>
Closes #123 from bhlx3lyx7/dsl-modify.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/4aa6f779
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/4aa6f779
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/4aa6f779
Branch: refs/heads/master
Commit: 4aa6f779969650ee6d1871d342ad7e114b7ed4ce
Parents: ac8351f
Author: Lionel Liu <bh...@163.com>
Authored: Sat Sep 30 16:34:53 2017 +0800
Committer: William Guo <gu...@icloud.com>
Committed: Sat Sep 30 16:34:53 2017 +0800
----------------------------------------------------------------------
griffin-doc/dsl-guide.md | 83 +++
measure/derby.log | 13 +
measure/src/main/resources/config-old.json | 31 ++
measure/src/main/resources/config-sql.json | 54 ++
measure/src/main/resources/config.json | 71 ++-
.../apache/griffin/measure/Application.scala | 102 +++-
.../griffin/measure/algo/AccuracyAlgo.scala | 24 -
.../org/apache/griffin/measure/algo/Algo.scala | 34 --
.../griffin/measure/algo/MeasureType.scala | 26 -
.../griffin/measure/algo/ProcessType.scala | 26 -
.../griffin/measure/algo/ProfileAlgo.scala | 23 -
.../measure/algo/batch/BatchAccuracyAlgo.scala | 190 -------
.../measure/algo/batch/BatchProfileAlgo.scala | 162 ------
.../measure/algo/core/AccuracyCore.scala | 103 ----
.../griffin/measure/algo/core/ProfileCore.scala | 73 ---
.../algo/streaming/StreamingAccuracyAlgo.scala | 358 -------------
.../streaming/StreamingAccuracyProcess.scala | 234 --------
.../measure/algo/streaming/TimingProcess.scala | 46 --
.../measure/cache/info/TimeInfoCache.scala | 2 +-
.../cache/result/CacheResultProcesser.scala | 2 +-
.../config/params/user/DataCacheParam.scala | 31 --
.../config/params/user/DataConnectorParam.scala | 6 +-
.../config/params/user/DataSourceParam.scala | 31 ++
.../config/params/user/EvaluateRuleParam.scala | 4 +-
.../measure/config/params/user/UserParam.scala | 10 +-
.../measure/connector/DataConnector.scala | 32 --
.../connector/DataConnectorFactory.scala | 139 -----
.../connector/cache/CacheDataConnector.scala | 33 --
.../measure/connector/cache/DataCacheable.scala | 86 ---
.../measure/connector/cache/DataUpdatable.scala | 30 --
.../cache/HiveCacheDataConnector.scala | 351 ------------
.../cache/TextCacheDataConnector.scala | 311 -----------
.../direct/AvroDirectDataConnector.scala | 132 -----
.../connector/direct/DirectDataConnector.scala | 34 --
.../direct/HiveDirectDataConnector.scala | 158 ------
.../direct/KafkaCacheDirectDataConnector.scala | 125 -----
.../StreamingCacheDirectDataConnector.scala | 60 ---
.../streaming/KafkaStreamingDataConnector.scala | 58 --
.../streaming/StreamingDataConnector.scala | 34 --
.../measure/data/connector/DataConnector.scala | 114 ++++
.../data/connector/DataConnectorFactory.scala | 150 ++++++
.../batch/AvroBatchDataConnector.scala | 112 ++++
.../connector/batch/BatchDataConnector.scala | 35 ++
.../batch/HiveBatchDataConnector.scala | 149 ++++++
.../batch/KafkaCacheDirectDataConnector.scala | 125 +++++
.../StreamingCacheDirectDataConnector.scala | 60 +++
.../batch/TextDirBatchDataConnector.scala | 136 +++++
.../connector/cache/CacheDataConnector.scala | 33 ++
.../data/connector/cache/DataCacheable.scala | 86 +++
.../data/connector/cache/DataUpdatable.scala | 30 ++
.../cache/HiveCacheDataConnector.scala | 351 ++++++++++++
.../cache/TextCacheDataConnector.scala | 311 +++++++++++
.../streaming/KafkaStreamingDataConnector.scala | 70 +++
.../KafkaStreamingStringDataConnector.scala | 65 +++
.../streaming/StreamingDataConnector.scala | 43 ++
.../measure/data/source/DataCacheable.scala | 76 +++
.../measure/data/source/DataSource.scala | 109 ++++
.../measure/data/source/DataSourceCache.scala | 347 ++++++++++++
.../measure/data/source/DataSourceFactory.scala | 80 +++
.../griffin/measure/persist/HdfsPersist.scala | 240 ++++++---
.../griffin/measure/persist/HttpPersist.scala | 64 ++-
.../griffin/measure/persist/LoggerPersist.scala | 173 +++---
.../griffin/measure/persist/MultiPersists.scala | 14 +-
.../measure/persist/OldHttpPersist.scala | 174 +++---
.../griffin/measure/persist/Persist.scala | 24 +-
.../measure/persist/PersistFactory.scala | 4 +-
.../apache/griffin/measure/process/Algo.scala | 34 ++
.../measure/process/BatchDqProcess.scala | 117 ++++
.../griffin/measure/process/DqProcess.scala | 40 ++
.../griffin/measure/process/ProcessType.scala | 47 ++
.../measure/process/StreamingDqProcess.scala | 157 ++++++
.../measure/process/StreamingDqThread.scala | 185 +++++++
.../griffin/measure/process/TimingProcess.scala | 46 ++
.../measure/process/check/DataChecker.scala | 29 +
.../process/engine/DataFrameOprEngine.scala | 165 ++++++
.../measure/process/engine/DqEngine.scala | 41 ++
.../process/engine/DqEngineFactory.scala | 47 ++
.../measure/process/engine/DqEngines.scala | 208 ++++++++
.../measure/process/engine/SparkDqEngine.scala | 167 ++++++
.../process/engine/SparkRowFormatter.scala | 62 +++
.../measure/process/engine/SparkSqlEngine.scala | 58 ++
.../griffin/measure/rule/CalculationUtil.scala | 315 -----------
.../measure/rule/DataTypeCalculationUtil.scala | 159 ------
.../griffin/measure/rule/ExprValueUtil.scala | 263 ---------
.../griffin/measure/rule/RuleAnalyzer.scala | 72 ---
.../griffin/measure/rule/RuleFactory.scala | 52 --
.../griffin/measure/rule/RuleParser.scala | 244 ---------
.../measure/rule/SchemaValueCombineUtil.scala | 187 -------
.../measure/rule/adaptor/AdaptPhase.scala | 25 +
.../rule/adaptor/DataFrameOprAdaptor.scala | 44 ++
.../rule/adaptor/GriffinDslAdaptor.scala | 349 ++++++++++++
.../measure/rule/adaptor/RuleAdaptor.scala | 72 +++
.../measure/rule/adaptor/RuleAdaptorGroup.scala | 105 ++++
.../measure/rule/adaptor/SparkSqlAdaptor.scala | 54 ++
.../griffin/measure/rule/dsl/DqType.scala | 58 ++
.../griffin/measure/rule/dsl/DslType.scala | 58 ++
.../griffin/measure/rule/dsl/PersistType.scala | 58 ++
.../rule/dsl/analyzer/AccuracyAnalyzer.scala | 41 ++
.../rule/dsl/analyzer/BasicAnalyzer.scala | 53 ++
.../rule/dsl/analyzer/ProfilingAnalyzer.scala | 52 ++
.../measure/rule/dsl/expr/AliasableExpr.scala | 25 +
.../rule/dsl/expr/ClauseExpression.scala | 150 ++++++
.../griffin/measure/rule/dsl/expr/Expr.scala | 29 +
.../measure/rule/dsl/expr/FunctionExpr.scala | 29 +
.../measure/rule/dsl/expr/LiteralExpr.scala | 72 +++
.../measure/rule/dsl/expr/LogicalExpr.scala | 170 ++++++
.../measure/rule/dsl/expr/MathExpr.scala | 80 +++
.../measure/rule/dsl/expr/SelectExpr.scala | 115 ++++
.../measure/rule/dsl/expr/TreeNode.scala | 45 ++
.../measure/rule/dsl/parser/BasicParser.scala | 337 ++++++++++++
.../rule/dsl/parser/GriffinDslParser.scala | 50 ++
.../measure/rule/expr/AnalyzableExpr.scala | 24 -
.../griffin/measure/rule/expr/Cacheable.scala | 33 --
.../measure/rule/expr/Calculatable.scala | 25 -
.../griffin/measure/rule/expr/ClauseExpr.scala | 109 ----
.../measure/rule/expr/DataSourceable.scala | 28 -
.../griffin/measure/rule/expr/Describable.scala | 33 --
.../apache/griffin/measure/rule/expr/Expr.scala | 53 --
.../measure/rule/expr/ExprDescOnly.scala | 40 --
.../measure/rule/expr/ExprIdCounter.scala | 60 ---
.../measure/rule/expr/FieldDescOnly.scala | 58 --
.../griffin/measure/rule/expr/LiteralExpr.scala | 83 ---
.../griffin/measure/rule/expr/LogicalExpr.scala | 178 -------
.../griffin/measure/rule/expr/MathExpr.scala | 99 ----
.../griffin/measure/rule/expr/SelectExpr.scala | 88 ---
.../rule/func/DefaultFunctionDefine.scala | 36 --
.../measure/rule/func/FunctionDefine.scala | 25 -
.../measure/rule/func/FunctionUtil.scala | 75 ---
.../rule/preproc/PreProcRuleGenerator.scala | 72 +++
.../measure/rule/step/ConcreteRuleStep.scala | 37 ++
.../griffin/measure/rule/step/DfOprStep.scala | 29 +
.../measure/rule/step/GriffinDslStep.scala | 28 +
.../griffin/measure/rule/step/RuleStep.scala | 31 ++
.../measure/rule/step/SparkSqlStep.scala | 30 ++
.../griffin/measure/rule/udf/GriffinUdfs.scala | 33 ++
.../measure/utils/HdfsFileDumpUtil.scala | 2 +-
.../apache/griffin/measure/utils/HdfsUtil.scala | 71 ++-
.../griffin/measure/utils/ParamUtil.scala | 164 ++++++
.../apache/griffin/measure/utils/TimeUtil.scala | 4 +-
.../config-test-accuracy-streaming-multids.json | 144 +++++
.../config-test-accuracy-streaming.json | 119 +++++
.../test/resources/config-test-accuracy.json | 56 ++
.../config-test-profiling-streaming.json | 68 +++
.../test/resources/config-test-profiling.json | 37 ++
measure/src/test/resources/config-test.json | 55 ++
measure/src/test/resources/config-test1.json | 96 ++++
measure/src/test/resources/config.json | 2 +-
measure/src/test/resources/env-streaming.json | 1 +
measure/src/test/resources/env-test.json | 38 ++
measure/src/test/resources/input.msg | 1 +
measure/src/test/resources/output.msg | 1 +
measure/src/test/resources/test-data.jsonFile | 3 +
measure/src/test/resources/test-data0.json | 56 ++
measure/src/test/resources/test-data1.jsonFile | 31 ++
.../algo/batch/BatchAccuracyAlgoTest.scala | 198 -------
.../algo/batch/BatchProfileAlgoTest.scala | 173 ------
.../measure/algo/batch/DataFrameSaveTest.scala | 172 ------
.../measure/algo/core/AccuracyCoreTest.scala | 89 ----
.../measure/algo/core/ProfileCoreTest.scala | 79 ---
.../streaming/StreamingAccuracyAlgoTest.scala | 267 ----------
.../reader/ParamRawStringReaderTest.scala | 3 +-
.../measure/connector/ConnectorTest.scala | 70 ---
.../measure/data/connector/ConnectorTest.scala | 71 +++
.../measure/process/BatchProcessTest.scala | 146 +++++
.../griffin/measure/process/JsonParseTest.scala | 531 +++++++++++++++++++
.../griffin/measure/process/JsonToStructs.scala | 85 +++
.../measure/process/StreamingProcessTest.scala | 147 +++++
.../measure/rule/ExprValueUtilTest.scala | 86 ---
.../griffin/measure/rule/RuleAnalyzerTest.scala | 60 ---
.../griffin/measure/rule/RuleFactoryTest.scala | 44 --
.../griffin/measure/rule/RuleParserTest.scala | 213 --------
.../rule/adaptor/GriffinDslAdaptorTest.scala | 65 +++
.../rule/dsl/parser/BasicParserTest.scala | 205 +++++++
.../apache/griffin/measure/sql/SqlTest.scala | 125 +++++
.../griffin/measure/utils/HdfsUtilTest.scala | 132 +++++
.../griffin/measure/utils/JsonUtilTest.scala | 120 ++---
.../griffin/measure/utils/ParamUtilTest.scala | 50 ++
177 files changed, 9536 insertions(+), 7114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/griffin-doc/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/dsl-guide.md b/griffin-doc/dsl-guide.md
new file mode 100644
index 0000000..6a7b3f8
--- /dev/null
+++ b/griffin-doc/dsl-guide.md
@@ -0,0 +1,83 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Apache Griffin DSL Guide
+Griffin DSL is a SQL-like language designed for DQ measurement, describing data quality requests in the DQ domain.
+
+## Griffin DSL Syntax Description
+Griffin DSL is SQL-like, case insensitive, and easy to learn.
+
+### Supported operations
+- logical operations: not, and, or, in, between, like, is null, is nan, =, !=, <=, >=, <, >
+- mathematical operations: +, -, *, /, %
+- SQL clauses: as, where, group by, having, order by, limit
+
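+For example, the accuracy rule in config.json below combines field selections with logical operations: `source.user_id = target.user_id AND source.first_name = target.first_name`.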
+
+### Keywords
+- `null, nan, true, false`
+- `not, and, or`
+- `in, between, like, is`
+- `as, where, group, by, having, order, desc, asc, limit`
+
+### Operators
+- `!, &&, ||, =, !=, <, >, <=, >=, <>`
+- `+, -, *, /, %`
+- `(, )`
+- `., [, ]`
+
+### Literals
+- **string**: any string surrounded by a pair of " or ', with the escape character \ where needed.
+ e.g. `"test", 'string 1', "hello \" world \" "`
+- **number**: a double or integer number.
+ e.g. `123, 33.5`
+- **time**: an integer with a time unit in a string, translated to an integer number of milliseconds.
+ e.g. `3d, 5h, 4ms`
+- **boolean**: a boolean value directly.
+ e.g. `true, false`
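+
+For illustration, a minimal Scala sketch of translating a time literal to milliseconds (an assumption for this guide, not the module's actual `TimeUtil` implementation):
+
+```scala
+// hypothetical helper: translate a time literal like "3d" or "5h" to milliseconds
+object TimeLiteral {
+  private val unitMillis = Map("ms" -> 1L, "s" -> 1000L, "m" -> 60000L,
+    "h" -> 3600000L, "d" -> 86400000L)
+  private val pattern = """(?i)^(\d+)\s*(ms|s|m|h|d)$""".r
+
+  def toMillis(literal: String): Option[Long] = literal match {
+    case pattern(num, unit) => unitMillis.get(unit.toLowerCase).map(_ * num.toLong)
+    case _                  => None
+  }
+}
+
+// e.g. TimeLiteral.toMillis("3d") == Some(259200000L)
+```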
+
+### Selections
+- **selection head**: a data source name.
+ e.g. `source, target`
+- **all field selection**: *, optionally preceded by a data source name.
+ e.g. `*, source.*, target.*`
+- **field selection**: a field name, optionally preceded by a data source name.
+ e.g. `source.age, target.name, user_id`
+- **index selection**: an integer between square brackets "[]", following a field name.
+ e.g. `source.attributes[3]`
+- **function selection**: a function name with parentheses "()", optionally preceded by a field name.
+ e.g. `count(*), *.count(), source.user_id.count(), max(source.age)`
+- **alias**: declare an alias after a selection.
+ e.g. `source.user_id as id, target.user_name as name`
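+
+Since the DSL is SQL-like, a selection with an alias maps naturally onto a SQL projection: for example, `source.user_id as id` corresponds to `SELECT source.user_id AS id FROM source` (an illustrative mapping; the exact translation is produced by the rule adaptors added in this commit).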
+
+### Math expressions
+- **math factor**: a literal, function, selection, or a math expression in brackets.
+ e.g. `123, max(1, 2, 3, 4), source.age, (source.age + 13)`
+- **unary math expression**: a unary math operator applied to a factor.
+ e.g. `-(100 - source.score)`
+- **binary math expression**: math factors combined with binary math operators.
+ e.g. `source.age + 13, score * 2 + ratio`
+
+### Logical expression
+- **in**: an in clause, as in SQL.
+ e.g. `source.country in ("USA", "CHN", "RSA")`
+- **between**: a between clause, as in SQL.
+ e.g. `source.age between 3 and 30, source.age between (3, 30)`
+- **like**: a like clause, as in SQL.
+ e.g. `source.name like "%abc%"`
+- **logical factor**:
+
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/derby.log
----------------------------------------------------------------------
diff --git a/measure/derby.log b/measure/derby.log
new file mode 100644
index 0000000..4b93055
--- /dev/null
+++ b/measure/derby.log
@@ -0,0 +1,13 @@
+----------------------------------------------------------------
+Fri Sep 29 15:53:18 CST 2017:
+Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-015e-cca0-1a8b-00000f890648
+on database directory /private/var/folders/p0/462y3wrn4lv1fptxx5bwy7b839572r/T/spark-890ab6e2-ee56-4d73-8c6a-0dcce204322e/metastore with class loader sun.misc.Launcher$AppClassLoader@18b4aac2
+Loaded from file:/Users/lliu13/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
+java.vendor=Oracle Corporation
+java.runtime.version=1.8.0_101-b13
+user.dir=/Users/lliu13/git/incubator-griffin/measure
+os.name=Mac OS X
+os.arch=x86_64
+os.version=10.12.6
+derby.system.home=null
+Database Class Loader started - derby.database.classpath=''
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/resources/config-old.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-old.json b/measure/src/main/resources/config-old.json
new file mode 100644
index 0000000..ab32b75
--- /dev/null
+++ b/measure/src/main/resources/config-old.json
@@ -0,0 +1,31 @@
+{
+ "name": "accu1",
+ "type": "accuracy",
+
+ "process.type": "batch",
+
+ "source": {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "users_info_src",
+ "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+ }
+ },
+
+ "target": {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "users_info_target",
+ "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+ }
+ },
+
+ "evaluateRule": {
+ "sampleRatio": 0.2,
+ "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/resources/config-sql.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-sql.json b/measure/src/main/resources/config-sql.json
new file mode 100644
index 0000000..aad9584
--- /dev/null
+++ b/measure/src/main/resources/config-sql.json
@@ -0,0 +1,54 @@
+{
+ "name": "accu1",
+
+ "process.type": "batch",
+
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "users_info_src",
+ "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+ }
+ }
+ ]
+ }, {
+ "name": "target",
+ "connectors": [
+ {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "users_info_target",
+ "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+ }
+ }
+ ]
+ }
+ ],
+
+ "evaluateRule": {
+ "dsl.type": "spark-sql",
+ "rules": [
+ {
+ "name": "miss.record",
+ "rule": "SELECT source.name FROM source LEFT JOIN target ON coalesce(source.name, 'null') = coalesce(target.name, 'null') WHERE (NOT (source.name IS NULL)) AND (target.name IS NULL)",
+ "persist.type": "record"
+ }, {
+ "name": "miss.count",
+ "rule": "SELECT COUNT(*) FROM miss",
+ "persist.type": "metric"
+ }, {
+ "name": "total.count",
+ "rule": "SELECT COUNT(*) FROM source",
+ "persist.type": "metric"
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config.json b/measure/src/main/resources/config.json
index ab32b75..b6e5af9 100644
--- a/measure/src/main/resources/config.json
+++ b/measure/src/main/resources/config.json
@@ -1,31 +1,60 @@
{
"name": "accu1",
- "type": "accuracy",
"process.type": "batch",
- "source": {
- "type": "hive",
- "version": "1.2",
- "config": {
- "database": "default",
- "table.name": "users_info_src",
- "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+ "data.sources": [
+ {
+ "name": "source",
+ "connectors": [
+ {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "users_info_src",
+ "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+ }
+ }
+ ]
+ }, {
+ "name": "target",
+ "connectors": [
+ {
+ "type": "hive",
+ "version": "1.2",
+ "config": {
+ "database": "default",
+ "table.name": "users_info_target",
+ "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+ }
+ }
+ ]
}
- },
-
- "target": {
- "type": "hive",
- "version": "1.2",
- "config": {
- "database": "default",
- "table.name": "users_info_target",
- "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
- }
- },
+ ],
"evaluateRule": {
- "sampleRatio": 0.2,
- "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code"
+ "rules": [
+ {
+ "dsl.type": "griffin-dsl",
+ "dq.type": "accuracy",
+ "rule": "source.user_id = target.user_id AND source.first_name = target.first_name AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
+ "details": {
+ "source": "source",
+ "miss.record": {
+ "name": "miss.record",
+ "persist.type": "record"
+ },
+ "miss.count": {
+ "name": "miss.count",
+ "persist.type": "metric"
+ },
+ "total.count": {
+ "name": "total.count",
+ "persist.type": "metric"
+ }
+ }
+ }
+ ]
}
}
\ No newline at end of file
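
The griffin-dsl rule above is the compact form of the three spark-sql rules in config-sql.json: its "details" block maps the accuracy calculation's outputs onto the same persist types, persisting "miss.record" as records and "miss.count" and "total.count" as metrics.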
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index af8c830..edbb552 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,9 +18,6 @@ under the License.
*/
package org.apache.griffin.measure
-import org.apache.griffin.measure.algo._
-import org.apache.griffin.measure.algo.batch._
-import org.apache.griffin.measure.algo.streaming._
import org.apache.griffin.measure.config.params._
import org.apache.griffin.measure.config.params.env._
import org.apache.griffin.measure.config.params.user._
@@ -28,6 +25,7 @@ import org.apache.griffin.measure.config.reader._
import org.apache.griffin.measure.config.validator.AllParamValidator
import org.apache.griffin.measure.log.Loggable
import org.apache.griffin.measure.persist.PersistThreadPool
+import org.apache.griffin.measure.process._
import scala.util.{Failure, Success, Try}
@@ -81,39 +79,91 @@ object Application extends Loggable {
}
// choose algorithm
- val dqType = allParam.userParam.dqType
- val procType = allParam.userParam.procType
- val algo: Algo = (dqType, procType) match {
- case (MeasureType.accuracy(), ProcessType.batch()) => BatchAccuracyAlgo(allParam)
- case (MeasureType.profile(), ProcessType.batch()) => BatchProfileAlgo(allParam)
- case (MeasureType.accuracy(), ProcessType.streaming()) => StreamingAccuracyAlgo(allParam)
-// case (MeasureType.profile(), ProcessType.streaming()) => StreamingProfileAlgo(allParam)
+// val dqType = allParam.userParam.dqType
+ val procType = ProcessType(allParam.userParam.procType)
+ val proc: DqProcess = procType match {
+ case BatchProcessType => BatchDqProcess(allParam)
+ case StreamingProcessType => StreamingDqProcess(allParam)
case _ => {
- error(s"${dqType} with ${procType} is unsupported dq type!")
+ error(s"${procType} is unsupported process type!")
sys.exit(-4)
}
}
- // algorithm run
- algo.run match {
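+
+    // DqProcess lifecycle (trait added in measure/process/DqProcess.scala):
+    // init, run and end each return Try[_]; retriable tells whether a failed
+    // run should be rethrown so that spark streaming can retry it.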
+ // process init
+ proc.init match {
+ case Success(_) => {
+ info("process init success")
+ }
case Failure(ex) => {
- error(s"app error: ${ex.getMessage}")
-
- procType match {
- case ProcessType.streaming() => {
- // streaming need to attempt more times by spark streaming itself
- throw ex
- }
- case _ => {
- shutdown
- sys.exit(-5)
- }
+ error(s"process init error: ${ex.getMessage}")
+ shutdown
+ sys.exit(-5)
+ }
+ }
+
+ // process run
+ proc.run match {
+ case Success(_) => {
+ info("process run success")
+ }
+ case Failure(ex) => {
+ error(s"process run error: ${ex.getMessage}")
+
+ if (proc.retriable) {
+ throw ex
+ } else {
+ shutdown
+ sys.exit(-5)
}
}
- case _ => {
- info("app finished and success")
+ }
+
+ // process end
+ proc.end match {
+ case Success(_) => {
+ info("process end success")
+ }
+ case Failure(ex) => {
+ error(s"process end error: ${ex.getMessage}")
+ shutdown
+ sys.exit(-5)
}
}
+
+ shutdown
+
+// val algo: Algo = (dqType, procType) match {
+// case (MeasureType.accuracy(), ProcessType.batch()) => BatchAccuracyAlgo(allParam)
+// case (MeasureType.profile(), ProcessType.batch()) => BatchProfileAlgo(allParam)
+// case (MeasureType.accuracy(), ProcessType.streaming()) => StreamingAccuracyAlgo(allParam)
+//// case (MeasureType.profile(), ProcessType.streaming()) => StreamingProfileAlgo(allParam)
+// case _ => {
+// error(s"${dqType} with ${procType} is unsupported dq type!")
+// sys.exit(-4)
+// }
+// }
+
+ // algorithm run
+// algo.run match {
+// case Failure(ex) => {
+// error(s"app error: ${ex.getMessage}")
+//
+// procType match {
+// case ProcessType.streaming() => {
+// // streaming need to attempt more times by spark streaming itself
+// throw ex
+// }
+// case _ => {
+// shutdown
+// sys.exit(-5)
+// }
+// }
+// }
+// case _ => {
+// info("app finished and success")
+// }
+// }
}
private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
deleted file mode 100644
index 7e0a563..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-
-trait AccuracyAlgo extends Algo {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
deleted file mode 100644
index 82b71f1..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.log.Loggable
-
-import scala.util.Try
-
-trait Algo extends Loggable with Serializable {
-
- val envParam: EnvParam
- val userParam: UserParam
-
- def run(): Try[_]
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
deleted file mode 100644
index 23d4dac..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-object MeasureType {
-
- val accuracy = """^(?i)accuracy$""".r
- val profile = """^(?i)profile$""".r
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
deleted file mode 100644
index 5a85c7c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-object ProcessType {
-
- val batch = """^(?i)batch$""".r
- val streaming = """^(?i)streaming$""".r
-
-}
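
The regex-matching ProcessType object above is replaced in this commit by measure/process/ProcessType.scala (47 lines in the diffstat). A minimal sketch of the assumed replacement shape, inferred from its use in Application.scala (a sealed trait with case objects instead of regexes):

    sealed trait ProcessType
    case object BatchProcessType extends ProcessType
    case object StreamingProcessType extends ProcessType

    object ProcessType {
      // inferred from "process.type" values such as "batch" and "streaming";
      // the failure behaviour here is an assumption, not taken from the commit
      def apply(s: String): ProcessType = s.trim.toLowerCase match {
        case "batch"     => BatchProcessType
        case "streaming" => StreamingProcessType
        case other       => throw new IllegalArgumentException(s"unsupported process type: $other")
      }
    }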
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
deleted file mode 100644
index 6ffc87a..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo
-
-trait ProfileAlgo extends Algo {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
deleted file mode 100644
index 241f456..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.batch
-
-import java.util.Date
-
-import org.apache.griffin.measure.algo.AccuracyAlgo
-import org.apache.griffin.measure.algo.core.AccuracyCore
-import org.apache.griffin.measure.config.params.AllParam
-import org.apache.griffin.measure.connector._
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.persist._
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule._
-import org.apache.griffin.measure.rule.expr._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-// accuracy algorithm for batch mode
-case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
- val envParam = allParam.envParam
- val userParam = allParam.userParam
-
- def run(): Try[_] = {
- Try {
- val metricName = userParam.name
-
- val sparkParam = envParam.sparkParam
-
- val conf = new SparkConf().setAppName(metricName)
- conf.setAll(sparkParam.config)
- val sc = new SparkContext(conf)
- sc.setLogLevel(sparkParam.logLevel)
- val sqlContext = new HiveContext(sc)
-
- // start time
- val startTime = new Date().getTime()
-
- // get persists to persist measure result
- val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
- // get spark application id
- val applicationId = sc.applicationId
-
- // persist start id
- persist.start(applicationId)
-
- // generate rule from rule param, generate rule analyzer
- val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
- val rule: StatementExpr = ruleFactory.generateRule()
- val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
- // const expr value map
- val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
- val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
- val finalConstMap = finalConstExprValueMap.headOption match {
- case Some(m) => m
- case _ => Map[String, Any]()
- }
-
- // data connector
- val sourceDataConnector: DirectDataConnector =
- DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
- ruleAnalyzer.sourceRuleExprs, finalConstMap
- ) match {
- case Success(cntr) => {
- if (cntr.available) cntr
- else throw new Exception("source data connection error!")
- }
- case Failure(ex) => throw ex
- }
- val targetDataConnector: DirectDataConnector =
- DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.targetParam,
- ruleAnalyzer.targetRuleExprs, finalConstMap
- ) match {
- case Success(cntr) => {
- if (cntr.available) cntr
- else throw new Exception("target data connection error!")
- }
- case Failure(ex) => throw ex
- }
-
- // get metadata
-// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-// case Success(md) => md
-// case _ => throw new Exception("source metadata error!")
-// }
-// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-// case Success(md) => md
-// case _ => throw new Exception("target metadata error!")
-// }
-
- // get data
- val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
- case Success(dt) => dt
- case Failure(ex) => throw ex
- }
- val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataConnector.data() match {
- case Success(dt) => dt
- case Failure(ex) => throw ex
- }
-
- // accuracy algorithm
- val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
-
- // end time
- val endTime = new Date().getTime
- persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
- // persist result
- persist.result(endTime, accuResult)
- val missingRecords = missingRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs))
-// persist.missRecords(missingRecords)
- persist.records(missingRecords, PersistType.MISS)
-
- // persist end time
- val persistEndTime = new Date().getTime
- persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
- // finish
- persist.finish()
-
- // context stop
- sc.stop
-
- }
- }
-
- def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
- (data, Map[String, Any]())
- }
-
- // calculate accuracy between source data and target data
- def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
- targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
- ruleAnalyzer: RuleAnalyzer) = {
- // 1. cogroup
- val allKvs = sourceData.cogroup(targetData)
-
- // 2. accuracy calculation
- val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-
- (accuResult, missingRdd, matchedRdd)
- }
-
- // convert data into a string
- def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
- val (key, (data, info)) = rec
- val persistData = getPersistMap(data, sourcePersist)
- val persistInfo = info.mapValues { value =>
- value match {
- case vd: Map[String, Any] => getPersistMap(vd, targetPersist)
- case v => v
- }
- }.map(identity)
- s"${persistData} [${persistInfo}]"
- }
-
- // get the expr value map of the persist expressions
- private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
- val persistMap = persist.map(e => (e._id, e.desc)).toMap
- data.flatMap { pair =>
- val (k, v) = pair
- persistMap.get(k) match {
- case Some(d) => Some((d -> v))
- case _ => None
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
deleted file mode 100644
index 163a0b7..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.batch
-
-import java.util.Date
-
-import org.apache.griffin.measure.algo.ProfileAlgo
-import org.apache.griffin.measure.algo.core.ProfileCore
-import org.apache.griffin.measure.config.params._
-import org.apache.griffin.measure.connector._
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-// profile algorithm for batch mode
-case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
- val envParam = allParam.envParam
- val userParam = allParam.userParam
-
- def run(): Try[_] = {
- Try {
- val metricName = userParam.name
-
- val sparkParam = envParam.sparkParam
-
- val conf = new SparkConf().setAppName(metricName)
- conf.setAll(sparkParam.config)
- val sc = new SparkContext(conf)
- sc.setLogLevel(sparkParam.logLevel)
- val sqlContext = new HiveContext(sc)
-
- // start time
- val startTime = new Date().getTime()
-
- // get persists to persist measure result
- val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
- // get spark application id
- val applicationId = sc.applicationId
-
- // persist start id
- persist.start(applicationId)
-
- // generate rule from rule param, generate rule analyzer
- val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
- val rule: StatementExpr = ruleFactory.generateRule()
- val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
- // const expr value map
- val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
- val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
- val finalConstMap = finalConstExprValueMap.headOption match {
- case Some(m) => m
- case _ => Map[String, Any]()
- }
-
- // data connector
- val sourceDataConnector: DirectDataConnector =
- DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
- ruleAnalyzer.sourceRuleExprs, finalConstMap
- ) match {
- case Success(cntr) => {
- if (cntr.available) cntr
- else throw new Exception("source data connection error!")
- }
- case Failure(ex) => throw ex
- }
-
- // get metadata
- // val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
- // case Success(md) => md
- // case _ => throw new Exception("source metadata error!")
- // }
-
- // get data
- val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
- case Success(dt) => dt
- case Failure(ex) => throw ex
- }
-
- // profile algorithm
- val (profileResult, missingRdd, matchedRdd) = profile(sourceData, ruleAnalyzer)
-
- // end time
- val endTime = new Date().getTime
- persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
- // persist result
- persist.result(endTime, profileResult)
- val matchedRecords = matchedRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs))
-// persist.matchRecords(matchedRecords)
- persist.records(matchedRecords, PersistType.MATCH)
-
- // persist end time
- val persistEndTime = new Date().getTime
- persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
- // finish
- persist.finish()
-
- // context stop
- sc.stop
- }
- }
-
- def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
- (data, Map[String, Any]())
- }
-
- // calculate profile from source data
- def profile(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))], ruleAnalyzer: RuleAnalyzer
- ) = {
- // 1. profile calculation
- val (profileResult, missingRdd, matchedRdd) = ProfileCore.profile(sourceData, ruleAnalyzer)
-
- (profileResult, missingRdd, matchedRdd)
- }
-
- // convert data into a string
- def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr]): String = {
- val (key, (data, info)) = rec
- val persistData = getPersistMap(data, sourcePersist)
- val persistInfo = info
- if (persistInfo.size > 0) s"${persistData} [${persistInfo}]" else s"${persistData}"
- }
-
- // get the expr value map of the persist expressions
- private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
- val persistMap = persist.map(e => (e._id, e.desc)).toMap
- data.flatMap { pair =>
- val (k, v) = pair
- persistMap.get(k) match {
- case Some(d) => Some((d -> v))
- case _ => None
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
deleted file mode 100644
index 4ec6505..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.rule.RuleAnalyzer
-import org.apache.griffin.measure.result._
-import org.apache.spark.rdd.RDD
-
-
-object AccuracyCore {
-
- type V = Map[String, Any]
- type T = Map[String, Any]
-
- // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, targetInfo)]))
- // output: accuracy result, missing source data rdd, matched source data rdd
- def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], ruleAnalyzer: RuleAnalyzer
- ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
- val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, T))])] = allKvs.map { kv =>
- val (key, (sourceDatas, targetDatas)) = kv
-
- // result: (missCount, matchCount, missDataList, matchDataList)
- val rslt = sourceDatas.foldLeft((0L, 0L, List[(Product, (V, T))](), List[(Product, (V, T))]())) { (sr, sourcePair) =>
- val matchResult = if (targetDatas.isEmpty) {
- (false, Map[String, Any](MismatchInfo.wrap("no target")))
- } else {
- targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) =>
- if (tr._1) tr
- else matchData(sourcePair, targetPair, ruleAnalyzer)
- }
- }
-
- if (matchResult._1) {
- val matchItem = (key, sourcePair)
- (sr._1, sr._2 + 1, sr._3, sr._4 :+ matchItem)
- } else {
- val missItem = (key, (sourcePair._1, sourcePair._2 ++ matchResult._2))
- (sr._1 + 1, sr._2, sr._3 :+ missItem, sr._4)
- }
- }
-
- rslt
- }
-
- val missRdd = result.flatMap(_._3)
- val matchRdd = result.flatMap(_._4)
-
- def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
- (cnt._1 + rcd._1, cnt._2 + rcd._2)
- }
- def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
- (c1._1 + c2._1, c1._2 + c2._2)
- }
- val countPair = result.aggregate((0L, 0L))(seq, comb)
-
- (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
- }
-
- // try to match source and target data, return true if matched, false if unmatched, also with some matching info
- private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
- // 1. merge source and target cached data
- val mergedExprValueMap: Map[String, Any] = mergeExprValueMap(source, target)
-
- // 2. check valid
- if (ruleAnalyzer.rule.valid(mergedExprValueMap)) {
- // 3. substitute the cached data into statement, get the statement value
- val matched = ruleAnalyzer.rule.calculate(mergedExprValueMap) match {
- case Some(b: Boolean) => b
- case _ => false
- }
- // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
- if (matched) (matched, Map[String, Any]())
- else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
- } else {
- (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
- }
-
- }
-
-// private def when
-
- private def mergeExprValueMap(source: (V, T), target: (V, T)): Map[String, Any] = {
- source._1 ++ target._1
- }
-
-}
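
AccuracyCore.accuracy (removed above) folded each key's source records into (missCount, matchCount) pairs and summed them with RDD.aggregate. A toy illustration of that final counting step on plain Scala collections (not part of the commit):

    // per-key (missCount, matchCount) pairs, as the removed fold produced
    val perKey = List((1L, 2L), (0L, 3L), (2L, 0L))
    // seq and comb in the removed code reduce to pairwise addition
    val (miss, matched) = perKey.foldLeft((0L, 0L)) {
      case ((m, c), (rm, rc)) => (m + rm, c + rc)
    }
    // AccuracyResult(missCount, totalCount), with totalCount = miss + match
    println(s"miss = $miss, total = ${miss + matched}")  // miss = 3, total = 8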
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
deleted file mode 100644
index 2987f2f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.rule.RuleAnalyzer
-import org.apache.griffin.measure.result._
-import org.apache.spark.rdd.RDD
-
-
-object ProfileCore {
-
- type V = Map[String, Any]
- type T = Map[String, Any]
-
- // dataRdd: rdd of (key, (sourceData, sourceInfo))
- // output: accuracy result, missing source data rdd, matched source data rdd
- def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
- ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
-
- val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
- val (key, (data, info)) = kv
- val (matched, missInfo) = matchData((data, info), ruleAnalyzer)
- ((key, (data, info ++ missInfo)), matched)
- }
-
- val totalCount = resultRdd.count
- val matchRdd = resultRdd.filter(_._2).map(_._1)
- val matchCount = matchRdd.count
- val missRdd = resultRdd.filter(!_._2).map(_._1)
- val missCount = missRdd.count
-
- (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
-
- }
-
- // try to match data as rule, return true if matched, false if unmatched
- private def matchData(dataPair: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
- val data: Map[String, Any] = dataPair._1
-
- // 1. check valid
- if (ruleAnalyzer.rule.valid(data)) {
- // 2. substitute the cached data into statement, get the statement value
- val matched = ruleAnalyzer.rule.calculate(data) match {
- case Some(b: Boolean) => b
- case _ => false
- }
- // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
- if (matched) (matched, Map[String, Any]())
- else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
- } else {
- (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
deleted file mode 100644
index bdac64e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.streaming
-
-import java.util.Date
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-
-import org.apache.griffin.measure.algo.AccuracyAlgo
-import org.apache.griffin.measure.algo.core.AccuracyCore
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.cache.result.CacheResultProcesser
-import org.apache.griffin.measure.config.params.AllParam
-import org.apache.griffin.measure.connector._
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
-import org.apache.griffin.measure.result.{AccuracyResult, MismatchInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.utils.TimeUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-
-case class StreamingAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
- val envParam = allParam.envParam
- val userParam = allParam.userParam
-
- def run(): Try[_] = {
- Try {
- val metricName = userParam.name
-
- val sparkParam = envParam.sparkParam
-
- val conf = new SparkConf().setAppName(metricName)
- conf.setAll(sparkParam.config)
- val sc = new SparkContext(conf)
- sc.setLogLevel(sparkParam.logLevel)
- val sqlContext = new HiveContext(sc)
-// val sqlContext = new SQLContext(sc)
-
- val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
- case Some(interval) => Milliseconds(interval)
- case _ => throw new Exception("invalid batch interval")
- }
- val ssc = new StreamingContext(sc, batchInterval)
- ssc.checkpoint(sparkParam.cpDir)
-
- // init info cache instance
- InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
- InfoCacheInstance.init
-
- // start time
- val startTime = new Date().getTime()
-
- val persistFactory = PersistFactory(envParam.persistParams, metricName)
-
- // get persists to persist measure result
- val appPersist: Persist = persistFactory.getPersists(startTime)
-
- // get spark application id
- val applicationId = sc.applicationId
-
- // persist start id
- appPersist.start(applicationId)
-
- // generate rule from rule param, generate rule analyzer
- val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
- val rule: StatementExpr = ruleFactory.generateRule()
- val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
- // const expr value map
- val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
- val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
- val finalConstMap = finalConstExprValueMap.headOption match {
- case Some(m) => m
- case _ => Map[String, Any]()
- }
-
- // data connector
- val sourceDataConnector: DirectDataConnector =
- DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.sourceParam,
- ruleAnalyzer.sourceRuleExprs, finalConstMap
- ) match {
- case Success(cntr) => {
- if (cntr.available) cntr
- else throw new Exception("source data connection error!")
- }
- case Failure(ex) => throw ex
- }
- val targetDataConnector: DirectDataConnector =
- DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.targetParam,
- ruleAnalyzer.targetRuleExprs, finalConstMap
- ) match {
- case Success(cntr) => {
- if (cntr.available) cntr
- else throw new Exception("target data connection error!")
- }
- case Failure(ex) => throw ex
- }
-
- val cacheResultProcesser = CacheResultProcesser()
-
- // init data stream
- sourceDataConnector.init()
- targetDataConnector.init()
-
- val streamingAccuracyProcess = StreamingAccuracyProcess(
- sourceDataConnector, targetDataConnector,
- ruleAnalyzer, cacheResultProcesser, persistFactory, appPersist)
-
- // process thread
-// case class Process() extends Runnable {
-// val lock = InfoCacheInstance.genLock("process")
-// def run(): Unit = {
-// val updateTime = new Date().getTime
-// val locked = lock.lock(5, TimeUnit.SECONDS)
-// if (locked) {
-// try {
-// val st = new Date().getTime
-//
-// TimeInfoCache.startTimeInfoCache
-//
-// // get data
-// val sourceData = sourceDataConnector.data match {
-// case Success(dt) => dt
-// case Failure(ex) => throw ex
-// }
-// val targetData = targetDataConnector.data match {
-// case Success(dt) => dt
-// case Failure(ex) => throw ex
-// }
-//
-// sourceData.cache
-// targetData.cache
-//
-// println(s"sourceData.count: ${sourceData.count}")
-// println(s"targetData.count: ${targetData.count}")
-//
-// // accuracy algorithm
-// val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
-// println(s"accuResult: ${accuResult}")
-//
-// val ct = new Date().getTime
-// appPersist.log(ct, s"calculation using time: ${ct - st} ms")
-//
-// sourceData.unpersist()
-// targetData.unpersist()
-//
-// // result of every group
-// val matchedGroups = reorgByTimeGroup(matchedRdd)
-// val matchedGroupCount = matchedGroups.count
-// println(s"===== matchedGroupCount: ${matchedGroupCount} =====")
-//
-// // get missing results
-// val missingGroups = reorgByTimeGroup(missingRdd)
-// val missingGroupCount = missingGroups.count
-// println(s"===== missingGroupCount: ${missingGroupCount} =====")
-//
-// val groups = matchedGroups.cogroup(missingGroups)
-// val groupCount = groups.count
-// println(s"===== groupCount: ${groupCount} =====")
-//
-// val updateResults = groups.flatMap { group =>
-// val (t, (matchData, missData)) = group
-//
-// val matchSize = matchData.size
-// val missSize = missData.size
-// val res = AccuracyResult(missSize, matchSize + missSize)
-//
-// val updatedCacheResultOpt = cacheResultProcesser.genUpdateCacheResult(t, updateTime, res)
-//
-// updatedCacheResultOpt.flatMap { updatedCacheResult =>
-// Some((updatedCacheResult, (t, missData)))
-// }
-// }
-//
-// updateResults.cache
-//
-// val updateResultsPart = updateResults.map(_._1)
-// val updateDataPart = updateResults.map(_._2)
-//
-// val updateResultsArray = updateResultsPart.collect()
-//
-// // update results cache (in driver)
-// // collect is a traversable-once action: it will leave the rdd updateResults empty
-// updateResultsArray.foreach { updateResult =>
-// println(s"update result: ${updateResult}")
-// cacheResultProcesser.update(updateResult)
-// // persist result
-// val persist: Persist = persistFactory.getPersists(updateResult.timeGroup)
-// persist.result(updateTime, updateResult.result)
-// }
-//
-// // record missing data and update old data (in executor)
-// updateDataPart.foreach { grp =>
-// val (t, datas) = grp
-// val persist: Persist = persistFactory.getPersists(t)
-// // persist missing data
-// val missStrings = datas.map { row =>
-// val (_, (value, info)) = row
-// s"${value} [${info.getOrElse(MismatchInfo.key, "unknown")}]"
-// }
-// persist.records(missStrings, PersistType.MISS)
-// // data connector update old data
-// val dumpDatas = datas.map { r =>
-// val (_, (v, i)) = r
-// v ++ i
-// }
-//
-// println(t)
-// dumpDatas.foreach(println)
-//
-// sourceDataConnector.updateOldData(t, dumpDatas)
-// targetDataConnector.updateOldData(t, dumpDatas) // not correct
-// }
-//
-// updateResults.unpersist()
-//
-// // dump missing rdd (not needed in future versions, only for the current df cache data version)
-// val dumpRdd: RDD[Map[String, Any]] = missingRdd.map { r =>
-// val (_, (v, i)) = r
-// v ++ i
-// }
-// sourceDataConnector.updateAllOldData(dumpRdd)
-// targetDataConnector.updateAllOldData(dumpRdd) // not correct
-//
-// TimeInfoCache.endTimeInfoCache
-//
-// val et = new Date().getTime
-// appPersist.log(et, s"persist using time: ${et - ct} ms")
-//
-// } catch {
-// case e: Throwable => error(s"process error: ${e.getMessage}")
-// } finally {
-// lock.unlock()
-// }
-// }
-// }
-// }
-
- val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
- case Some(interval) => interval
- case _ => throw new Exception("invalid process interval")
- }
- val process = TimingProcess(processInterval, streamingAccuracyProcess)
-
- // clean thread
-// case class Clean() extends Runnable {
-// val lock = InfoCacheInstance.genLock("clean")
-// def run(): Unit = {
-// val locked = lock.lock(5, TimeUnit.SECONDS)
-// if (locked) {
-// try {
-// sourceDataConnector.cleanData
-// targetDataConnector.cleanData
-// } finally {
-// lock.unlock()
-// }
-// }
-// }
-// }
-// val cleanInterval = TimeUtil.milliseconds(cleanerParam.cleanInterval) match {
-// case Some(interval) => interval
-// case _ => throw new Exception("invalid clean interval")
-// }
-// val clean = TimingProcess(cleanInterval, Clean())
-
- process.startup()
-// clean.startup()
-
- ssc.start()
- ssc.awaitTermination()
- ssc.stop(stopSparkContext=true, stopGracefully=true)
-
- // context stop
- sc.stop
-
- InfoCacheInstance.close
-
- appPersist.finish()
-
- process.shutdown()
-// clean.shutdown()
- }
- }
-
- // calculate accuracy between source data and target data
-// def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-// targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
-// ruleAnalyzer: RuleAnalyzer) = {
-// // 1. cogroup
-// val allKvs = sourceData.cogroup(targetData)
-//
-// // 2. accuracy calculation
-// val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-//
-// (accuResult, missingRdd, matchedRdd)
-// }
-
-// // convert data into a string
-// def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
-// val (key, (data, info)) = rec
-// val persistData = getPersistMap(data, sourcePersist)
-// val persistInfo = info.mapValues { value =>
-// value match {
-// case vd: Map[String, Any] => getPersistMap(vd, targetPersist)
-// case v => v
-// }
-// }.map(identity)
-// s"${persistData} [${persistInfo}]"
-// }
-//
-// // get the expr value map of the persist expressions
-// private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-// val persistMap = persist.map(e => (e._id, e.desc)).toMap
-// data.flatMap { pair =>
-// val (k, v) = pair
-// persistMap.get(k) match {
-// case Some(d) => Some((d -> v))
-// case _ => None
-// }
-// }
-// }
-
-// def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
-// ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
-// rdd.flatMap { row =>
-// val (key, (value, info)) = row
-// val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
-// case Some(t: Long) => Some((t, row))
-// case _ => None
-// }
-// b
-// }
-// }
-
-}
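A side note on the deleted StreamingAccuracyAlgo above: its setup code repeatedly parses human-readable interval strings through TimeUtil.milliseconds (an Option[Long] helper) and fails fast on misconfiguration. A minimal self-contained sketch of that pattern follows; the stand-in parser, its regex, and the supported units are assumptions for illustration, not the real TimeUtil:

import scala.util.Try

object IntervalSketch {
  // simplified stand-in for TimeUtil.milliseconds
  def milliseconds(s: String): Option[Long] = {
    val Pattern = """(\d+)(ms|s|m|h)""".r
    s.trim match {
      case Pattern(n, unit) =>
        val base = unit match {
          case "ms" => 1L
          case "s"  => 1000L
          case "m"  => 60000L
          case "h"  => 3600000L
        }
        Try(n.toLong * base).toOption
      case _ => None
    }
  }

  def main(args: Array[String]): Unit = {
    // fail fast on a bad config value, as the deleted run() did
    val batchIntervalMs: Long =
      milliseconds("10s").getOrElse(throw new Exception("invalid batch interval"))
    println(batchIntervalMs) // 10000
  }
}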
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
deleted file mode 100644
index be1f846..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.streaming
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.algo.core.AccuracyCore
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.cache.result.CacheResultProcesser
-import org.apache.griffin.measure.connector.direct.DirectDataConnector
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist._
-import org.apache.griffin.measure.result.{AccuracyResult, MismatchInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule._
-import org.apache.griffin.measure.rule.expr._
-import org.apache.spark.rdd.RDD
-
-import scala.util.{Failure, Success}
-
-case class StreamingAccuracyProcess(sourceDataConnector: DirectDataConnector,
- targetDataConnector: DirectDataConnector,
- ruleAnalyzer: RuleAnalyzer,
- cacheResultProcesser: CacheResultProcesser,
- persistFactory: PersistFactory,
- appPersist: Persist
- ) extends Runnable with Loggable {
-
- val lock = InfoCacheInstance.genLock("process")
-
- def run(): Unit = {
-// println(s"cache count: ${cacheResultProcesser.cacheGroup.size}")
- val updateTimeDate = new Date()
- val updateTime = updateTimeDate.getTime
- println(s"===== [${updateTimeDate}] process begins =====")
- val locked = lock.lock(5, TimeUnit.SECONDS)
- if (locked) {
- try {
- val st = new Date().getTime
-
- TimeInfoCache.startTimeInfoCache
-
- // get data
- val sourceData = sourceDataConnector.data match {
- case Success(dt) => dt
- case Failure(ex) => throw ex
- }
- val targetData = targetDataConnector.data match {
- case Success(dt) => dt
- case Failure(ex) => throw ex
- }
-
- sourceData.cache
- targetData.cache
-
- println(s"sourceData.count: ${sourceData.count}")
- println(s"targetData.count: ${targetData.count}")
-
- // accuracy algorithm
- val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
-// println(s"accuResult: ${accuResult}")
-
- val ct = new Date().getTime
- appPersist.log(ct, s"calculation using time: ${ct - st} ms")
-
- sourceData.unpersist()
- targetData.unpersist()
-
- // result of every group
- val matchedGroups = reorgByTimeGroup(matchedRdd)
-// val matchedGroupCount = matchedGroups.count
-// println(s"===== matchedGroupCount: ${matchedGroupCount} =====")
-
- // get missing results
- val missingGroups = reorgByTimeGroup(missingRdd)
-// val missingGroupCount = missingGroups.count
-// println(s"===== missingGroupCount: ${missingGroupCount} =====")
-
- val groups = matchedGroups.cogroup(missingGroups)
-// val groupCount = groups.count
-// println(s"===== groupCount: ${groupCount} =====")
-
- val updateResults = groups.flatMap { group =>
- val (t, (matchData, missData)) = group
-
- val matchSize = matchData.size
- val missSize = missData.size
- val res = AccuracyResult(missSize, matchSize + missSize)
-
- val updatedCacheResultOpt = cacheResultProcesser.genUpdateCacheResult(t, updateTime, res)
-
- updatedCacheResultOpt.flatMap { updatedCacheResult =>
- Some((updatedCacheResult, (t, missData)))
- }
- }
-
- updateResults.cache
-
- val updateResultsPart = updateResults.map(_._1)
- val updateDataPart = updateResults.map(_._2)
-
- val updateResultsArray = updateResultsPart.collect()
-
- // update results cache (in driver)
- // collect is a traversable-once action: it will leave the rdd updateResults empty
- updateResultsArray.foreach { updateResult =>
-// println(s"update result: ${updateResult}")
- cacheResultProcesser.update(updateResult)
- // persist result
- val persist: Persist = persistFactory.getPersists(updateResult.timeGroup)
- persist.result(updateTime, updateResult.result)
- }
-
- // record missing data and dump old data (in executor)
- updateDataPart.foreach { grp =>
- val (t, datas) = grp
- val persist: Persist = persistFactory.getPersists(t)
- // persist missing data
- val missStrings = datas.map { row =>
- record2String(row, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)
- }
- persist.records(missStrings, PersistType.MISS)
-
- // data connector update old data
- val dumpDatas = datas.map { r =>
- val (_, (v, i)) = r
- v ++ i
- }
- sourceDataConnector.updateOldData(t, dumpDatas)
-// targetDataConnector.updateOldData(t, dumpDatas) // not correct
- }
-
- updateResults.unpersist()
-
- TimeInfoCache.endTimeInfoCache
-
- // clean old data
- cleanData()
-
- val et = new Date().getTime
- appPersist.log(et, s"persist using time: ${et - ct} ms")
-
- } catch {
- case e: Throwable => error(s"process error: ${e.getMessage}")
- } finally {
- lock.unlock()
- }
- } else {
- println(s"===== [${updateTimeDate}] process ignores =====")
- }
- val endTime = new Date().getTime
- println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
- }
-
- // clean old data and old result cache
- def cleanData(): Unit = {
- try {
- sourceDataConnector.cleanOldData
- targetDataConnector.cleanOldData
-
- val cleanTime = TimeInfoCache.getCleanTime
- cacheResultProcesser.refresh(cleanTime)
- } catch {
- case e: Throwable => error(s"clean data error: ${e.getMessage}")
- }
- }
-
- // calculate accuracy between source data and target data
- private def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
- targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
- ruleAnalyzer: RuleAnalyzer) = {
- // 1. cogroup
- val allKvs = sourceData.cogroup(targetData)
-
- // 2. accuracy calculation
- val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-
- (accuResult, missingRdd, matchedRdd)
- }
-
- private def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
- ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
- rdd.flatMap { row =>
- val (key, (value, info)) = row
- val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
- case Some(t: Long) => Some((t, row))
- case _ => None
- }
- b
- }
- }
-
- // convert data into a string
- def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), dataPersist: Iterable[Expr], infoPersist: Iterable[Expr]): String = {
- val (key, (data, info)) = rec
- val persistData = getPersistMap(data, dataPersist)
- val persistInfo = info.mapValues { value =>
- value match {
- case vd: Map[String, Any] => getPersistMap(vd, infoPersist)
- case v => v
- }
- }.map(identity)
- s"${persistData} [${persistInfo}]"
- }
-
- // get the expr value map of the persist expressions
- private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
- val persistMap = persist.map(e => (e._id, e.desc)).toMap
- data.flatMap { pair =>
- val (k, v) = pair
- persistMap.get(k) match {
- case Some(d) => Some((d -> v))
- case _ => None
- }
- }
- }
-
-}
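The heart of the deleted process above is the cogroup-based accuracy step: keyed source and target records are cogrouped, and a source key with no matching target rows counts as missing. A standalone simplified sketch of that idea, using a local master and plain String keys instead of the Product keys in the real code; the data is illustrative only:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CogroupAccuracySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cogroup-accuracy").setMaster("local[2]"))

    val source: RDD[(String, Map[String, Any])] =
      sc.parallelize(Seq("k1" -> Map("v" -> 1), "k2" -> Map("v" -> 2), "k3" -> Map("v" -> 3)))
    val target: RDD[(String, Map[String, Any])] =
      sc.parallelize(Seq("k1" -> Map("v" -> 1), "k3" -> Map("v" -> 3)))

    // 1. cogroup source and target by key
    val allKvs = source.cogroup(target)

    // 2. count misses: source rows whose key has no target rows at all
    val (missCount, totalCount) = allKvs.map { case (_, (srcRows, tgtRows)) =>
      val miss = if (tgtRows.isEmpty) srcRows.size.toLong else 0L
      (miss, srcRows.size.toLong)
    }.reduce { case ((m1, t1), (m2, t2)) => (m1 + m2, t1 + t2) }

    println(s"matched ${totalCount - missCount} of ${totalCount}, missing ${missCount}")
    sc.stop()
  }
}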
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
deleted file mode 100644
index e5bd7de..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.streaming
-
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-import java.util.{Timer, TimerTask}
-
-case class TimingProcess(interval: Long, runnable: Runnable) {
-
- val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-
- val timer = new Timer("process", true)
-
- val timerTask = new TimerTask() {
- override def run(): Unit = {
- pool.submit(runnable)
- }
- }
-
- def startup(): Unit = {
- timer.schedule(timerTask, interval, interval)
- }
-
- def shutdown(): Unit = {
- timer.cancel()
- pool.shutdown()
- pool.awaitTermination(10, TimeUnit.SECONDS)
- }
-
-}
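TimingProcess, deleted above, pairs a daemon Timer with a five-thread pool so that one slow run blocks neither the timer thread nor the next submission; up to five runs may overlap. A rough equivalent with a single ScheduledExecutorService, sketched here only for comparison, behaves differently on overruns: scheduleAtFixedRate never runs executions concurrently, it only lets them start late.

import java.util.concurrent.{Executors, TimeUnit}

object TimingProcessSketch {
  def main(args: Array[String]): Unit = {
    val interval = 1000L // ms; illustrative value
    val scheduler = Executors.newScheduledThreadPool(1)
    val task = new Runnable {
      def run(): Unit = println(s"tick at ${System.currentTimeMillis()}")
    }
    // first run after `interval` ms, then once every `interval` ms
    scheduler.scheduleAtFixedRate(task, interval, interval, TimeUnit.MILLISECONDS)

    Thread.sleep(5000) // let a few ticks fire, then shut down cleanly
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }
}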
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index ac0acff..b581a58 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -109,7 +109,7 @@ object TimeInfoCache extends Loggable with Serializable {
case _ => -1
}
} catch {
- case _ => -1
+ case e: Throwable => -1
}
}
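The one-line fix above replaces a bare catch-all. In Scala, "case _ =>" inside a catch clause matches every Throwable, including fatal VM errors, and recent compilers warn about the pattern; binding the exception as "case e: Throwable" keeps the original behavior but makes it explicit. A stricter alternative, sketched here as an option rather than what this patch does, is scala.util.control.NonFatal, which lets fatal errors propagate:

import scala.util.control.NonFatal

object NonFatalSketch {
  def readTimeOrDefault(read: => Long): Long =
    try read
    catch {
      // NonFatal matches ordinary exceptions but not OutOfMemoryError and friends
      case NonFatal(_) => -1
    }
}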
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
index 50d3ada..9916e92 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/result/CacheResultProcesser.scala
@@ -23,7 +23,7 @@ import org.apache.griffin.measure.result._
import scala.collection.mutable.{Map => MutableMap}
-case class CacheResultProcesser() extends Loggable {
+object CacheResultProcesser extends Loggable {
val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()
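Turning CacheResultProcesser from a case class into an object changes cacheGroup from per-instance state into a single process-wide map: every caller now reads and mutates the same cache. A tiny illustration of the difference, with names made up for the example:

import scala.collection.mutable.{Map => MutableMap}

object SingletonDemo {
  def main(args: Array[String]): Unit = {
    // before: each instantiation owned its own cache
    case class Processer() { val cache: MutableMap[Long, String] = MutableMap() }
    val a = Processer()
    val b = Processer()
    a.cache += (1L -> "x")
    println(b.cache.get(1L)) // None: a and b do not share state

    // after: an object is one shared instance
    object Shared { val cache: MutableMap[Long, String] = MutableMap() }
    Shared.cache += (1L -> "x")
    println(Shared.cache.get(1L)) // Some(x): visible to every caller
  }
}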
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
deleted file mode 100644
index 9c60755..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataCacheParam.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class DataCacheParam( @JsonProperty("type") cacheType: String,
- @JsonProperty("config") config: Map[String, Any],
- @JsonProperty("time.range") timeRange: List[String]
- ) extends Param {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
index dbc2e0b..a819997 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
@@ -26,12 +26,8 @@ import org.apache.griffin.measure.config.params.Param
case class DataConnectorParam( @JsonProperty("type") conType: String,
@JsonProperty("version") version: String,
@JsonProperty("config") config: Map[String, Any],
- @JsonProperty("cache") cache: DataCacheParam,
- @JsonProperty("match.once") matchOnce: Boolean
+ @JsonProperty("pre.proc") preProc: List[Map[String, Any]]
) extends Param {
- def getMatchOnce(): Boolean = {
- if (matchOnce == null) false else matchOnce
- }
}
\ No newline at end of file
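With this change a connector entry binds to four fields: type, version, config, and the new pre.proc list; the removed cache field reappears on DataSourceParam below. A hypothetical JSON fragment that would deserialize into the new shape; the values, and the keys inside config and pre.proc, are illustrative only, not taken from the real config files:

{
  "type": "hive",
  "version": "1.2",
  "config": { "table.name": "demo_src" },
  "pre.proc": [ { "step": "illustrative-only" } ]
}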
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
new file mode 100644
index 0000000..b638234
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.config.params.user
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class DataSourceParam( @JsonProperty("name") name: String,
+ @JsonProperty("connectors") connectors: List[DataConnectorParam],
+ @JsonProperty("cache") cache: Map[String, Any]
+ ) extends Param {
+
+}
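The new DataSourceParam groups one or more connectors under a named source and carries the cache settings that were removed from DataConnectorParam. A hypothetical JSON fragment matching the three @JsonProperty names above, with all values illustrative:

{
  "name": "source",
  "connectors": [
    {
      "type": "avro",
      "version": "1.7",
      "config": { "file.path": "some/illustrative/path.avro" },
      "pre.proc": []
    }
  ],
  "cache": {}
}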