You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2022/11/03 22:09:21 UTC

[samza] branch master updated: add the metric "container-active-threads" back (#1638)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 95a0c3334 add the metric "container-active-threads" back (#1638)
95a0c3334 is described below

commit 95a0c3334fbb701145e2cca922422f46af1544ae
Author: Alan Zhang <sh...@gmail.com>
AuthorDate: Thu Nov 3 15:09:14 2022 -0700

    add the metric "container-active-threads" back (#1638)
    
    Symptom
    No data was emitted for the metric container-active-threads
    
    Cause
    PR #1501 accidentally removed the logic that emits data for this metric: https://github.com/apache/samza/pull/1501/files#diff-f79781ad4c55ae7860829b06fd9dfd15e8069c37e64f8854d8f27ca2cd1f3ee5L637
    
    Changes
    Add the deleted logic back in a new class, SamzaContainerMonitorListener
---
 .../container/SamzaContainerMonitorListener.java   | 63 +++++++++++++++++++
 .../apache/samza/container/SamzaContainer.scala    | 19 ++----
 .../TestSamzaContainerMonitorListener.java         | 71 ++++++++++++++++++++++
 3 files changed, 138 insertions(+), 15 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
new file mode 100644
index 000000000..b498b799e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.host.SystemMemoryStatistics;
+import org.apache.samza.container.host.SystemStatisticsMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SamzaContainerMonitorListener implements SystemStatisticsMonitor.Listener {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerMonitorListener.class);
+
+  private final SamzaContainerMetrics containerMetrics;
+  private final ExecutorService taskThreadPool;
+  private final int containerMemoryMb;
+
+  public SamzaContainerMonitorListener(Config config, SamzaContainerMetrics containerMetrics,
+      ExecutorService taskThreadPool) {
+    this.containerMetrics = containerMetrics;
+    this.taskThreadPool = taskThreadPool;
+    this.containerMemoryMb = new ClusterManagerConfig(config).getContainerMemoryMb();
+  }
+
+  @Override
+  public void onUpdate(SystemMemoryStatistics sample) {
+    // update memory metric
+    long physicalMemoryBytes = sample.getPhysicalMemoryBytes();
+    float physicalMemoryMb = physicalMemoryBytes / (1024.0F * 1024.0F);
+    float memoryUtilization = physicalMemoryMb / containerMemoryMb;
+    LOGGER.debug("Container physical memory utilization (mb): " + physicalMemoryMb);
+    LOGGER.debug("Container physical memory utilization: " + memoryUtilization);
+    containerMetrics.physicalMemoryMb().set(physicalMemoryMb);
+    containerMetrics.physicalMemoryUtilization().set(memoryUtilization);
+
+    // update thread related metrics
+    if (Objects.nonNull(taskThreadPool) && taskThreadPool instanceof ThreadPoolExecutor) {
+      int containerActiveThreads = ((ThreadPoolExecutor) taskThreadPool).getActiveCount();
+      LOGGER.debug("Container active threads count: " + containerActiveThreads);
+      containerMetrics.containerActiveThreads().set(containerActiveThreads);
+    }
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 69c7d4e5a..fa0843ab3 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -639,20 +639,9 @@ object SamzaContainer extends Logging {
       appConfig.getRunId,
       isHighLevelApiJob)
 
-    val containerMemoryMb : Int = new ClusterManagerConfig(config).getContainerMemoryMb
-
-    val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
-    memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {
-      override def onUpdate(sample: SystemMemoryStatistics): Unit = {
-        val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes
-        val physicalMemoryMb : Float = physicalMemoryBytes / (1024.0F * 1024.0F)
-        val memoryUtilization : Float = physicalMemoryMb.toFloat / containerMemoryMb
-        logger.debug("Container physical memory utilization (mb): " + physicalMemoryMb)
-        logger.debug("Container physical memory utilization: " + memoryUtilization)
-        samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb)
-        samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization);
-      }
-    })
+    val systemStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
+    systemStatisticsMonitor.registerListener(
+      new SamzaContainerMonitorListener(config, samzaContainerMetrics, taskThreadPool))
 
     val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue)
     samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes)
@@ -697,7 +686,7 @@ object SamzaContainer extends Logging {
       reporters = reporters,
       jvm = jvm,
       diskSpaceMonitor = diskSpaceMonitor,
-      hostStatisticsMonitor = memoryStatisticsMonitor,
+      hostStatisticsMonitor = systemStatisticsMonitor,
       taskThreadPool = taskThreadPool,
       commitThreadPool = commitThreadPool,
       timerExecutor = timerExecutor,
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
new file mode 100644
index 000000000..b38377b2e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+import java.util.Collections;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.host.SystemMemoryStatistics;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestSamzaContainerMonitorListener {
+
+  @Mock
+  private SystemMemoryStatistics sample;
+  @Mock
+  private ThreadPoolExecutor taskThreadPool;
+
+  private final int containerMemoryMb = 2048;
+  private final long physicalMemoryBytes = 1024000L;
+  private final int activeThreadCount = 2;
+
+  private final Config config =
+      new MapConfig(Collections.singletonMap("cluster-manager.container.memory.mb", String.valueOf(containerMemoryMb)));
+  private final SamzaContainerMetrics containerMetrics =
+      new SamzaContainerMetrics("container", new MetricsRegistryMap(), "");
+
+  private SamzaContainerMonitorListener samzaContainerMonitorListener;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(sample.getPhysicalMemoryBytes()).thenReturn(physicalMemoryBytes);
+    when(taskThreadPool.getActiveCount()).thenReturn(activeThreadCount);
+
+    samzaContainerMonitorListener = new SamzaContainerMonitorListener(config, containerMetrics, taskThreadPool);
+  }
+
+  @Test
+  public void testOnUpdate() {
+    samzaContainerMonitorListener.onUpdate(sample);
+    float physicalMemoryMb = physicalMemoryBytes / 1024.0F / 1024.0F;
+    assertEquals(physicalMemoryMb, containerMetrics.physicalMemoryMb().getValue());
+    assertEquals(physicalMemoryMb / containerMemoryMb, containerMetrics.physicalMemoryUtilization().getValue());
+    assertEquals(activeThreadCount, containerMetrics.containerActiveThreads().getValue());
+  }
+}