You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2021/09/28 00:56:59 UTC

[samza] branch master updated: Fix container physical memory monitoring to fetch all children process RSS memory with 1 process (#1533)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 571f394  Fix container physical memory monitoring to fetch all children process RSS memory with 1 process (#1533)
571f394 is described below

commit 571f39452419f443154ec72ceed3dd7bb77bed1e
Author: Ziting <90...@users.noreply.github.com>
AuthorDate: Mon Sep 27 17:56:55 2021 -0700

    Fix container physical memory monitoring to fetch all children process RSS memory with 1 process (#1533)
    
    * Include memory usage of child processes in physical-memory-mb metric
    
    * Dummy change to trigger CI
    
    * Revert "Include memory usage of child processes in physical-memory-mb metric"
    
    This reverts commit 9b9b4d051768aaad7d8d69a96b7c138a66a7989f.
    
    * Dummy change to trigger CI
    
    * Revert "Revert "Include memory usage of child processes in physical-memory-mb metric""
    
    This reverts commit 1de226b0cf3e2e14b713a08e5ee31a16ac71629a.
    
    * Bump test timeout to 20 seconds to see if CI can pass
    
    * Try commenting out returning all children processes of the parent process
    
    * Try commenting out returning all children processes of the parent process
    
    * Switch to another way of reading children processes
    
    * Fix stylecheck
    
    * Only evaluate 2 child processes
    
    * Only calculate memory for 5 child processes
    
    * Try reading all child processes without calculating memory for any, to see if CI passes
    
    * Read only first 5 child processes
    
    * Fix style check
    
    * Adjust test frequency
    
    * Further adjust test frequency
    
    * Adjust frequency of both tests
    
    * Fetch all rss memory in 1 shell command
    
    * Remove the limit on the number of child processes
    
    * Try to resolve OOM
    
    * Try to resolve OOM
    
    * Dummy change to trigger CI
    
    * Restore test setup
    
    * Trigger CI again
    
    * More logging and clarify method and variable names
    
    * Fix typo in variable name
---
 .../host/PosixCommandBasedStatisticsGetter.java    | 73 ++++++----------------
 .../container/host/TestStatisticsMonitorImpl.java  |  1 +
 2 files changed, 21 insertions(+), 53 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
index 8fdff3b..ad62eff 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.container.host;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,29 +36,6 @@ public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter
   private static final Logger log = LoggerFactory.getLogger(PosixCommandBasedStatisticsGetter.class);
 
   /**
-   * A convenience method to execute shell commands and return the first line of their output.
-   *
-   * @param cmdArray the command to run
-   * @return the first line of the output.
-   * @throws IOException
-   */
-  private String getCommandOutput(String[] cmdArray) throws IOException {
-    Process executable = Runtime.getRuntime().exec(cmdArray);
-    BufferedReader processReader = null;
-    String psOutput = null;
-
-    try {
-      processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
-      psOutput = processReader.readLine();
-    } finally {
-      if (processReader != null) {
-        processReader.close();
-      }
-    }
-    return psOutput;
-  }
-
-  /**
    * A convenience method to execute shell commands and return all lines of their output.
    *
    * @param cmdArray the command to run
@@ -66,52 +43,42 @@ public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter
    * @throws IOException
    */
   private List<String> getAllCommandOutput(String[] cmdArray) throws IOException {
+    log.info("Executing commands {}", Arrays.toString(cmdArray));
     Process executable = Runtime.getRuntime().exec(cmdArray);
-    BufferedReader processReader = null;
-    List<String> psOutput;
+    BufferedReader processReader;
+    List<String> psOutput = new ArrayList<>();
 
-    try {
-      processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
-      psOutput = processReader.lines().filter(StringUtils::isNotEmpty).collect(Collectors.toList());
-    } finally {
-      if (processReader != null) {
-        processReader.close();
+    processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
+    String line;
+    while ((line = processReader.readLine()) != null) {
+      if (!line.isEmpty()) {
+        psOutput.add(line);
       }
     }
+    processReader.close();
     return psOutput;
   }
 
-  private long getTotalPhysicalMemory() throws IOException {
+  private long getTotalPhysicalMemoryUsageBytes() throws IOException {
     // collect all child process ids of the main process that runs the application
     List<String> processIds = getAllCommandOutput(new String[]{"sh", "-c", "pgrep -P $PPID"});
     // add the parent process which is the main process that runs the application
     processIds.add("$PPID");
-    long totalPhysicalMemory = 0;
-    for (String processId : processIds) {
-      totalPhysicalMemory += getPhysicalMemory(processId);
-    }
-    return totalPhysicalMemory;
-  }
-
-  private long getPhysicalMemory(String processId) throws IOException {
-    // returns a single long value that represents the rss memory of the process.
-    String commandOutput = getCommandOutput(new String[]{"sh", "-c", String.format("ps -o rss= -p %s", processId)});
-
-    // this should never happen.
-    if (commandOutput == null) {
-      throw new IOException("ps returned unexpected output: " + commandOutput);
+    String processIdsJoined = String.join(" ", processIds);
+    // returns a list of long values that represent the rss memory of each process.
+    List<String> processMemoryKBArray = getAllCommandOutput(new String[]{"sh", "-c", String.format("ps -o rss= -p %s", processIdsJoined)});
+    long totalPhysicalMemoryKB = 0;
+    for (String processMemory : processMemoryKBArray) {
+      totalPhysicalMemoryKB += Long.parseLong(processMemory.trim());
     }
-
-    long rssMemoryKb = Long.parseLong(commandOutput.trim());
     //convert to bytes
-    return rssMemoryKb * 1024;
+    return totalPhysicalMemoryKB * 1024;
   }
 
-
   @Override
   public SystemMemoryStatistics getSystemMemoryStatistics() {
     try {
-      long memory = getTotalPhysicalMemory();
+      long memory = getTotalPhysicalMemoryUsageBytes();
       return new SystemMemoryStatistics(memory);
     } catch (Exception e) {
       log.warn("Error when running ps: ", e);
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
index b01fdca..7d86496 100644
--- a/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
@@ -63,6 +63,7 @@ public class TestStatisticsMonitorImpl {
     Assert.assertFalse(registrationFailsAfterStop);
   }
 
+
   @Test
   public void testStopBehavior() throws Exception {