You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/16 08:57:57 UTC
incubator-griffin git commit: Fix bug of record persist in simple mode
Repository: incubator-griffin
Updated Branches:
refs/heads/master cbff5b45c -> 06f969fea
Fix bug of record persist in simple mode
The timestamp used when persisting records should be the default timestamp from the export config, not the calculation timestamp.
Author: Lionel Liu <bh...@163.com>
Closes #192 from bhlx3lyx7/tmst.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/06f969fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/06f969fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/06f969fe
Branch: refs/heads/master
Commit: 06f969feac9d0016dba0caed0b913e395ef06559
Parents: cbff5b4
Author: Lionel Liu <bh...@163.com>
Authored: Tue Jan 16 16:57:49 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Jan 16 16:57:49 2018 +0800
----------------------------------------------------------------------
.../griffin/measure/process/BatchDqProcess.scala | 4 ++--
.../griffin/measure/process/StreamingDqThread.scala | 4 ++--
.../griffin/measure/process/engine/DqEngine.scala | 2 +-
.../griffin/measure/process/engine/DqEngines.scala | 16 ++++++++--------
.../measure/process/engine/SparkDqEngine.scala | 3 +--
5 files changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 950cd27..44cca9a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -116,9 +116,9 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
// persist results
- dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, persistFactory)
+ dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory)
- dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, persistFactory, dataSources)
+ dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources)
// dfs.foreach(_._2.cache())
//
// dqEngines.persistAllRecords(dfs, persistFactory)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index fcf9528..c3c4f09 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -85,7 +85,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
// persist results
// val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
- dqEngines.persistAllMetrics(calcTimeInfo, optRulePlan.metricExports, persistFactory)
+ dqEngines.persistAllMetrics(optRulePlan.metricExports, persistFactory)
// println(s"--- timeGroups: ${timeGroups}")
val rt = new Date().getTime
@@ -93,7 +93,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
appPersist.log(rt, persistResultTimeStr)
// persist records
- dqEngines.persistAllRecords(calcTimeInfo, optRulePlan.recordExports, persistFactory, dataSources)
+ dqEngines.persistAllRecords(optRulePlan.recordExports, persistFactory, dataSources)
val et = new Date().getTime
val persistTimeStr = s"persist records using time: ${et - rt} ms"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index 00c6ef4..ee3a65e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -34,7 +34,7 @@ trait DqEngine extends Loggable with Serializable {
protected def collectable(): Boolean = false
- def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport): Map[Long, Map[String, Any]]
+ def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]]
// def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
//
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 2163925..8f17764 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -54,11 +54,11 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
}
}
- def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport], persistFactory: PersistFactory
+ def persistAllMetrics(metricExports: Seq[MetricExport], persistFactory: PersistFactory
): Unit = {
val allMetrics: Map[Long, Map[String, Any]] = {
metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, metricExport) =>
- val metrics = collectMetrics(timeInfo, metricExport)
+ val metrics = collectMetrics(metricExport)
metrics.foldLeft(ret) { (total, pair) =>
val (k, v) = pair
total.get(k) match {
@@ -112,7 +112,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
Await.result(pro.future, Duration.Inf)
}
- def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport],
+ def persistAllRecords(recordExports: Seq[RecordExport],
persistFactory: PersistFactory, dataSources: Seq[DataSource]
): Unit = {
// method 1: multi thread persist multi data frame
@@ -127,7 +127,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
recordExport.mode match {
case SimpleMode => {
collectBatchRecords(recordExport).foreach { rdd =>
- persistCollectedBatchRecords(timeInfo, recordExport, rdd, persistFactory)
+ persistCollectedBatchRecords(recordExport, rdd, persistFactory)
}
}
case TimestampMode => {
@@ -154,10 +154,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
ret
}
- private def persistCollectedBatchRecords(timeInfo: TimeInfo, recordExport: RecordExport,
+ private def persistCollectedBatchRecords(recordExport: RecordExport,
records: RDD[String], persistFactory: PersistFactory
): Unit = {
- val persist = persistFactory.getPersists(timeInfo.calcTime)
+ val persist = persistFactory.getPersists(recordExport.defTimestamp)
persist.persistRecords(records, recordExport.name)
}
@@ -282,10 +282,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
// engine.collectUpdateCacheDatas(ruleStep, timeGroups)
// }.headOption
// }
- def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
+ def collectMetrics(metricExport: MetricExport
): Map[Long, Map[String, Any]] = {
val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
- if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport)
+ if (ret.nonEmpty) ret else engine.collectMetrics(metricExport)
}
ret
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index 3bcecdb..736ce56 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -68,8 +68,7 @@ trait SparkDqEngine extends DqEngine {
}
}
- def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
- ): Map[Long, Map[String, Any]] = {
+ def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]] = {
if (collectable) {
val MetricExport(name, stepName, collectType, defTmst, mode) = metricExport
try {