You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/16 08:57:57 UTC

incubator-griffin git commit: Fix bug of record persist in simple mode

Repository: incubator-griffin
Updated Branches:
  refs/heads/master cbff5b45c -> 06f969fea


Fix bug of record persist in simple mode

The timestamp of persisted records should be the default timestamp from the export config, not the calculation timestamp.

Author: Lionel Liu <bh...@163.com>

Closes #192 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/06f969fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/06f969fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/06f969fe

Branch: refs/heads/master
Commit: 06f969feac9d0016dba0caed0b913e395ef06559
Parents: cbff5b4
Author: Lionel Liu <bh...@163.com>
Authored: Tue Jan 16 16:57:49 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Jan 16 16:57:49 2018 +0800

----------------------------------------------------------------------
 .../griffin/measure/process/BatchDqProcess.scala    |  4 ++--
 .../griffin/measure/process/StreamingDqThread.scala |  4 ++--
 .../griffin/measure/process/engine/DqEngine.scala   |  2 +-
 .../griffin/measure/process/engine/DqEngines.scala  | 16 ++++++++--------
 .../measure/process/engine/SparkDqEngine.scala      |  3 +--
 5 files changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 950cd27..44cca9a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -116,9 +116,9 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
 
     // persist results
-    dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, persistFactory)
+    dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory)
 
-    dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, persistFactory, dataSources)
+    dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources)
 //    dfs.foreach(_._2.cache())
 //
 //    dqEngines.persistAllRecords(dfs, persistFactory)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index fcf9528..c3c4f09 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -85,7 +85,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
 
         // persist results
 //        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
-        dqEngines.persistAllMetrics(calcTimeInfo, optRulePlan.metricExports, persistFactory)
+        dqEngines.persistAllMetrics(optRulePlan.metricExports, persistFactory)
 //        println(s"--- timeGroups: ${timeGroups}")
 
         val rt = new Date().getTime
@@ -93,7 +93,7 @@ case class StreamingDqThread(sqlContext: SQLContext,
         appPersist.log(rt, persistResultTimeStr)
 
         // persist records
-        dqEngines.persistAllRecords(calcTimeInfo, optRulePlan.recordExports, persistFactory, dataSources)
+        dqEngines.persistAllRecords(optRulePlan.recordExports, persistFactory, dataSources)
 
         val et = new Date().getTime
         val persistTimeStr = s"persist records using time: ${et - rt} ms"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index 00c6ef4..ee3a65e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -34,7 +34,7 @@ trait DqEngine extends Loggable with Serializable {
 
   protected def collectable(): Boolean = false
 
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport): Map[Long, Map[String, Any]]
+  def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]]
 
   //  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
   //

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 2163925..8f17764 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -54,11 +54,11 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     }
   }
 
-  def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport], persistFactory: PersistFactory
+  def persistAllMetrics(metricExports: Seq[MetricExport], persistFactory: PersistFactory
                        ): Unit = {
     val allMetrics: Map[Long, Map[String, Any]] = {
       metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, metricExport) =>
-        val metrics = collectMetrics(timeInfo, metricExport)
+        val metrics = collectMetrics(metricExport)
         metrics.foldLeft(ret) { (total, pair) =>
           val (k, v) = pair
           total.get(k) match {
@@ -112,7 +112,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     Await.result(pro.future, Duration.Inf)
   }
 
-  def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport],
+  def persistAllRecords(recordExports: Seq[RecordExport],
                         persistFactory: PersistFactory, dataSources: Seq[DataSource]
                        ): Unit = {
     // method 1: multi thread persist multi data frame
@@ -127,7 +127,7 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
       recordExport.mode match {
         case SimpleMode => {
           collectBatchRecords(recordExport).foreach { rdd =>
-            persistCollectedBatchRecords(timeInfo, recordExport, rdd, persistFactory)
+            persistCollectedBatchRecords(recordExport, rdd, persistFactory)
           }
         }
         case TimestampMode => {
@@ -154,10 +154,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     ret
   }
 
-  private def persistCollectedBatchRecords(timeInfo: TimeInfo, recordExport: RecordExport,
+  private def persistCollectedBatchRecords(recordExport: RecordExport,
                                            records: RDD[String], persistFactory: PersistFactory
                                           ): Unit = {
-    val persist = persistFactory.getPersists(timeInfo.calcTime)
+    val persist = persistFactory.getPersists(recordExport.defTimestamp)
     persist.persistRecords(records, recordExport.name)
   }
 
@@ -282,10 +282,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 //      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
 //    }.headOption
 //  }
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
+  def collectMetrics(metricExport: MetricExport
                     ): Map[Long, Map[String, Any]] = {
     val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport)
+      if (ret.nonEmpty) ret else engine.collectMetrics(metricExport)
     }
     ret
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/06f969fe/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index 3bcecdb..736ce56 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -68,8 +68,7 @@ trait SparkDqEngine extends DqEngine {
     }
   }
 
-  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport
-                    ): Map[Long, Map[String, Any]] = {
+  def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]] = {
     if (collectable) {
       val MetricExport(name, stepName, collectType, defTmst, mode) = metricExport
       try {