You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/25 22:03:05 UTC

samza git commit: SAMZA-1733: Adding containerID to metric header

Repository: samza
Updated Branches:
  refs/heads/master c98d7b0d8 -> 3235929bc


SAMZA-1733: Adding containerID to metric header

Adding containerID to MetricsHeader (published by MetricsSnapshotReporter).
It is populated using the value set for the env variable ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID

Author: rmatharu@linkedin.com <rm...@linkedin.com>

Reviewers: Yi Pan<ni...@gmail.com>

Closes #572 from rmatharu/containerid


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3235929b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3235929b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3235929b

Branch: refs/heads/master
Commit: 3235929bc55f9bc0f7cc9a8b7ad2994200cb7f35
Parents: c98d7b0
Author: rmatharu@linkedin.com <rm...@linkedin.com>
Authored: Wed Jul 25 15:03:02 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jul 25 15:03:02 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/metrics/reporter/MetricsHeader.scala      | 3 +++
 .../samza/metrics/reporter/MetricsSnapshotReporter.scala       | 6 +++++-
 .../apache/samza/serializers/TestMetricsSnapshotSerde.scala    | 3 ++-
 3 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
index 8359b17..2fef04e 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
@@ -29,6 +29,7 @@ object MetricsHeader {
       map.get("job-name").toString,
       map.get("job-id").toString,
       map.get("container-name").toString,
+      map.get("exec-env-container-id").toString,
       map.get("source").toString,
       map.get("version").toString,
       map.get("samza-version").toString,
@@ -45,6 +46,7 @@ class MetricsHeader(
   @BeanProperty val jobName: String,
   @BeanProperty val jobId: String,
   @BeanProperty val containerName: String,
+  @BeanProperty val execEnvironmentContainerId: String,
   @BeanProperty val source: String,
   @BeanProperty val version: String,
   @BeanProperty val samzaVersion: String,
@@ -57,6 +59,7 @@ class MetricsHeader(
     map.put("job-name", jobName)
     map.put("job-id", jobId)
     map.put("container-name", containerName)
+    map.put("exec-env-container-id", execEnvironmentContainerId)
     map.put("source", source)
     map.put("version", version)
     map.put("samza-version", samzaVersion)

http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index d300e90..eca22ff 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -31,6 +31,8 @@ import java.util.Map
 import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit
 
+import org.apache.samza.config.ShellCommandConfig
+
 import scala.collection.JavaConverters._
 
 /**
@@ -55,6 +57,8 @@ class MetricsSnapshotReporter(
   serializer: Serializer[MetricsSnapshot] = null,
   clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
 
+  val execEnvironmentContainerId = Option[String](System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).getOrElse("")
+
   val executor = Executors.newSingleThreadScheduledExecutor(
     new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build())
   val resetTime = clock()
@@ -125,7 +129,7 @@ class MetricsSnapshotReporter(
         metricsMsg.put(group, groupMsg)
       })
 
-      val header = new MetricsHeader(jobName, jobId, containerName, source, version, samzaVersion, host, clock(), resetTime)
+      val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime)
       val metrics = new Metrics(metricsMsg)
 
       debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format (source, out, header.getAsMap, metrics.getAsMap))

http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
index 5bc0be6..360e6fa 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
@@ -22,6 +22,7 @@ package org.apache.samza.serializers
 import java.util.HashMap
 import java.util.Map
 
+import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.metrics.reporter.MetricsSnapshot
 import org.apache.samza.metrics.reporter.MetricsHeader
 import org.apache.samza.metrics.reporter.Metrics
@@ -33,7 +34,7 @@ class TestMetricsSnapshotSerde {
   @Ignore
   @Test
   def testMetricsSerdeShouldSerializeAndDeserializeAMetric {
-    val header = new MetricsHeader("test", "testjobid", "task", "test", "version", "samzaversion", "host", 1L, 2L)
+    val header = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id", "test source", "version", "samzaversion", "host", 1L, 2L)
     val metricsMap = new HashMap[String, Object]()
     metricsMap.put("test2", "foo")
     val metricsGroupMap = new HashMap[String, Map[String, Object]]()