You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/25 22:03:05 UTC
samza git commit: SAMZA-1733: Adding containerID to metric header
Repository: samza
Updated Branches:
refs/heads/master c98d7b0d8 -> 3235929bc
SAMZA-1733: Adding containerID to metric header
Adding containerID to MetricsHeader (published by MetricsSnapshotReporter).
It is populated using the value set for the env variable ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID
Author: rmatharu@linkedin.com <rm...@linkedin.com>
Reviewers: Yi Pan<ni...@gmail.com>
Closes #572 from rmatharu/containerid
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3235929b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3235929b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3235929b
Branch: refs/heads/master
Commit: 3235929bc55f9bc0f7cc9a8b7ad2994200cb7f35
Parents: c98d7b0
Author: rmatharu@linkedin.com <rm...@linkedin.com>
Authored: Wed Jul 25 15:03:02 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jul 25 15:03:02 2018 -0700
----------------------------------------------------------------------
.../org/apache/samza/metrics/reporter/MetricsHeader.scala | 3 +++
.../samza/metrics/reporter/MetricsSnapshotReporter.scala | 6 +++++-
.../apache/samza/serializers/TestMetricsSnapshotSerde.scala | 3 ++-
3 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
index 8359b17..2fef04e 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsHeader.scala
@@ -29,6 +29,7 @@ object MetricsHeader {
map.get("job-name").toString,
map.get("job-id").toString,
map.get("container-name").toString,
+ map.get("exec-env-container-id").toString,
map.get("source").toString,
map.get("version").toString,
map.get("samza-version").toString,
@@ -45,6 +46,7 @@ class MetricsHeader(
@BeanProperty val jobName: String,
@BeanProperty val jobId: String,
@BeanProperty val containerName: String,
+ @BeanProperty val execEnvironmentContainerId: String,
@BeanProperty val source: String,
@BeanProperty val version: String,
@BeanProperty val samzaVersion: String,
@@ -57,6 +59,7 @@ class MetricsHeader(
map.put("job-name", jobName)
map.put("job-id", jobId)
map.put("container-name", containerName)
+ map.put("exec-env-container-id", execEnvironmentContainerId)
map.put("source", source)
map.put("version", version)
map.put("samza-version", samzaVersion)
http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index d300e90..eca22ff 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -31,6 +31,8 @@ import java.util.Map
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
+import org.apache.samza.config.ShellCommandConfig
+
import scala.collection.JavaConverters._
/**
@@ -55,6 +57,8 @@ class MetricsSnapshotReporter(
serializer: Serializer[MetricsSnapshot] = null,
clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging {
+ val execEnvironmentContainerId = Option[String](System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID)).getOrElse("")
+
val executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("Samza MetricsSnapshotReporter Thread-%d").setDaemon(true).build())
val resetTime = clock()
@@ -125,7 +129,7 @@ class MetricsSnapshotReporter(
metricsMsg.put(group, groupMsg)
})
- val header = new MetricsHeader(jobName, jobId, containerName, source, version, samzaVersion, host, clock(), resetTime)
+ val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime)
val metrics = new Metrics(metricsMsg)
debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format (source, out, header.getAsMap, metrics.getAsMap))
http://git-wip-us.apache.org/repos/asf/samza/blob/3235929b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
index 5bc0be6..360e6fa 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
@@ -22,6 +22,7 @@ package org.apache.samza.serializers
import java.util.HashMap
import java.util.Map
+import org.apache.samza.config.ShellCommandConfig
import org.apache.samza.metrics.reporter.MetricsSnapshot
import org.apache.samza.metrics.reporter.MetricsHeader
import org.apache.samza.metrics.reporter.Metrics
@@ -33,7 +34,7 @@ class TestMetricsSnapshotSerde {
@Ignore
@Test
def testMetricsSerdeShouldSerializeAndDeserializeAMetric {
- val header = new MetricsHeader("test", "testjobid", "task", "test", "version", "samzaversion", "host", 1L, 2L)
+ val header = new MetricsHeader("test-jobName", "testjobid", "samza-container-0", "test exec env container id", "test source", "version", "samzaversion", "host", 1L, 2L)
val metricsMap = new HashMap[String, Object]()
metricsMap.put("test2", "foo")
val metricsGroupMap = new HashMap[String, Map[String, Object]]()