You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/03/06 00:21:06 UTC

[griffin] branch master updated: [GRIFFIN-234] Add applicationId to MetricWrapper

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new dd1a16b  [GRIFFIN-234] Add applicationId to MetricWrapper
dd1a16b is described below

commit dd1a16bf6a676bc880709a6b972f6947355310a0
Author: dershovGD <de...@griddynamics.com>
AuthorDate: Wed Mar 6 08:21:00 2019 +0800

    [GRIFFIN-234] Add applicationId to MetricWrapper
    
    In order to store more detailed information about certain metric the YARN application id associated with that metric has been added to MetricWrapper's flush method.
    
    Author: dershovGD <de...@griddynamics.com>
    
    Closes #483 from dershovGD/GRIFFIN-233-modify-ElasticSearchSink.
---
 .../scala/org/apache/griffin/measure/context/DQContext.scala     | 2 +-
 .../scala/org/apache/griffin/measure/context/MetricWrapper.scala | 6 ++++--
 .../org/apache/griffin/measure/context/MetricWrapperTest.scala   | 9 +++++----
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index b0759c5..2fdf409 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -44,7 +44,7 @@ case class DQContext(contextId: ContextId,
 
   val dataFrameCache: DataFrameCache = DataFrameCache()
 
-  val metricWrapper: MetricWrapper = MetricWrapper(name)
+  val metricWrapper: MetricWrapper = MetricWrapper(name, sparkSession.sparkContext.applicationId)
   val writeMode = WriteMode.defaultMode(procType)
 
   val dataSourceNames: Seq[String] = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
index cec737f..df162a7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
@@ -23,11 +23,12 @@ import scala.collection.mutable.{Map => MutableMap}
 /**
   * wrap metrics into one, each calculation produces one metric map
   */
-case class MetricWrapper(name: String) extends Serializable {
+case class MetricWrapper(name: String, applicationId: String) extends Serializable {
 
   val _Name = "name"
   val _Timestamp = "tmst"
   val _Value = "value"
+  val _ApplicationId = "applicationId"
 
   val metrics: MutableMap[Long, Map[String, Any]] = MutableMap()
 
@@ -45,7 +46,8 @@ case class MetricWrapper(name: String) extends Serializable {
       (timestamp, Map[String, Any](
         (_Name -> name),
         (_Timestamp -> timestamp),
-        (_Value -> value)
+        (_Value -> value),
+        (_ApplicationId -> applicationId)
       ))
     }
   }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
index c835611..8ad7d5d 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
@@ -23,19 +23,20 @@ import org.scalatest._
 class MetricWrapperTest extends FlatSpec with Matchers {
 
   "metric wrapper" should "flush empty if no metric inserted" in {
-    val metricWrapper = MetricWrapper("name")
+    val metricWrapper = MetricWrapper("name", "appId")
     metricWrapper.flush should be (Map[Long, Map[String, Any]]())
   }
 
   it should "flush all metrics inserted" in {
-    val metricWrapper = MetricWrapper("test")
+    val metricWrapper = MetricWrapper("test", "appId")
     metricWrapper.insertMetric(1, Map("total" -> 10, "miss"-> 2))
     metricWrapper.insertMetric(1, Map("match" -> 8))
     metricWrapper.insertMetric(2, Map("total" -> 20))
     metricWrapper.insertMetric(2, Map("miss" -> 4))
     metricWrapper.flush should be (Map(
-      1L -> Map("name" -> "test", "tmst" -> 1, "value" -> Map("total" -> 10, "miss"-> 2, "match" -> 8)),
-      2L -> Map("name" -> "test", "tmst" -> 2, "value" -> Map("total" -> 20, "miss"-> 4))
+      1L -> Map("name" -> "test", "tmst" -> 1, "value" -> Map("total" -> 10, "miss"-> 2, "match" -> 8),
+        "applicationId" -> "appId"),
+      2L -> Map("name" -> "test", "tmst" -> 2, "value" -> Map("total" -> 20, "miss"-> 4), "applicationId" -> "appId")
     ))
   }