Posted to commits@griffin.apache.org by wa...@apache.org on 2020/02/11 12:16:02 UTC

[griffin] branch master updated: optimize get metric maps in 'MetricWriteStep'

This is an automated email from the ASF dual-hosted git repository.

wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 24c270e  optimize get metric maps in 'MetricWriteStep'
24c270e is described below

commit 24c270e016a9c7c42ec5f137672dadc7f7ef13f6
Author: yuxiaoyu <yu...@bytedance.com>
AuthorDate: Tue Feb 11 20:15:53 2020 +0800

    optimize get metric maps in 'MetricWriteStep'
    
    **Why/What changes?**
    In 'MetricWriteStep.getMetricMaps()' the DataFrame was transformed to a JSON RDD, collected, and then converted to Seq[Map].
    That is neither elegant nor easy to understand. A more efficient way is to collect the rows first and then transform them to Seq[Map] directly (see the sketch below).
    
    We have tested this with our DQ cases and it works well.
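    
    For illustration only (not part of the commit): a minimal, self-contained sketch of the new approach, assuming a local SparkSession and a toy metrics table standing in for the real `inputName` table; the object and column names are illustrative.
    
    import org.apache.spark.sql.SparkSession
    
    object MetricMapsSketch {
      def main(args: Array[String]): Unit = {
        // Local session for the sketch; Griffin obtains its session from DQContext.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("metric-maps-sketch")
          .getOrCreate()
        import spark.implicits._
    
        // Toy stand-in for the table read via context.sparkSession.table(...).
        val pdf = Seq(("accuracy", 0.98), ("completeness", 1.0)).toDF("metric", "value")
    
        // New approach: collect the rows once, then build each map directly
        // from the Row instead of going through toJSON + JSON parsing.
        val columns = pdf.columns
        val rows = pdf.collect()
        val metricMaps: Seq[Map[String, Any]] =
          if (rows.nonEmpty) rows.toSeq.map(_.getValuesMap[Any](columns)) else Nil
    
        metricMaps.foreach(println) // e.g. Map(metric -> accuracy, value -> 0.98)
        spark.stop()
      }
    }
    
    This avoids serializing every row to JSON on the driver and parsing it back, which is what the old toJSON-based path did.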
    
    Author: yuxiaoyu <yu...@bytedance.com>
    
    Closes #566 from XiaoyuBD/optimizeMetricWriteGetMaps.
---
 .../griffin/measure/step/write/MetricWriteStep.scala       | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index d43a265..cdf337b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -82,16 +82,10 @@ case class MetricWriteStep(
   private def getMetricMaps(context: DQContext): Seq[Map[String, Any]] = {
     try {
       val pdf = context.sparkSession.table(s"`$inputName`")
-      val records = pdf.toJSON.collect()
-      if (records.length > 0) {
-        records.flatMap { rec =>
-          try {
-            val value = JsonUtil.toAnyMap(rec)
-            Some(value)
-          } catch {
-            case _: Throwable => None
-          }
-        }.toSeq
+      val rows = pdf.collect()
+      val columns = pdf.columns
+      if (rows.size > 0) {
+        rows.map(_.getValuesMap(columns))
       } else Nil
     } catch {
       case e: Throwable =>