You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2022/11/03 22:09:21 UTC
[samza] branch master updated: add the metric "container-active-threads" back (#1638)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 95a0c3334 add the metric "container-active-threads" back (#1638)
95a0c3334 is described below
commit 95a0c3334fbb701145e2cca922422f46af1544ae
Author: Alan Zhang <sh...@gmail.com>
AuthorDate: Thu Nov 3 15:09:14 2022 -0700
add the metric "container-active-threads" back (#1638)
Symptom
No data was emitted for the metric container-active-threads
Cause
This PR(#1501 ) removed the logic to emit data for metric accidently: https://github.com/apache/samza/pull/1501/files#diff-f79781ad4c55ae7860829b06fd9dfd15e8069c37e64f8854d8f27ca2cd1f3ee5L637
Changes
Add the deleted logic back in a new classSamzaContainerMonitorListener
---
.../container/SamzaContainerMonitorListener.java | 63 +++++++++++++++++++
.../apache/samza/container/SamzaContainer.scala | 19 ++----
.../TestSamzaContainerMonitorListener.java | 71 ++++++++++++++++++++++
3 files changed, 138 insertions(+), 15 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
new file mode 100644
index 000000000..b498b799e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.host.SystemMemoryStatistics;
+import org.apache.samza.container.host.SystemStatisticsMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SamzaContainerMonitorListener implements SystemStatisticsMonitor.Listener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SamzaContainerMonitorListener.class);
+
+ private final SamzaContainerMetrics containerMetrics;
+ private final ExecutorService taskThreadPool;
+ private final int containerMemoryMb;
+
+ public SamzaContainerMonitorListener(Config config, SamzaContainerMetrics containerMetrics,
+ ExecutorService taskThreadPool) {
+ this.containerMetrics = containerMetrics;
+ this.taskThreadPool = taskThreadPool;
+ this.containerMemoryMb = new ClusterManagerConfig(config).getContainerMemoryMb();
+ }
+
+ @Override
+ public void onUpdate(SystemMemoryStatistics sample) {
+ // update memory metric
+ long physicalMemoryBytes = sample.getPhysicalMemoryBytes();
+ float physicalMemoryMb = physicalMemoryBytes / (1024.0F * 1024.0F);
+ float memoryUtilization = physicalMemoryMb / containerMemoryMb;
+ LOGGER.debug("Container physical memory utilization (mb): " + physicalMemoryMb);
+ LOGGER.debug("Container physical memory utilization: " + memoryUtilization);
+ containerMetrics.physicalMemoryMb().set(physicalMemoryMb);
+ containerMetrics.physicalMemoryUtilization().set(memoryUtilization);
+
+ // update thread related metrics
+ if (Objects.nonNull(taskThreadPool) && taskThreadPool instanceof ThreadPoolExecutor) {
+ int containerActiveThreads = ((ThreadPoolExecutor) taskThreadPool).getActiveCount();
+ LOGGER.debug("Container active threads count: " + containerActiveThreads);
+ containerMetrics.containerActiveThreads().set(containerActiveThreads);
+ }
+ }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 69c7d4e5a..fa0843ab3 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -639,20 +639,9 @@ object SamzaContainer extends Logging {
appConfig.getRunId,
isHighLevelApiJob)
- val containerMemoryMb : Int = new ClusterManagerConfig(config).getContainerMemoryMb
-
- val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
- memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {
- override def onUpdate(sample: SystemMemoryStatistics): Unit = {
- val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes
- val physicalMemoryMb : Float = physicalMemoryBytes / (1024.0F * 1024.0F)
- val memoryUtilization : Float = physicalMemoryMb.toFloat / containerMemoryMb
- logger.debug("Container physical memory utilization (mb): " + physicalMemoryMb)
- logger.debug("Container physical memory utilization: " + memoryUtilization)
- samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb)
- samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization);
- }
- })
+ val systemStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
+ systemStatisticsMonitor.registerListener(
+ new SamzaContainerMonitorListener(config, samzaContainerMetrics, taskThreadPool))
val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue)
samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes)
@@ -697,7 +686,7 @@ object SamzaContainer extends Logging {
reporters = reporters,
jvm = jvm,
diskSpaceMonitor = diskSpaceMonitor,
- hostStatisticsMonitor = memoryStatisticsMonitor,
+ hostStatisticsMonitor = systemStatisticsMonitor,
taskThreadPool = taskThreadPool,
commitThreadPool = commitThreadPool,
timerExecutor = timerExecutor,
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
new file mode 100644
index 000000000..b38377b2e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container;
+
+import java.util.Collections;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.host.SystemMemoryStatistics;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestSamzaContainerMonitorListener {
+
+ @Mock
+ private SystemMemoryStatistics sample;
+ @Mock
+ private ThreadPoolExecutor taskThreadPool;
+
+ private final int containerMemoryMb = 2048;
+ private final long physicalMemoryBytes = 1024000L;
+ private final int activeThreadCount = 2;
+
+ private final Config config =
+ new MapConfig(Collections.singletonMap("cluster-manager.container.memory.mb", String.valueOf(containerMemoryMb)));
+ private final SamzaContainerMetrics containerMetrics =
+ new SamzaContainerMetrics("container", new MetricsRegistryMap(), "");
+
+ private SamzaContainerMonitorListener samzaContainerMonitorListener;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ when(sample.getPhysicalMemoryBytes()).thenReturn(physicalMemoryBytes);
+ when(taskThreadPool.getActiveCount()).thenReturn(activeThreadCount);
+
+ samzaContainerMonitorListener = new SamzaContainerMonitorListener(config, containerMetrics, taskThreadPool);
+ }
+
+ @Test
+ public void testOnUpdate() {
+ samzaContainerMonitorListener.onUpdate(sample);
+ float physicalMemoryMb = physicalMemoryBytes / 1024.0F / 1024.0F;
+ assertEquals(physicalMemoryMb, containerMetrics.physicalMemoryMb().getValue());
+ assertEquals(physicalMemoryMb / containerMemoryMb, containerMetrics.physicalMemoryUtilization().getValue());
+ assertEquals(activeThreadCount, containerMetrics.containerActiveThreads().getValue());
+ }
+}