You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/10 10:16:40 UTC
[3/3] incubator-griffin git commit: Modify measure module to support
updated env and config json files format
Modify measure module to support updated env and config json files format
Important modifications in env.json:
- "info.cache" -> "griffin.checkpoint"
- "persist" -> "sinks"
- "log" -> "console"
- "http" -> "elasticsearch"
- remove "cleaner"
Important modifications in dq.json:
- add "baseline" in data source
- add "dataframe.name" in data connector
- in data source, "cache" -> "checkpoint"
- in rules and pre-proc rules, add "in.dataframe.name" and "out.dataframe.name", and remove "name"
- in rules, add an "out" param array, and move the "metric" and "record" params inside the "out" array
- add a "sinks" string array that filters which of the sinks configured in env.json are used
Author: Lionel Liu <bh...@163.com>
Closes #382 from bhlx3lyx7/json-update.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/23ff999c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/23ff999c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/23ff999c
Branch: refs/heads/master
Commit: 23ff999cd7d5f97063cc3079c77d9413b5d4b7ec
Parents: fda8222
Author: Lionel Liu <bh...@163.com>
Authored: Fri Aug 10 18:16:30 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Fri Aug 10 18:16:30 2018 +0800
----------------------------------------------------------------------
.../main/resources/config-batch-advanced.json | 8 +-
measure/src/main/resources/config-batch.json | 2 +-
.../src/main/resources/config-streaming.json | 12 +-
measure/src/main/resources/env-batch.json | 4 +-
measure/src/main/resources/env-streaming.json | 6 +-
.../configuration/dqdefinition/DQConfig.scala | 128 ++++++-----
.../configuration/dqdefinition/EnvConfig.scala | 50 ++---
.../measure/configuration/enums/DslType.scala | 4 +-
.../configuration/enums/FlattenType.scala | 88 ++++++++
.../configuration/enums/NormalizeType.scala | 87 --------
.../configuration/enums/OutputType.scala | 77 +++++++
.../measure/configuration/enums/SinkType.scala | 83 +++++++
.../griffin/measure/context/DQContext.scala | 42 ++--
.../checkpoint/lock/CheckpointLock.scala | 34 +++
.../checkpoint/lock/CheckpointLockInZK.scala | 53 +++++
.../checkpoint/lock/CheckpointLockSeq.scala | 33 +++
.../checkpoint/offset/OffsetCheckpoint.scala | 39 ++++
.../offset/OffsetCheckpointClient.scala | 54 +++++
.../offset/OffsetCheckpointFactory.scala | 39 ++++
.../offset/OffsetCheckpointInZK.scala | 214 +++++++++++++++++++
.../streaming/checkpoint/offset/OffsetOps.scala | 125 +++++++++++
.../context/streaming/lock/CacheLock.scala | 34 ---
.../context/streaming/lock/CacheLockInZK.scala | 53 -----
.../context/streaming/lock/CacheLockSeq.scala | 33 ---
.../context/streaming/offset/OffsetCache.scala | 39 ----
.../streaming/offset/OffsetCacheClient.scala | 54 -----
.../streaming/offset/OffsetCacheFactory.scala | 39 ----
.../streaming/offset/OffsetCacheInZK.scala | 214 -------------------
.../context/streaming/offset/OffsetOps.scala | 125 -----------
.../griffin/measure/datasource/DataSource.scala | 2 +
.../measure/datasource/DataSourceFactory.scala | 2 +-
.../datasource/cache/StreamingCacheClient.scala | 12 +-
.../cache/StreamingCacheClientFactory.scala | 16 +-
.../cache/StreamingOffsetCacheable.scala | 26 +--
.../datasource/connector/DataConnector.scala | 9 +-
.../apache/griffin/measure/launch/DQApp.scala | 9 +-
.../measure/launch/batch/BatchDQApp.scala | 10 +-
.../launch/streaming/StreamingDQApp.scala | 28 +--
.../griffin/measure/sink/ConsoleSink.scala | 8 +-
.../measure/sink/ElasticSearchSink.scala | 81 +++++++
.../apache/griffin/measure/sink/HdfsSink.scala | 20 +-
.../apache/griffin/measure/sink/HttpSink.scala | 81 -------
.../apache/griffin/measure/sink/MongoSink.scala | 8 +-
.../griffin/measure/sink/MultiSinks.scala | 40 ++--
.../org/apache/griffin/measure/sink/Sink.scala | 8 +-
.../griffin/measure/sink/SinkFactory.scala | 43 ++--
.../griffin/measure/sink/SinkTaskRunner.scala | 2 +-
.../builder/DataFrameOpsDQStepBuilder.scala | 5 +-
.../step/builder/GriffinDslDQStepBuilder.scala | 4 +-
.../step/builder/RuleParamStepBuilder.scala | 14 +-
.../step/builder/SparkSqlDQStepBuilder.scala | 2 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 29 ++-
.../transform/CompletenessExpr2DQSteps.scala | 10 +-
.../transform/DistinctnessExpr2DQSteps.scala | 14 +-
.../dsl/transform/ProfilingExpr2DQSteps.scala | 12 +-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 16 +-
.../dsl/transform/UniquenessExpr2DQSteps.scala | 8 +-
.../builder/preproc/PreProcParamMaker.scala | 67 ++++++
.../preproc/PreProcRuleParamGenerator.scala | 72 -------
.../measure/step/transform/DataFrameOps.scala | 23 +-
.../transform/DataFrameOpsTransformStep.scala | 7 +-
.../step/write/DataSourceUpdateWriteStep.scala | 4 +-
.../measure/step/write/MetricFlushStep.scala | 2 +-
.../measure/step/write/MetricWriteStep.scala | 16 +-
.../measure/step/write/RecordWriteStep.scala | 16 +-
.../resources/_accuracy-batch-griffindsl.json | 18 +-
.../resources/_accuracy-batch-sparksql.json | 63 ------
.../_accuracy-streaming-griffindsl.json | 56 ++---
.../resources/_accuracy-streaming-sparksql.json | 142 ------------
.../_completeness-batch-griffindsl.json | 15 +-
.../_completeness-streaming-griffindsl.json | 32 +--
.../_distinctness-batch-griffindsl.json | 15 +-
.../_distinctness-batch-griffindsl1.json | 73 -------
.../_distinctness-batch-griffindsl2.json | 74 -------
.../_distinctness-streaming-griffindsl.json | 34 +--
.../_profiling-batch-griffindsl-hive.json | 30 ++-
.../resources/_profiling-batch-griffindsl.json | 36 ++--
.../resources/_profiling-batch-sparksql.json | 28 ++-
.../_profiling-streaming-griffindsl.json | 45 ++--
.../_profiling-streaming-sparksql.json | 80 -------
.../resources/_timeliness-batch-griffindsl.json | 22 +-
.../resources/_timeliness-batch-sparksql.json | 52 -----
.../_timeliness-streaming-griffindsl.json | 39 ++--
.../_timeliness-streaming-sparksql.json | 82 -------
.../resources/_uniqueness-batch-griffindsl.json | 22 +-
.../_uniqueness-streaming-griffindsl.json | 56 ++---
.../_uniqueness-streaming-sparksql.json | 130 -----------
measure/src/test/resources/env-batch.json | 16 +-
.../src/test/resources/env-streaming-mongo.json | 12 +-
measure/src/test/resources/env-streaming.json | 12 +-
.../reader/ParamFileReaderSpec.scala | 6 +-
.../reader/ParamJsonReaderSpec.scala | 6 +-
92 files changed, 1600 insertions(+), 2095 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/config-batch-advanced.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-batch-advanced.json b/measure/src/main/resources/config-batch-advanced.json
index 96ae245..9e98efa 100644
--- a/measure/src/main/resources/config-batch-advanced.json
+++ b/measure/src/main/resources/config-batch-advanced.json
@@ -6,7 +6,7 @@
"data.sources": [
{
"name": "source",
- "as.baseline": true,
+ "baseline": true,
"connectors": [
{
"type": "avro",
@@ -37,7 +37,7 @@
"dq.type": "accuracy",
"out.dataframe.name": "accu",
"rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
- "alias": {
+ "details": {
"source": "source",
"target": "target",
"miss": "miss_count",
@@ -46,11 +46,11 @@
},
"out":[
{
- "type":"metric",
+ "type": "metric",
"name": "accu"
},
{
- "type":"record",
+ "type": "record",
"name": "missRecords"
}
]
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/config-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json
index f1b03fb..d2257a0 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch.json
@@ -6,7 +6,7 @@
"data.sources": [
{
"name": "source",
- "as.baseline": true,
+ "baseline": true,
"connectors": [
{
"type": "avro",
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/config-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json
index 15a91ac..a7c47e8 100644
--- a/measure/src/main/resources/config-streaming.json
+++ b/measure/src/main/resources/config-streaming.json
@@ -6,6 +6,7 @@
"data.sources": [
{
"name": "source",
+ "baseline": true,
"connectors": [
{
"type": "kafka",
@@ -24,11 +25,10 @@
},
"pre.proc": [
{
- "dsl.type": "df-opr",
+ "dsl.type": "df-ops",
"in.dataframe.name": "kafka",
"out.dataframe.name": "out1",
"rule": "from_json"
-
},
{
"dsl.type": "spark-sql",
@@ -71,13 +71,9 @@
"rule": "select name, count(*) as `cnt` from source group by name",
"out":[
{
- "type": "array",
+ "type": "metric",
"name": "name_group",
- "flatten":"array"
- },
- {
- "type": "record",
- "name": "missRecords"
+ "flatten": "array"
}
]
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/env-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index 024ad80..bed6ed8 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -32,7 +32,5 @@
}
],
- "info.cache": [],
-
- "cleaner": {}
+ "griffin.checkpoint": []
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env-streaming.json b/measure/src/main/resources/env-streaming.json
index 83ff6ab..f5e303c 100644
--- a/measure/src/main/resources/env-streaming.json
+++ b/measure/src/main/resources/env-streaming.json
@@ -54,9 +54,5 @@
"close.clear": false
}
}
- ],
-
- "cleaner": {
- "clean.interval": "2m"
- }
+ ]
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 8d69377..79a43c1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -25,18 +25,20 @@ import org.apache.griffin.measure.configuration.enums._
/**
* dq param
- * @param name name of dq measurement (must)
- * @param timestamp default timestamp of measure in batch mode (optional)
- * @param procType batch mode or streaming mode (must)
- * @param dataSources data sources (must)
- * @param evaluateRule dq measurement (must)
+ * @param name name of dq measurement (must)
+ * @param timestamp default timestamp of measure in batch mode (optional)
+ * @param procType batch mode or streaming mode (must)
+ * @param dataSources data sources (must)
+ * @param evaluateRule dq measurement (must)
+ * @param sinks sink types (optional, by default will be elasticsearch)
*/
@JsonInclude(Include.NON_NULL)
case class DQConfig(@JsonProperty("name") name: String,
@JsonProperty("timestamp") timestamp: Long,
@JsonProperty("process.type") procType: String,
@JsonProperty("data.sources") dataSources: List[DataSourceParam],
- @JsonProperty("evaluate.rule") evaluateRule: EvaluateRuleParam
+ @JsonProperty("evaluate.rule") evaluateRule: EvaluateRuleParam,
+ @JsonProperty("sinks") sinks: List[String]
) extends Param {
def getName: String = name
def getTimestampOpt: Option[Long] = if (timestamp != 0) Some(timestamp) else None
@@ -50,6 +52,7 @@ case class DQConfig(@JsonProperty("name") name: String,
}._1
}
def getEvaluateRule: EvaluateRuleParam = evaluateRule
+ def getValidSinkTypes: Seq[SinkType] = SinkType.validSinkTypes(if (sinks != null) sinks else Nil)
def validate(): Unit = {
assert(StringUtils.isNotBlank(name), "dq config name should not be blank")
@@ -64,17 +67,20 @@ case class DQConfig(@JsonProperty("name") name: String,
/**
* data source param
* @param name data source name (must)
+ * @param baseline data source is baseline or not, false by default (optional)
* @param connectors data connectors (optional)
- * @param cache data source cache configuration (must in streaming mode with streaming connectors)
+ * @param checkpoint data source checkpoint configuration (must in streaming mode with streaming connectors)
*/
@JsonInclude(Include.NON_NULL)
case class DataSourceParam( @JsonProperty("name") name: String,
+ @JsonProperty("baseline") baseline: Boolean,
@JsonProperty("connectors") connectors: List[DataConnectorParam],
- @JsonProperty("cache") cache: Map[String, Any]
+ @JsonProperty("checkpoint") checkpoint: Map[String, Any]
) extends Param {
def getName: String = name
+ def isBaseline: Boolean = if (baseline != null) baseline else false
def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
- def getCacheOpt: Option[Map[String, Any]] = if (cache != null) Some(cache) else None
+ def getCheckpointOpt: Option[Map[String, Any]] = if (checkpoint != null) Some(checkpoint) else None
def validate(): Unit = {
assert(StringUtils.isNotBlank(name), "data source name should not be empty")
@@ -86,17 +92,20 @@ case class DataSourceParam( @JsonProperty("name") name: String,
* data connector param
* @param conType data connector type, e.g.: hive, avro, kafka (must)
* @param version data connector type version (optional)
+ * @param dataFrameName data connector dataframe name, for pre-process input usage (optional)
* @param config detail configuration of data connector (must)
* @param preProc pre-process rules after load data (optional)
*/
@JsonInclude(Include.NON_NULL)
case class DataConnectorParam( @JsonProperty("type") conType: String,
@JsonProperty("version") version: String,
+ @JsonProperty("dataframe.name") dataFrameName: String,
@JsonProperty("config") config: Map[String, Any],
@JsonProperty("pre.proc") preProc: List[RuleParam]
) extends Param {
def getType: String = conType
- def getVersion: String = version
+ def getVersion: String = if (version != null) version else ""
+ def getDataFrameName(defName: String): String = if (dataFrameName != null) dataFrameName else defName
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
def getPreProcRules: Seq[RuleParam] = if (preProc != null) preProc else Nil
@@ -124,94 +133,77 @@ case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
* rule param
* @param dslType dsl type of this rule (must)
* @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
- * @param name name of result calculated by this rule (must if for later usage)
+ * @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
+ * @param outDfName name of output dataframe of this rule, by default will be generated as data connector dataframe name with index suffix
* @param rule rule to define dq step calculation (must)
* @param details detail config of rule (optional)
- * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode)
- * @param metric config for metric output (optional)
- * @param record config for record output (optional)
- * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode)
+ * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
+ * @param outputs output ways configuration (optional)
+// * @param metric config for metric output (optional)
+// * @param record config for record output (optional)
+// * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode)
*/
@JsonInclude(Include.NON_NULL)
-case class RuleParam( @JsonProperty("dsl.type") dslType: String,
- @JsonProperty("dq.type") dqType: String,
- @JsonProperty("name") name: String,
- @JsonProperty("rule") rule: String,
- @JsonProperty("details") details: Map[String, Any],
- @JsonProperty("cache") cache: Boolean,
- @JsonProperty("metric") metric: RuleMetricParam,
- @JsonProperty("record") record: RuleRecordParam,
- @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam
+case class RuleParam(@JsonProperty("dsl.type") dslType: String,
+ @JsonProperty("dq.type") dqType: String,
+ @JsonProperty("in.dataframe.name") inDfName: String,
+ @JsonProperty("out.dataframe.name") outDfName: String,
+ @JsonProperty("rule") rule: String,
+ @JsonProperty("details") details: Map[String, Any],
+ @JsonProperty("cache") cache: Boolean,
+ @JsonProperty("out") outputs: List[RuleOutputParam]
) extends Param {
def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
def getCache: Boolean = if (cache) cache else false
- def getName: String = if (name != null) name else ""
+ def getInDfName(defName: String = ""): String = if (inDfName != null) inDfName else defName
+ def getOutDfName(defName: String = ""): String = if (outDfName != null) outDfName else defName
def getRule: String = if (rule != null) rule else ""
def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]()
- def getMetricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None
- def getRecordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None
- def getDsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None
+ def getOutputs: Seq[RuleOutputParam] = if (outputs != null) outputs else Nil
+ def getOutputOpt(tp: OutputType): Option[RuleOutputParam] = getOutputs.filter(_.getOutputType == tp).headOption
- def replaceName(newName: String): RuleParam = {
- if (StringUtils.equals(newName, name)) this
- else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate)
+ def replaceInDfName(newName: String): RuleParam = {
+ if (StringUtils.equals(newName, inDfName)) this
+ else RuleParam(dslType, dqType, newName, outDfName, rule, details, cache, outputs)
+ }
+ def replaceOutDfName(newName: String): RuleParam = {
+ if (StringUtils.equals(newName, outDfName)) this
+ else RuleParam(dslType, dqType, inDfName, newName, rule, details, cache, outputs)
+ }
+ def replaceInOutDfName(in: String, out: String): RuleParam = {
+ if (StringUtils.equals(inDfName, in) && StringUtils.equals(outDfName, out)) this
+ else RuleParam(dslType, dqType, in, out, rule, details, cache, outputs)
}
def replaceRule(newRule: String): RuleParam = {
if (StringUtils.equals(newRule, rule)) this
- else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, dsCacheUpdate)
- }
- def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
- RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate)
+ else RuleParam(dslType, dqType, inDfName, outDfName, newRule, details, cache, outputs)
}
def validate(): Unit = {
assert(!(getDslType.equals(GriffinDslType) && getDqType.equals(UnknownType)),
"unknown dq type for griffin dsl")
- getMetricOpt.foreach(_.validate)
- getRecordOpt.foreach(_.validate)
- getDsCacheUpdateOpt.foreach(_.validate)
+ getOutputs.foreach(_.validate)
}
}
/**
- * metric param of rule
- * @param name name of metric to output (optional)
- * @param collectType the normalize strategy to collect metric (optional)
+ * out param of rule
+ * @param outputType output type (must)
+ * @param name output name (optional)
+ * @param flatten flatten type of output metric (optional, available in output metric type)
*/
@JsonInclude(Include.NON_NULL)
-case class RuleMetricParam( @JsonProperty("name") name: String,
- @JsonProperty("collect.type") collectType: String
+case class RuleOutputParam( @JsonProperty("type") outputType: String,
+ @JsonProperty("name") name: String,
+ @JsonProperty("flatten") flatten: String
) extends Param {
+ def getOutputType: OutputType = if (outputType != null) OutputType(outputType) else OutputType("")
def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
- def getCollectType: NormalizeType = if (StringUtils.isNotBlank(collectType)) NormalizeType(collectType) else NormalizeType("")
-
- def validate(): Unit = {}
-}
-
-/**
- * record param of rule
- * @param name name of record to output (optional)
- */
-@JsonInclude(Include.NON_NULL)
-case class RuleRecordParam( @JsonProperty("name") name: String
- ) extends Param {
- def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
-
- def validate(): Unit = {}
-}
-
-/**
- * data source cache update param of rule
- * @param dsName name of data source to be updated by thie rule result (must)
- */
-@JsonInclude(Include.NON_NULL)
-case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
- ) extends Param {
- def getDsNameOpt: Option[String] = if (StringUtils.isNotBlank(dsName)) Some(dsName) else None
+ def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")
def validate(): Unit = {}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index 9793c01..2ad2837 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -21,28 +21,28 @@ package org.apache.griffin.measure.configuration.dqdefinition
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.griffin.measure.configuration.enums._
/**
* environment param
- * @param sparkParam config of spark environment (must)
- * @param persistParams config of persist ways (optional)
- * @param offsetCacheParams config of information cache ways (required in streaming mode)
+ * @param sparkParam config of spark environment (must)
+ * @param sinkParams config of sink ways (optional)
+ * @param checkpointParams config of checkpoint locations (required in streaming mode)
*/
@JsonInclude(Include.NON_NULL)
case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
- @JsonProperty("persist") persistParams: List[PersistParam],
- @JsonProperty("info.cache") offsetCacheParams: List[OffsetCacheParam]
+ @JsonProperty("sinks") sinkParams: List[SinkParam],
+ @JsonProperty("griffin.checkpoint") checkpointParams: List[CheckpointParam]
) extends Param {
def getSparkParam: SparkParam = sparkParam
- def getPersistParams: Seq[PersistParam] = if (persistParams != null) persistParams else Nil
- def getOffsetCacheParams: Seq[OffsetCacheParam] = if (offsetCacheParams != null) offsetCacheParams else Nil
+ def getSinkParams: Seq[SinkParam] = if (sinkParams != null) sinkParams else Nil
+ def getCheckpointParams: Seq[CheckpointParam] = if (checkpointParams != null) checkpointParams else Nil
def validate(): Unit = {
assert((sparkParam != null), "spark param should not be null")
sparkParam.validate
- getPersistParams.foreach(_.validate)
- getOffsetCacheParams.foreach(_.validate)
+ getSinkParams.foreach(_.validate)
+ getCheckpointParams.foreach(_.validate)
}
}
@@ -78,35 +78,35 @@ case class SparkParam( @JsonProperty("log.level") logLevel: String,
}
/**
- * persist param
- * @param persistType persist type, e.g.: log, hdfs, http, mongo (must)
- * @param config config of persist way (must)
+ * sink param
+ * @param sinkType sink type, e.g.: log, hdfs, http, mongo (must)
+ * @param config config of sink way (must)
*/
@JsonInclude(Include.NON_NULL)
-case class PersistParam( @JsonProperty("type") persistType: String,
- @JsonProperty("config") config: Map[String, Any]
- ) extends Param {
- def getType: String = persistType
+case class SinkParam(@JsonProperty("type") sinkType: String,
+ @JsonProperty("config") config: Map[String, Any]
+ ) extends Param {
+ def getType: SinkType = SinkType(sinkType)
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
def validate(): Unit = {
- assert(StringUtils.isNotBlank(persistType), "persist type should not be empty")
+ assert(StringUtils.isNotBlank(sinkType), "sink type should not be empty")
}
}
/**
- * offset cache param
- * @param cacheType offset cache type, e.g.: zookeeper (must)
- * @param config config of cache way
+ * checkpoint param
+ * @param cpType checkpoint location type, e.g.: zookeeper (must)
+ * @param config config of checkpoint location
*/
@JsonInclude(Include.NON_NULL)
-case class OffsetCacheParam(@JsonProperty("type") cacheType: String,
- @JsonProperty("config") config: Map[String, Any]
+case class CheckpointParam(@JsonProperty("type") cpType: String,
+ @JsonProperty("config") config: Map[String, Any]
) extends Param {
- def getType: String = cacheType
+ def getType: String = cpType
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
def validate(): Unit = {
- assert(StringUtils.isNotBlank(cacheType), "info cache type should not be empty")
+ assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
index c9a1172..c72965c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
@@ -51,8 +51,8 @@ object DslType {
* df-ops: data frame operations rule, support some pre-defined data frame ops
*/
case object DataFrameOpsType extends DslType {
- val idPattern = "^(?i)df-?(?:op|opr|ops)$".r
- val desc = "df-opr"
+ val idPattern = "^(?i)df-?(?:ops|opr|operations)$".r
+ val desc = "df-ops"
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
new file mode 100644
index 0000000..160ecaf
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+ * the strategy to flatten metric
+ */
+sealed trait FlattenType {
+ val idPattern: Regex
+ val desc: String
+}
+
+object FlattenType {
+ private val flattenTypes: List[FlattenType] = List(DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType)
+ val default = DefaultFlattenType
+ def apply(ptn: String): FlattenType = {
+ flattenTypes.find(tp => ptn match {
+ case tp.idPattern() => true
+ case _ => false
+ }).getOrElse(default)
+ }
+ def unapply(pt: FlattenType): Option[String] = Some(pt.desc)
+}
+
+/**
+ * default flatten strategy
+ * metrics contains 1 row -> flatten metric json map
+ * metrics contains n > 1 rows -> flatten metric json array
+ * n = 0: { }
+ * n = 1: { "col1": "value1", "col2": "value2", ... }
+ * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
+ * all rows
+ */
+ case object DefaultFlattenType extends FlattenType {
+ val idPattern: Regex = "".r
+ val desc: String = "default"
+}
+
+/**
+ * metrics contains n rows -> flatten metric json map
+ * n = 0: { }
+ * n >= 1: { "col1": "value1", "col2": "value2", ... }
+ * the first row only
+ */
+ case object EntriesFlattenType extends FlattenType {
+ val idPattern: Regex = "^(?i)entries$".r
+ val desc: String = "entries"
+}
+
+/**
+ * metrics contains n rows -> flatten metric json array
+ * n = 0: { "arr-name": [ ] }
+ * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
+ * all rows
+ */
+ case object ArrayFlattenType extends FlattenType {
+ val idPattern: Regex = "^(?i)array|list$".r
+ val desc: String = "array"
+}
+
+/**
+ * metrics contains n rows -> flatten metric json wrapped map
+ * n = 0: { "map-name": { } }
+ * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
+ * the first row only
+ */
+ case object MapFlattenType extends FlattenType {
+ val idPattern: Regex = "^(?i)map$".r
+ val desc: String = "map"
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
deleted file mode 100644
index 61bf27c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.enums
-
-import scala.util.matching.Regex
-
-/**
- * the normalize strategy to collect metric
- */
-sealed trait NormalizeType {
- val idPattern: Regex
- val desc: String
-}
-
-object NormalizeType {
- private val normalizeTypes: List[NormalizeType] = List(DefaultNormalizeType, EntriesNormalizeType, ArrayNormalizeType, MapNormalizeType)
- val default = DefaultNormalizeType
- def apply(ptn: String): NormalizeType = {
- normalizeTypes.find(tp => ptn match {
- case tp.idPattern() => true
- case _ => false
- }).getOrElse(default)
- }
- def unapply(pt: NormalizeType): Option[String] = Some(pt.desc)
-}
-
-/**
- * default normalize strategy
- * metrics contains n rows -> normalized metric json map
- * n = 0: { }
- * n = 1: { "col1": "value1", "col2": "value2", ... }
- * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
- * all rows
- */
- case object DefaultNormalizeType extends NormalizeType {
- val idPattern: Regex = "".r
- val desc: String = "default"
-}
-
-/**
- * metrics contains n rows -> normalized metric json map
- * n = 0: { }
- * n >= 1: { "col1": "value1", "col2": "value2", ... }
- * the first row only
- */
- case object EntriesNormalizeType extends NormalizeType {
- val idPattern: Regex = "^(?i)entries$".r
- val desc: String = "entries"
-}
-
-/**
- * metrics contains n rows -> normalized metric json map
- * n = 0: { "arr-name": [ ] }
- * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
- * all rows
- */
- case object ArrayNormalizeType extends NormalizeType {
- val idPattern: Regex = "^(?i)array|list$".r
- val desc: String = "array"
-}
-
-/**
- * metrics contains n rows -> normalized metric json map
- * n = 0: { "map-name": { } }
- * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
- * the first row only
- */
- case object MapNormalizeType extends NormalizeType {
- val idPattern: Regex = "^(?i)map$".r
- val desc: String = "map"
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
new file mode 100644
index 0000000..5b1d261
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -0,0 +1,77 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+ * the output type of a rule step result, i.e. where/how the step result is emitted
+ */
+sealed trait OutputType {
+  val idPattern: Regex
+  val desc: String
+}
+
+object OutputType {
+  private val outputTypes: List[OutputType] = List(MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType)
+  val default = UnknownOutputType
+
+  /**
+   * resolve an output type by name
+   * @param ptn the configured name; matched against each type's pattern over the whole string
+   *            (Scala regex extractors require a full match)
+   * @return the first matching type, or `default` (unknown) when none matches
+   */
+  def apply(ptn: String): OutputType = {
+    outputTypes.find(tp => ptn match {
+      case tp.idPattern() => true
+      case _ => false
+    }).getOrElse(default)
+  }
+
+  def unapply(pt: OutputType): Option[String] = Some(pt.desc)
+}
+
+/**
+ * metric output type
+ * output the rule step result as metric
+ */
+case object MetricOutputType extends OutputType {
+  val idPattern: Regex = "^(?i)metric$".r
+  val desc: String = "metric"
+}
+
+/**
+ * record output type
+ * output the rule step result as records; accepts "record" or "records"
+ */
+case object RecordOutputType extends OutputType {
+  val idPattern: Regex = "^(?i)record|records$".r
+  val desc: String = "record"
+}
+
+/**
+ * data source cache update output type
+ * output the rule step result to update data source cache
+ */
+case object DscUpdateOutputType extends OutputType {
+  val idPattern: Regex = "^(?i)dsc-update$".r
+  val desc: String = "dsc-update"
+}
+
+/**
+ * unknown output type
+ * will not output the result; also the fallback for unrecognized names
+ */
+case object UnknownOutputType extends OutputType {
+  val idPattern: Regex = "".r
+  val desc: String = "unknown"
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
new file mode 100644
index 0000000..5471e83
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+ * sink type
+ */
+sealed trait SinkType {
+  val idPattern: Regex
+  val desc: String
+}
+
+object SinkType {
+  private val sinkTypes: List[SinkType] = List(
+    ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
+  )
+
+  /** resolve a sink type by name (whole-string match); unrecognized names map to UnknownSinkType */
+  def apply(ptn: String): SinkType = {
+    sinkTypes.find(tp => ptn match {
+      case tp.idPattern() => true
+      case _ => false
+    }).getOrElse(UnknownSinkType)
+  }
+
+  def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+
+  /**
+   * resolve sink names, dropping unknown ones and duplicates (first occurrence kept)
+   * @return the resolved sink types, or elasticsearch alone when nothing valid remains
+   */
+  def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
+    val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
+    if (seq.nonEmpty) seq else Seq(ElasticsearchSinkType)
+  }
+}
+
+/**
+ * console sink, will sink metric in console; accepts "console" or "log"
+ */
+case object ConsoleSinkType extends SinkType {
+  val idPattern: Regex = "^(?i)console|log$".r
+  val desc: String = "console"
+}
+
+/**
+ * hdfs sink, will sink metric and record in hdfs
+ */
+case object HdfsSinkType extends SinkType {
+  val idPattern: Regex = "^(?i)hdfs$".r
+  val desc: String = "hdfs"
+}
+
+/**
+ * elasticsearch sink, will sink metric in elasticsearch; accepts "es", "elasticsearch" or "http"
+ */
+case object ElasticsearchSinkType extends SinkType {
+  val idPattern: Regex = "^(?i)es|elasticsearch|http$".r
+  val desc: String = "elasticsearch"
+}
+
+/**
+ * mongo sink, will sink metric in mongo db; accepts "mongo" or "mongodb"
+ */
+case object MongoSinkType extends SinkType {
+  val idPattern: Regex = "^(?i)mongo|mongodb$".r
+  // was "distinct" (copy-paste error); desc is the canonical sink name like the other types
+  val desc: String = "mongo"
+}
+
+/**
+ * unknown sink, the fallback for unrecognized sink names
+ */
+case object UnknownSinkType extends SinkType {
+  val idPattern: Regex = "".r
+  val desc: String = "unknown"
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index eee8917..a9f3da0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
case class DQContext(contextId: ContextId,
name: String,
dataSources: Seq[DataSource],
- persistParams: Seq[PersistParam],
+ sinkParams: Seq[SinkParam],
procType: ProcessType
)(@transient implicit val sparkSession: SparkSession) {
@@ -46,8 +46,22 @@ case class DQContext(contextId: ContextId,
val metricWrapper: MetricWrapper = MetricWrapper(name)
val writeMode = WriteMode.defaultMode(procType)
- val dataSourceNames: Seq[String] = dataSources.map(_.name)
+  val dataSourceNames: Seq[String] = {
+    // names in declaration order, except that the first data source flagged as baseline
+    // (if any) is moved to the head; later baseline sources keep their position
+    val names = dataSources.map(_.name)
+    dataSources.indexWhere(_.isBaseline) match {
+      case -1 => names
+      case baselineIdx => names(baselineIdx) +: (names.take(baselineIdx) ++ names.drop(baselineIdx + 1))
+    }
+  }
dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+  /**
+   * Name of the data source at `index` in `dataSourceNames` (baseline first),
+   * or "" when `index` is out of range.
+   */
+  def getDataSourceName(index: Int): String = {
+    // lift returns None for any index outside [0, size), so negative indices also yield ""
+    dataSourceNames.lift(index).getOrElse("")
+  }
+
implicit val encoder = Encoders.STRING
val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
@@ -59,26 +73,22 @@ case class DQContext(contextId: ContextId,
}
printTimeRanges
- def getDataSourceName(index: Int): String = {
- if (dataSourceNames.size > index) dataSourceNames(index) else ""
- }
-
- private val persistFactory = SinkFactory(persistParams, name)
- private val defaultPersist: Sink = createPersist(contextId.timestamp)
- def getPersist(timestamp: Long): Sink = {
- if (timestamp == contextId.timestamp) getPersist()
- else createPersist(timestamp)
+ // sink factory built from the configured sink params and this context's name
+ private val sinkFactory = SinkFactory(sinkParams, name)
+ // sink for the context's own timestamp, created once when the context is constructed
+ private val defaultSink: Sink = createSink(contextId.timestamp)
+ // returns the cached default sink when `timestamp` is the context timestamp, otherwise creates a new sink
+ def getSink(timestamp: Long): Sink = {
+ if (timestamp == contextId.timestamp) getSink()
+ else createSink(timestamp)
 }
- def getPersist(): Sink = defaultPersist
- private def createPersist(t: Long): Sink = {
+ def getSink(): Sink = defaultSink
+ // the boolean flag passed to getSinks is true for batch and false for streaming
+ // NOTE(review): its exact meaning is defined in SinkFactory -- confirm there
+ private def createSink(t: Long): Sink = {
 procType match {
- case BatchProcessType => persistFactory.getPersists(t, true)
- case StreamingProcessType => persistFactory.getPersists(t, false)
+ case BatchProcessType => sinkFactory.getSinks(t, true)
+ case StreamingProcessType => sinkFactory.getSinks(t, false)
 }
 }
+ // new context with the given id, sharing data sources, sink params and process type with this one
 def cloneDQContext(newContextId: ContextId): DQContext = {
- DQContext(newContextId, name, dataSources, persistParams, procType)(sparkSession)
+ DQContext(newContextId, name, dataSources, sinkParams, procType)(sparkSession)
 }
def clean(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala
new file mode 100644
index 0000000..bf0fde0
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+
+/**
+ * Lock used to guard checkpoint operations.
+ */
+trait CheckpointLock extends Loggable with Serializable {
+
+  /**
+   * Acquire the lock.
+   *
+   * @param outtime how long to wait for the lock, expressed in `unit`
+   * @param unit    time unit of `outtime`
+   * @return whether the lock was acquired
+   */
+  def lock(outtime: Long, unit: TimeUnit): Boolean
+
+  /** Release the lock. */
+  def unlock(): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
new file mode 100644
index 0000000..b1cbe0f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+
+/**
+ * Checkpoint lock backed by a Curator zookeeper mutex.
+ *
+ * @param mutex the inter-process mutex to acquire/release (transient: not serialized)
+ */
+case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends CheckpointLock {
+
+  import scala.util.control.NonFatal
+
+  /**
+   * Acquire the mutex.
+   *
+   * @param outtime maximum time to wait; a negative value requests waiting without a time limit
+   * @param unit    time unit of `outtime` (not used when `outtime` is negative)
+   * @return the result of `mutex.acquire`, or false if a non-fatal error was raised
+   */
+  def lock(outtime: Long, unit: TimeUnit): Boolean = {
+    try {
+      if (outtime >= 0) {
+        mutex.acquire(outtime, unit)
+      } else {
+        // NOTE(review): relies on Curator treating a null time unit as "no timeout" -- confirm
+        // against the Curator version in use
+        mutex.acquire(-1, null)
+      }
+    } catch {
+      // only non-fatal errors are logged and mapped to false; fatal ones (OOM, interrupts, ...)
+      // must propagate instead of being silently swallowed
+      case NonFatal(e) =>
+        error(s"lock error: ${e.getMessage}")
+        false
+    }
+  }
+
+  /** Release the mutex if it is held by this process; non-fatal errors are logged, not thrown. */
+  def unlock(): Unit = {
+    try {
+      if (mutex.isAcquiredInThisProcess) mutex.release()
+    } catch {
+      case NonFatal(e) => error(s"unlock error: ${e.getMessage}")
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala
new file mode 100644
index 0000000..092d86a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.lock
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * Composite lock over the locks of several checkpoints.
+ * Only the first lock in `locks` is actually acquired/released; with no locks,
+ * `lock` reports success and `unlock` does nothing.
+ */
+case class CheckpointLockSeq(locks: Seq[CheckpointLock]) extends CheckpointLock {
+
+  def lock(outtime: Long, unit: TimeUnit): Boolean =
+    locks.headOption.fold(true)(primary => primary.lock(outtime, unit))
+
+  def unlock(): Unit =
+    locks.headOption.foreach(primary => primary.unlock())
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala
new file mode 100644
index 0000000..8d7b045
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLock
+
+/**
+ * Storage for streaming checkpoint information as string key/value pairs.
+ */
+trait OffsetCheckpoint extends Loggable with Serializable {
+
+  // ---- lifecycle ----
+
+  /** Prepare the underlying storage for use. */
+  def init(): Unit
+
+  /** Whether the underlying storage can currently be used. */
+  def available(): Boolean
+
+  /** Release the underlying storage. */
+  def close(): Unit
+
+  // ---- key/value access ----
+
+  /** Store every key/value pair of `kvs`. */
+  def cache(kvs: Map[String, String]): Unit
+
+  /** Read the values stored for `keys`. */
+  def read(keys: Iterable[String]): Map[String, String]
+
+  /** Remove the entries stored under `keys`. */
+  def delete(keys: Iterable[String]): Unit
+
+  /** Remove the stored checkpoint information. */
+  def clear(): Unit
+
+  /** List the child keys under `path`. */
+  def listKeys(path: String): List[String]
+
+  // ---- locking ----
+
+  /** Create a lock identified by `s`. */
+  def genLock(s: String): CheckpointLock
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
new file mode 100644
index 0000000..8acfbeb
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.{CheckpointLock, CheckpointLockSeq}
+
+/**
+ * Process-wide facade over all configured offset checkpoints.
+ * Writes fan out to every checkpoint; on reads, earlier-configured checkpoints take precedence.
+ */
+object OffsetCheckpointClient extends OffsetCheckpoint with OffsetOps {
+  var offsetCheckpoints: Seq[OffsetCheckpoint] = Nil
+
+  /** (Re)build the checkpoint list; params for which the factory yields None are skipped. */
+  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String): Unit = {
+    val factory = OffsetCheckpointFactory(checkpointParams, metricName)
+    val created = for {
+      param <- checkpointParams
+      checkpoint <- factory.getOffsetCheckpoint(param)
+    } yield checkpoint
+    offsetCheckpoints = created.toList
+  }
+
+  def init(): Unit = offsetCheckpoints.foreach(checkpoint => checkpoint.init())
+  def available(): Boolean = offsetCheckpoints.exists(checkpoint => checkpoint.available())
+  def close(): Unit = offsetCheckpoints.foreach(checkpoint => checkpoint.close())
+
+  def cache(kvs: Map[String, String]): Unit =
+    offsetCheckpoints.foreach(checkpoint => checkpoint.cache(kvs))
+
+  /** Merge the reads of all checkpoints; on key conflicts the earlier checkpoint's value wins. */
+  def read(keys: Iterable[String]): Map[String, String] = {
+    val perCheckpoint = offsetCheckpoints.map(checkpoint => checkpoint.read(keys))
+    perCheckpoint.foldRight(Map.empty[String, String])((values, merged) => merged ++ values)
+  }
+
+  def delete(keys: Iterable[String]): Unit = offsetCheckpoints.foreach(checkpoint => checkpoint.delete(keys))
+  def clear(): Unit = offsetCheckpoints.foreach(checkpoint => checkpoint.clear())
+
+  /** Keys under `path` from the first checkpoint returning a non-empty listing, or Nil. */
+  def listKeys(path: String): List[String] =
+    offsetCheckpoints.iterator
+      .map(checkpoint => checkpoint.listKeys(path))
+      .find(_.nonEmpty)
+      .getOrElse(Nil)
+
+  def genLock(s: String): CheckpointLock =
+    CheckpointLockSeq(offsetCheckpoints.map(checkpoint => checkpoint.genLock(s)))
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
new file mode 100644
index 0000000..5fe8e15
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
+
+import scala.util.Try
+
+/**
+ * Builds offset checkpoints from checkpoint params.
+ *
+ * @param checkpointParams all configured checkpoint params
+ * @param metricName       metric name handed to each created checkpoint
+ */
+case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam], metricName: String
+                                  ) extends Serializable {
+
+  val ZK_REGEX = """^(?i)zk|zookeeper$""".r
+
+  /**
+   * Create the checkpoint described by `checkpointParam`.
+   *
+   * @return Some(checkpoint) for a zookeeper type ("zk"/"zookeeper", case-insensitive) whose
+   *         construction succeeds, None when that construction throws
+   * @throws Exception for any other checkpoint type (this is not turned into None)
+   */
+  def getOffsetCheckpoint(checkpointParam: CheckpointParam): Option[OffsetCheckpoint] = {
+    val config = checkpointParam.getConfig
+    checkpointParam.getType match {
+      case ZK_REGEX() => Try(OffsetCheckpointInZK(config, metricName)).toOption
+      case _ => throw new Exception("not supported info cache type")
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
new file mode 100644
index 0000000..e051779
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
@@ -0,0 +1,214 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.curator.framework.imps.CuratorFrameworkState
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.utils.ZKPaths
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
+import org.apache.zookeeper.CreateMode
+
+import scala.collection.JavaConverters._
+
+/**
+ * Offset checkpoint backed by zookeeper, accessed through a Curator client.
+ *
+ * All znodes live under the Curator namespace `namespace/metricName`
+ * (just `metricName` when no namespace is configured).
+ *
+ * Config keys read from `config`:
+ *  - "hosts"       : zookeeper connect string, default ""
+ *  - "namespace"   : parent namespace, default ""
+ *  - "mode"        : "persist"/"persistent" or "ephemeral" (case-insensitive); anything else -> persistent
+ *  - "init.clear"  : Boolean, clear cached info on init, default true
+ *  - "close.clear" : Boolean, clear cached info on close, default false
+ *  - "lock.path"   : znode under which locks are created, default "lock"
+ * Non-Boolean values for the clear flags are ignored and the default applies.
+ *
+ * Non-fatal zookeeper errors are logged and turned into false / None / Nil results instead of
+ * being thrown; fatal errors propagate.
+ *
+ * @param config     checkpoint configuration
+ * @param metricName metric name, used as the last namespace segment
+ */
+case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) extends OffsetCheckpoint with OffsetOps {
+
+  import scala.util.control.NonFatal
+
+  val Hosts = "hosts"
+  val Namespace = "namespace"
+  val Mode = "mode"
+  val InitClear = "init.clear"
+  val CloseClear = "close.clear"
+  val LockPath = "lock.path"
+
+  val PersistentRegex = """^(?i)persist(ent)?$""".r
+  val EphemeralRegex = """^(?i)ephemeral$""".r
+
+  final val separator = ZKPaths.PATH_SEPARATOR
+
+  val hosts = config.getOrElse(Hosts, "").toString
+  val namespace = config.getOrElse(Namespace, "").toString
+  val mode: CreateMode = config.get(Mode) match {
+    case Some(s: String) => s match {
+      case PersistentRegex() => CreateMode.PERSISTENT
+      case EphemeralRegex() => CreateMode.EPHEMERAL
+      case _ => CreateMode.PERSISTENT
+    }
+    case _ => CreateMode.PERSISTENT
+  }
+  val initClear = config.get(InitClear) match {
+    case Some(b: Boolean) => b
+    case _ => true
+  }
+  val closeClear = config.get(CloseClear) match {
+    case Some(b: Boolean) => b
+    case _ => false
+  }
+  val lockPath = config.getOrElse(LockPath, "lock").toString
+
+  private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+  // retry policy: 1000 ms base sleep, at most 3 retries
+  private val builder = CuratorFrameworkFactory.builder()
+    .connectString(hosts)
+    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+    .namespace(cacheNamespace)
+  private val client: CuratorFramework = builder.build
+
+  /** Start the client, remove existing lock znodes, and clear cached info if `initClear` is set. */
+  def init(): Unit = {
+    client.start()
+    info("start zk info cache")
+    // the namespace is already applied through the builder; a separate usingNamespace call
+    // (whose returned facade was discarded) had no effect and is not needed
+    info(s"init with namespace: ${cacheNamespace}")
+    delete(lockPath :: Nil)
+    if (initClear) {
+      clear()
+    }
+  }
+
+  /** Whether the Curator client is in the STARTED state. */
+  def available(): Boolean = {
+    client.getState match {
+      case CuratorFrameworkState.STARTED => true
+      case _ => false
+    }
+  }
+
+  /** Clear cached info if `closeClear` is set, then close the client. */
+  def close(): Unit = {
+    if (closeClear) {
+      clear()
+    }
+    info("close zk info cache")
+    client.close()
+  }
+
+  /** Create or update one znode per key, storing the value as UTF-8 bytes. */
+  def cache(kvs: Map[String, String]): Unit = {
+    kvs.foreach { case (key, value) => createOrUpdate(path(key), value) }
+  }
+
+  /** Read the values for `keys`; keys that cannot be read are left out of the result. */
+  def read(keys: Iterable[String]): Map[String, String] = {
+    keys.flatMap(key => read(path(key)).map(value => (key, value))).toMap
+  }
+
+  def delete(keys: Iterable[String]): Unit = {
+    keys.foreach(key => delete(path(key)))
+  }
+
+  /** Delete the final info and info subtrees. */
+  def clear(): Unit = {
+    delete(finalCacheInfoPath :: Nil)
+    delete(infoPath :: Nil)
+    info("clear info")
+  }
+
+  def listKeys(p: String): List[String] = {
+    children(path(p))
+  }
+
+  /** Create a mutex under the lock path, nested under `s` when it is non-empty. */
+  def genLock(s: String): CheckpointLockInZK = {
+    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
+    CheckpointLockInZK(new InterProcessMutex(client, lpt))
+  }
+
+  // prefix the key with the path separator unless it already starts with one
+  private def path(k: String): String = {
+    if (k.startsWith(separator)) k else separator + k
+  }
+
+  private def children(path: String): List[String] = {
+    try {
+      client.getChildren().forPath(path).asScala.toList
+    } catch {
+      case NonFatal(e) =>
+        warn(s"list ${path} warn: ${e.getMessage}")
+        Nil
+    }
+  }
+
+  private def createOrUpdate(path: String, content: String): Boolean = {
+    if (checkExists(path)) {
+      update(path, content)
+    } else {
+      create(path, content)
+    }
+  }
+
+  private def create(path: String, content: String): Boolean = {
+    try {
+      client.create().creatingParentsIfNeeded().withMode(mode)
+        .forPath(path, content.getBytes("utf-8"))
+      true
+    } catch {
+      case NonFatal(e) =>
+        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+    }
+  }
+
+  private def update(path: String, content: String): Boolean = {
+    try {
+      client.setData().forPath(path, content.getBytes("utf-8"))
+      true
+    } catch {
+      case NonFatal(e) =>
+        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+    }
+  }
+
+  private def read(path: String): Option[String] = {
+    try {
+      Some(new String(client.getData().forPath(path), "utf-8"))
+    } catch {
+      case NonFatal(e) =>
+        warn(s"read ${path} warn: ${e.getMessage}")
+        None
+    }
+  }
+
+  private def delete(path: String): Unit = {
+    try {
+      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
+    } catch {
+      case NonFatal(e) => error(s"delete ${path} error: ${e.getMessage}")
+    }
+  }
+
+  private def checkExists(path: String): Boolean = {
+    try {
+      client.checkExists().forPath(path) != null
+    } catch {
+      case NonFatal(e) =>
+        warn(s"check exists ${path} warn: ${e.getMessage}")
+        false
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala
new file mode 100644
index 0000000..6be97ef
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+/**
+ * Timestamp bookkeeping mixed into an [[OffsetCheckpoint]].
+ *
+ * Each key returned by `listKeys(infoPath)` holds its own set of timestamps
+ * (ready / last-processed / clean time) stored as decimal strings. The
+ * minimum of each timestamp across all those keys is aggregated into the
+ * `info.final/...` keys (presumably one child per data source — confirm
+ * against the callers).
+ */
+trait OffsetOps extends Serializable { this: OffsetCheckpoint =>
+
+  // Key suffixes appended to a path by the helpers below.
+  val CacheTime = "cache.time"
+  val LastProcTime = "last.proc.time"
+  val ReadyTime = "ready.time"
+  val CleanTime = "clean.time"
+  val OldCacheIndex = "old.cache.index"
+
+  def cacheTime(path: String): String = s"${path}/${CacheTime}"
+  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
+  def readyTime(path: String): String = s"${path}/${ReadyTime}"
+  def cleanTime(path: String): String = s"${path}/${CleanTime}"
+  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
+
+  // Parent key whose listed children each carry the timestamps above.
+  val infoPath = "info"
+
+  // Keys holding the aggregated (minimum) values computed by this trait.
+  val finalCacheInfoPath = "info.final"
+  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
+  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
+  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
+
+  /** Aggregates the minimum ready time under `infoPath` into `finalReadyTime`. */
+  def startOffsetCheckpoint(): Unit = {
+    genFinalReadyTime()
+  }
+
+  /**
+   * @return (final last-processed time, final ready time); each is -1 when
+   *         the value is missing or not a valid Long
+   */
+  def getTimeRange(): (Long, Long) = {
+    readTimeRange()
+  }
+
+  /** @return the final clean time, or -1 when missing or not a valid Long */
+  def getCleanTime(): Long = {
+    readCleanTime()
+  }
+
+  /** Aggregates the minimum last-processed and clean times into their final keys. */
+  def endOffsetCheckpoint: Unit = {
+    genFinalLastProcTime()
+    genFinalCleanTime()
+  }
+
+  private def genFinalReadyTime(): Unit = genFinalMinTime(ReadyTime, finalReadyTime)
+
+  private def genFinalLastProcTime(): Unit = genFinalMinTime(LastProcTime, finalLastProcTime)
+
+  private def genFinalCleanTime(): Unit = genFinalMinTime(CleanTime, finalCleanTime)
+
+  /**
+   * Reads `<infoPath>/<child>/<suffix>` for every child listed under `infoPath`
+   * and caches the minimum parseable value under `finalKey`. Nothing is
+   * written when no child holds a parseable value. The minimum is used
+   * presumably so the aggregate never runs ahead of the slowest child.
+   */
+  private def genFinalMinTime(suffix: String, finalKey: String): Unit = {
+    val keys = listKeys(infoPath).map(child => s"${infoPath}/${child}/${suffix}")
+    val stored = read(keys)
+    val times = keys.flatMap(key => getLongOpt(stored, key))
+    if (times.nonEmpty) {
+      cache(Map(finalKey -> times.min.toString))
+    }
+  }
+
+  private def readTimeRange(): (Long, Long) = {
+    val map = read(List(finalLastProcTime, finalReadyTime))
+    (getLong(map, finalLastProcTime), getLong(map, finalReadyTime))
+  }
+
+  private def readCleanTime(): Long = {
+    val map = read(List(finalCleanTime))
+    getLong(map, finalCleanTime)
+  }
+
+  // None when the key is absent or its value is not a valid Long. Try is scoped
+  // to the parse and only captures non-fatal exceptions.
+  private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
+    map.get(key).flatMap(value => scala.util.Try(value.toLong).toOption)
+  }
+
+  // -1 is the sentinel for "missing or unparseable".
+  private def getLong(map: Map[String, String], key: String): Long = {
+    getLongOpt(map, key).getOrElse(-1L)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
deleted file mode 100644
index a2cfad9..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-
-/**
- * Lock guarding access to the offset ("info") cache.
- * Implementations in this package: CacheLockInZK and CacheLockSeq.
- */
-trait CacheLock extends Loggable with Serializable {
-
- /**
-  * Attempts to acquire the lock. `outtime`/`unit` are presumably a wait
-  * timeout: CacheLockInZK forwards them to InterProcessMutex.acquire.
-  * @return true if the lock was obtained
-  */
- def lock(outtime: Long, unit: TimeUnit): Boolean
-
- /** Releases the lock. */
- def unlock(): Unit
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
deleted file mode 100644
index 5ae4c75..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-
-// CacheLock backed by a Curator InterProcessMutex (marked @transient, so it is
-// not serialized with this case class).
-case class CacheLockInZK(@transient mutex: InterProcessMutex) extends CacheLock {
-
- // Returns the result of mutex.acquire, or false if acquire threw.
- def lock(outtime: Long, unit: TimeUnit): Boolean = {
- try {
- if (outtime >= 0) {
- mutex.acquire(outtime, unit)
- } else {
- // Negative outtime: calls acquire(-1, null); presumably waits without a
- // timeout — confirm against Curator's InterProcessMutex.acquire docs.
- mutex.acquire(-1, null)
- }
- } catch {
- // NOTE(review): catching Throwable also swallows InterruptedException.
- case e: Throwable => {
- error(s"lock error: ${e.getMessage}")
- false
- }
- }
-
- }
-
- // Releases only when this process holds the mutex; errors are logged, not rethrown.
- def unlock(): Unit = {
- try {
- if (mutex.isAcquiredInThisProcess) mutex.release
- } catch {
- case e: Throwable => {
- error(s"unlock error: ${e.getMessage}")
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
deleted file mode 100644
index 6f033bd..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-// Composite lock over several CacheLocks. Only the FIRST lock in the sequence
-// is ever locked/unlocked; the rest are ignored.
-case class CacheLockSeq(cacheLocks: Seq[CacheLock]) extends CacheLock {
-
- // An empty sequence is treated as successfully locked (returns true).
- def lock(outtime: Long, unit: TimeUnit): Boolean = {
- cacheLocks.headOption.map(_.lock(outtime, unit)).getOrElse(true)
- }
-
- // No-op when the sequence is empty.
- def unlock(): Unit = {
- cacheLocks.headOption.foreach(_.unlock)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
deleted file mode 100644
index fc78eda..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.lock.CacheLock
-
-// String key/value store used for streaming offset bookkeeping. The visible
-// implementation is produced by OffsetCacheFactory (OffsetCacheInZK).
-trait OffsetCache extends Loggable with Serializable {
-
- // Lifecycle.
- def init(): Unit
- def available(): Boolean
- def close(): Unit
-
- // Key/value operations; keys are path-like strings (e.g. "info/<child>/ready.time").
- def cache(kvs: Map[String, String]): Unit
- def read(keys: Iterable[String]): Map[String, String]
- def delete(keys: Iterable[String]): Unit
- def clear(): Unit
-
- // Presumably lists the child keys under `path` — confirm in implementations.
- def listKeys(path: String): List[String]
-
- // Builds a lock identified by `s`.
- def genLock(s: String): CacheLock
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
deleted file mode 100644
index 8d4abea..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.griffin.measure.configuration.dqdefinition.OffsetCacheParam
-import org.apache.griffin.measure.context.streaming.lock.{CacheLock, CacheLockSeq}
-
-// Global OffsetCache that fans every operation out over `offsetCaches`.
-object OffsetCacheClient extends OffsetCache with OffsetOps {
- // Backing caches, populated by initClient.
- var offsetCaches: Seq[OffsetCache] = Nil
-
- // Builds one cache per param via OffsetCacheFactory. NOTE(review): the factory
- // throws for unsupported cache types, so this call can throw instead of skipping them.
- def initClient(offsetCacheParams: Iterable[OffsetCacheParam], metricName: String) = {
- val fac = OffsetCacheFactory(offsetCacheParams, metricName)
- offsetCaches = offsetCacheParams.flatMap(param => fac.getOffsetCache(param)).toList
- }
-
- def init(): Unit = offsetCaches.foreach(_.init)
- // True if at least one backing cache is available.
- def available(): Boolean = offsetCaches.foldLeft(false)(_ || _.available)
- def close(): Unit = offsetCaches.foreach(_.close)
-
- // Writes the pairs to every backing cache.
- def cache(kvs: Map[String, String]): Unit = {
- offsetCaches.foreach(_.cache(kvs))
- }
- // Merges all caches' results. Because of the reverse before folding, on a key
- // conflict the value from the earliest cache in offsetCaches wins.
- def read(keys: Iterable[String]): Map[String, String] = {
- val maps = offsetCaches.map(_.read(keys)).reverse
- maps.fold(Map[String, String]())(_ ++ _)
- }
- def delete(keys: Iterable[String]): Unit = offsetCaches.foreach(_.delete(keys))
- def clear(): Unit = offsetCaches.foreach(_.clear)
-
- // Returns the first non-empty result in cache order; later caches are not
- // queried once a non-empty list has been found.
- def listKeys(path: String): List[String] = {
- offsetCaches.foldLeft(Nil: List[String]) { (res, offsetCache) =>
- if (res.size > 0) res else offsetCache.listKeys(path)
- }
- }
-
- // Wraps every cache's lock in a CacheLockSeq, which only acts on the first one.
- def genLock(s: String): CacheLock = CacheLockSeq(offsetCaches.map(_.genLock(s)))
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
deleted file mode 100644
index 0fbf335..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.griffin.measure.configuration.dqdefinition.OffsetCacheParam
-
-import scala.util.{Success, Try}
-
-// Creates OffsetCache instances from their config params; only a ZooKeeper-backed
-// cache is supported.
-case class OffsetCacheFactory(offsetCacheParams: Iterable[OffsetCacheParam], metricName: String
- ) extends Serializable {
-
- // Matches type names "zk" / "zookeeper" (pattern carries an inline (?i) flag).
- val ZK_REGEX = """^(?i)zk|zookeeper$""".r
-
- // Returns None when OffsetCacheInZK construction fails.
- // NOTE(review): the `case _` throw runs outside Try, so an unsupported type makes
- // this method throw instead of returning None as its Option result suggests.
- def getOffsetCache(offsetCacheParam: OffsetCacheParam): Option[OffsetCache] = {
- val config = offsetCacheParam.getConfig
- val offsetCacheTry = offsetCacheParam.getType match {
- case ZK_REGEX() => Try(OffsetCacheInZK(config, metricName))
- case _ => throw new Exception("not supported info cache type")
- }
- offsetCacheTry.toOption
- }
-
-}
\ No newline at end of file