You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2022/11/07 23:35:23 UTC

[samza] branch master updated: SAMZA-2762: new cpu usage metric which counts child processes usage (#1636)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new dd8ecf185 SAMZA-2762: new cpu usage metric which counts child processes usage (#1636)
dd8ecf185 is described below

commit dd8ecf185152ec7ff0739a10c700dca6802f8a37
Author: Alan Zhang <sh...@gmail.com>
AuthorDate: Mon Nov 7 15:35:17 2022 -0800

    SAMZA-2762: new cpu usage metric which counts child processes usage (#1636)
    
    Symptom
    We have observed that some use cases use quasar (a TensorFlow framework) to do model inference, and this framework spawns child (non-JVM) processes to run TensorFlow Serving. These child processes were consuming a lot of CPU (200%); however, their CPU usage can't be captured by the existing CPU usage metric, process-cpu-usage.
    
    Cause
    The existing process-cpu-usage metric was designed to capture the CPU usage of the JVM process only; it can't account for the CPU usage of child processes (especially non-JVM ones).
    
    Changes
    Rely on the oshi framework to capture the CPU usage of the JVM process and all its child processes, and create a new metric to report the total CPU usage.
    The CPU usage percentage is normalized by the number of logical CPUs on the system (100 * total CPU load / logical CPU count).
    
    API Changes
    Added a new metric, total-process-cpu-usage, to SamzaContainerMetrics, similar to how the physical-memory-mb metric is provided.
---
 build.gradle                                       |   3 +
 .../versioned/container/metrics-table.html         |   4 +
 .../versioned/operations/monitoring.md             |   1 +
 gradle/dependency-versions.gradle                  |   3 +-
 .../container/SamzaContainerMonitorListener.java   |  29 ++++--
 .../host/DefaultSystemStatisticsGetter.java        |  52 +++++++++++
 .../container/host/OshiBasedStatisticsGetter.java  | 103 +++++++++++++++++++++
 .../host/PosixCommandBasedStatisticsGetter.java    |  13 ++-
 .../samza/container/host/ProcessCPUStatistics.java |  63 +++++++++++++
 .../container/host/StatisticsMonitorImpl.java      |  31 ++++---
 .../samza/container/host/SystemStatistics.java     |  66 +++++++++++++
 .../container/host/SystemStatisticsGetter.java     |   8 ++
 .../container/host/SystemStatisticsMonitor.java    |   2 +-
 .../apache/samza/container/SamzaContainer.scala    |   2 +-
 .../samza/container/SamzaContainerMetrics.scala    |   1 +
 .../TestSamzaContainerMonitorListener.java         |  14 ++-
 .../host/TestDefaultSystemStatisticsGetter.java    |  56 +++++++++++
 .../host/TestOshiBasedStatisticsGetter.java        |  96 +++++++++++++++++++
 .../container/host/TestStatisticsMonitorImpl.java  |  27 +++---
 19 files changed, 530 insertions(+), 44 deletions(-)

diff --git a/build.gradle b/build.gradle
index 39687057f..1b6fcaddd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -192,6 +192,9 @@ project(":samza-core_$scalaSuffix") {
     compile "org.scala-lang:scala-library:$scalaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "net.jodah:failsafe:$failsafeVersion"
+    compile "com.github.oshi:oshi-core:$oshiVersion"
+    compile "net.java.dev.jna:jna:$jnaVersion"
+    compile "net.java.dev.jna:jna-platform:$jnaVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 869dd7d9a..e130fb085 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -215,6 +215,10 @@
         <td>executor-work-factor</td>
         <td>Current work factor in use</td>
     </tr>
+    <tr>
+        <td>total-process-cpu-usage</td>
+        <td>The process cpu usage percentage (in the [0, 100] interval) used by the Samza container process and all its child processes</td>
+    </tr>
     <tr>
         <td>physical-memory-mb</td>
         <td>The physical memory used by the Samza container process (native + on heap) (in megabytes)</td>
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md
index 067104719..c7ffaf600 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -392,6 +392,7 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>, \<topic\>, are popula
 | | disk-usage-bytes | Total disk space size used by key-value stores (in bytes). |
 | | disk-quota-bytes | Disk memory usage quota for key-value stores (in bytes). |
 | | executor-work-factor | The work factor of the run loop. A work factor of 1 indicates full throughput, while a work factor of less than 1 will introduce delays into the execution to approximate the requested work factor. The work factor is set by the disk space monitor in accordance with the disk quota policy. Given the latest percentage of available disk quota, this policy returns the work factor that should be applied. |
+| | total-process-cpu-usage | The process cpu usage percentage (in the [0, 100] interval) used by the Samza container process and all its child processes. |
 | | physical-memory-mb | The physical memory used by the Samza container process (native + on heap) (in MBs). |
 | | physical-memory-utilization | The ratio between the physical memory used by the Samza container process (native + on heap) and the total physical memory of the Samza container. |
 | | container-thread-pool-size | The current size of a Samza container's thread pool. It may or may not be the same as job.container.thread.pool.size, depending on the implementation. |
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 5720a625c..1896a3716 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -50,8 +50,9 @@
   zookeeperVersion = "3.6.3"
   failsafeVersion = "2.4.0"
   jlineVersion = "3.8.2"
-  jnaVersion = "4.5.1"
+  jnaVersion = "5.12.1"
   couchbaseClientVersion = "2.7.2"
   couchbaseMockVersion = "1.5.22"
   yarn3Version = "3.3.4"
+  oshiVersion = "6.3.0"
 }
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
index b498b799e..cfc4aea99 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
@@ -23,7 +23,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.host.ProcessCPUStatistics;
 import org.apache.samza.container.host.SystemMemoryStatistics;
+import org.apache.samza.container.host.SystemStatistics;
 import org.apache.samza.container.host.SystemStatisticsMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,15 +45,26 @@ public class SamzaContainerMonitorListener implements SystemStatisticsMonitor.Li
   }
 
   @Override
-  public void onUpdate(SystemMemoryStatistics sample) {
+  public void onUpdate(SystemStatistics sample) {
+    // update cpu metric
+    ProcessCPUStatistics cpuSample = sample.getCpuStatistics();
+    if (Objects.nonNull(cpuSample)) {
+      double cpuUsage = cpuSample.getProcessCPUUsagePercentage();
+      LOGGER.debug("Container total cpu usage: " + cpuUsage);
+      containerMetrics.totalProcessCpuUsage().set(cpuUsage);
+    }
+
     // update memory metric
-    long physicalMemoryBytes = sample.getPhysicalMemoryBytes();
-    float physicalMemoryMb = physicalMemoryBytes / (1024.0F * 1024.0F);
-    float memoryUtilization = physicalMemoryMb / containerMemoryMb;
-    LOGGER.debug("Container physical memory utilization (mb): " + physicalMemoryMb);
-    LOGGER.debug("Container physical memory utilization: " + memoryUtilization);
-    containerMetrics.physicalMemoryMb().set(physicalMemoryMb);
-    containerMetrics.physicalMemoryUtilization().set(memoryUtilization);
+    SystemMemoryStatistics memorySample = sample.getMemoryStatistics();
+    if (Objects.nonNull(memorySample)) {
+      long physicalMemoryBytes = memorySample.getPhysicalMemoryBytes();
+      float physicalMemoryMb = physicalMemoryBytes / (1024.0F * 1024.0F);
+      float memoryUtilization = physicalMemoryMb / containerMemoryMb;
+      LOGGER.debug("Container physical memory utilization (mb): " + physicalMemoryMb);
+      LOGGER.debug("Container physical memory utilization: " + memoryUtilization);
+      containerMetrics.physicalMemoryMb().set(physicalMemoryMb);
+      containerMetrics.physicalMemoryUtilization().set(memoryUtilization);
+    }
 
     // update thread related metrics
     if (Objects.nonNull(taskThreadPool) && taskThreadPool instanceof ThreadPoolExecutor) {
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/DefaultSystemStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/DefaultSystemStatisticsGetter.java
new file mode 100644
index 000000000..ccce5830f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/DefaultSystemStatisticsGetter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/**
+ * An default implementation of {@link SystemStatisticsGetter} that relies on {@link PosixCommandBasedStatisticsGetter}
+ * and {@link OshiBasedStatisticsGetter} implementations
+ */
+public class DefaultSystemStatisticsGetter implements SystemStatisticsGetter {
+  private final OshiBasedStatisticsGetter oshiBasedStatisticsGetter;
+  private final PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter;
+
+  public DefaultSystemStatisticsGetter() {
+    this(new OshiBasedStatisticsGetter(), new PosixCommandBasedStatisticsGetter());
+  }
+
+  @VisibleForTesting
+  DefaultSystemStatisticsGetter(OshiBasedStatisticsGetter oshiBasedStatisticsGetter,
+      PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter) {
+    this.oshiBasedStatisticsGetter = oshiBasedStatisticsGetter;
+    this.posixCommandBasedStatisticsGetter = posixCommandBasedStatisticsGetter;
+  }
+
+  @Override
+  public SystemMemoryStatistics getSystemMemoryStatistics() {
+    return posixCommandBasedStatisticsGetter.getSystemMemoryStatistics();
+  }
+
+  @Override
+  public ProcessCPUStatistics getProcessCPUStatistics() {
+    return oshiBasedStatisticsGetter.getProcessCPUStatistics();
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/OshiBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/OshiBasedStatisticsGetter.java
new file mode 100644
index 000000000..22091b8a0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/OshiBasedStatisticsGetter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import oshi.SystemInfo;
+import oshi.annotation.concurrent.NotThreadSafe;
+import oshi.software.os.OSProcess;
+import oshi.software.os.OperatingSystem;
+
+
+/**
+ * An implementation of {@link SystemStatisticsGetter} that relies on using oshi framework(https://www.oshi.ooo/)
+ *
+ * This class captures the recent cpu usage percentage(in the [0, 100] interval) used by the Samza container process and
+ * its child processes. It gets CPU usage of this process since a previous snapshot of the same process, the snapshot is
+ * triggered by last poll, have polling interval of at least a few seconds is recommended.
+ */
+@NotThreadSafe
+public class OshiBasedStatisticsGetter implements SystemStatisticsGetter {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OshiBasedStatisticsGetter.class);
+  // the snapshots of current JVM process and its child processes
+  private final Map<Integer, OSProcess> previousProcessSnapshots = new HashMap<>();
+
+  private final OperatingSystem os;
+  private final int cpuCount;
+
+  public OshiBasedStatisticsGetter() {
+    this(new SystemInfo());
+  }
+
+  @VisibleForTesting
+  OshiBasedStatisticsGetter(SystemInfo si) {
+    this(si.getOperatingSystem(), si.getHardware().getProcessor().getLogicalProcessorCount());
+  }
+
+  @VisibleForTesting
+  OshiBasedStatisticsGetter(OperatingSystem os, int cpuCount) {
+    this.os = os;
+    this.cpuCount = cpuCount;
+  }
+
+  @Override
+  public SystemMemoryStatistics getSystemMemoryStatistics() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public ProcessCPUStatistics getProcessCPUStatistics() {
+    try {
+      final List<OSProcess> currentProcessAndChildProcesses = getCurrentProcessAndChildProcesses();
+      final double totalCPUUsage = getTotalCPUUsage(currentProcessAndChildProcesses);
+      refreshProcessSnapshots(currentProcessAndChildProcesses);
+      return new ProcessCPUStatistics(100d * totalCPUUsage / cpuCount);
+    } catch (Exception e) {
+      LOGGER.warn("Error when running oshi: ", e);
+      return null;
+    }
+  }
+
+  private List<OSProcess> getCurrentProcessAndChildProcesses() {
+    final List<OSProcess> processes = new ArrayList<>();
+    // get current process
+    processes.add(os.getProcess(os.getProcessId()));
+    // get all child processes of current process
+    processes.addAll(os.getChildProcesses(os.getProcessId(), OperatingSystem.ProcessFiltering.ALL_PROCESSES,
+        OperatingSystem.ProcessSorting.NO_SORTING, 0));
+    return processes;
+  }
+
+  private double getTotalCPUUsage(List<OSProcess> processes) {
+    return processes.stream()
+        .mapToDouble(p -> p.getProcessCpuLoadBetweenTicks(previousProcessSnapshots.get(p.getProcessID())))
+        .sum();
+  }
+
+  private void refreshProcessSnapshots(List<OSProcess> processes) {
+    previousProcessSnapshots.clear();
+    processes.forEach(p -> previousProcessSnapshots.put(p.getProcessID(), p));
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
index e7043781a..ec16791e7 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
@@ -18,16 +18,15 @@
  */
 package org.apache.samza.container.host;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
 /**
  * An implementation of {@link SystemStatisticsGetter} that relies on using Posix commands like ps.
  */
@@ -85,4 +84,10 @@ public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter
       return null;
     }
   }
+
+  @Override
+  public ProcessCPUStatistics getProcessCPUStatistics() {
+    throw new UnsupportedOperationException(
+        "No appropriate Posix command available for getting recent CPU usage information. For example, the CPU information exposed by ps command 'ps -o %cpu= -p <PID>' represents the percentage of time spent running during the entire lifetime of a process not for the recent CPU usage");
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/ProcessCPUStatistics.java b/samza-core/src/main/java/org/apache/samza/container/host/ProcessCPUStatistics.java
new file mode 100644
index 000000000..da04c5d55
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/ProcessCPUStatistics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import java.util.Objects;
+
+
+/**
+ * A {@link ProcessCPUStatistics} object represents recent CPU usage percentage about the container process(including its child processes)
+ */
+public class ProcessCPUStatistics {
+
+  /**
+   * The CPU used by the Samza container process(including its child processes) in percentage.
+   */
+  private final double processCPUUsagePercentage;
+
+  ProcessCPUStatistics(double processCpuUsagePercentage) {
+    this.processCPUUsagePercentage = processCpuUsagePercentage;
+  }
+
+  public double getProcessCPUUsagePercentage() {
+    return processCPUUsagePercentage;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ProcessCPUStatistics that = (ProcessCPUStatistics) o;
+    return Double.compare(that.processCPUUsagePercentage, processCPUUsagePercentage) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processCPUUsagePercentage);
+  }
+
+  @Override
+  public String toString() {
+    return "ProcessCPUStatistics{" + "processCPUUsagePercentage=" + processCPUUsagePercentage + '}';
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
index 23704b400..1f5297ed4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
@@ -31,8 +31,9 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * An implementation of {@link SystemStatisticsMonitor} for unix and mac platforms. Users can implement their own
- * ways of getting {@link SystemMemoryStatistics} and provide a {@link SystemStatisticsGetter} implementation. The default
- * behavior is to rely on unix commands like ps to obtain {@link SystemMemoryStatistics}
+ * ways of getting {@link SystemStatistics} and provide a {@link SystemStatisticsGetter} implementation. The default
+ * behavior is to rely on unix commands like ps to obtain {@link SystemMemoryStatistics} and rely on ohsi framework to
+ * obtain {@link ProcessCPUStatistics}
  *
  * All callback invocations are from the same thread - hence, are guaranteed to be serialized. An exception thrown
  * from a callback will suppress all subsequent callbacks. If the execution of a
@@ -78,7 +79,7 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
    *
    */
   public StatisticsMonitorImpl() {
-    this(60000, new PosixCommandBasedStatisticsGetter());
+    this(60000, new DefaultSystemStatisticsGetter());
   }
 
   /**
@@ -117,23 +118,23 @@ public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
   }
 
   private void sampleStatistics() {
-    SystemMemoryStatistics statistics = null;
+    SystemMemoryStatistics memoryStatistics = null;
+    ProcessCPUStatistics cpuStatistics = null;
     try {
-      statistics = statisticsGetter.getSystemMemoryStatistics();
+      memoryStatistics = statisticsGetter.getSystemMemoryStatistics();
+      cpuStatistics = statisticsGetter.getProcessCPUStatistics();
     } catch (Throwable e) {
       LOG.error("Error during obtaining statistics: ", e);
     }
-
+    SystemStatistics systemStatistics = new SystemStatistics(cpuStatistics, memoryStatistics);
     for (Listener listener : listenerSet.keySet()) {
-      if (statistics != null) {
-        try {
-          // catch all exceptions to shield one listener from exceptions thrown by others.
-          listener.onUpdate(statistics);
-        } catch (Throwable e) {
-          // delete this listener so that it does not receive future callbacks.
-          listenerSet.remove(listener);
-          LOG.error("Listener threw an exception: ", e);
-        }
+      try {
+        // catch all exceptions to shield one listener from exceptions thrown by others.
+        listener.onUpdate(systemStatistics);
+      } catch (Throwable e) {
+        // delete this listener so that it does not receive future callbacks.
+        listenerSet.remove(listener);
+        LOG.error("Listener threw an exception: ", e);
       }
     }
   }
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
new file mode 100644
index 000000000..5b0d45b74
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import java.util.Objects;
+
+/**
+ * A {@link SystemStatistics} object represents system related information about the physical process that runs the
+ * {@link org.apache.samza.container.SamzaContainer}.
+ */
+public class SystemStatistics {
+
+  private final ProcessCPUStatistics cpuStatistics;
+  private final SystemMemoryStatistics memoryStatistics;
+
+  public SystemStatistics(ProcessCPUStatistics cpuStatistics, SystemMemoryStatistics memoryStatistics) {
+    this.cpuStatistics = cpuStatistics;
+    this.memoryStatistics = memoryStatistics;
+  }
+
+  public ProcessCPUStatistics getCpuStatistics() {
+    return cpuStatistics;
+  }
+
+  public SystemMemoryStatistics getMemoryStatistics() {
+    return memoryStatistics;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SystemStatistics that = (SystemStatistics) o;
+    return Objects.equals(cpuStatistics, that.cpuStatistics) && Objects.equals(memoryStatistics, that.memoryStatistics);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(cpuStatistics, memoryStatistics);
+  }
+
+  @Override
+  public String toString() {
+    return "SystemStatistics{" + "cpuStatistics=" + cpuStatistics + ", memoryStatistics=" + memoryStatistics + '}';
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
index 541c2fb6e..f340896c6 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
@@ -30,4 +30,12 @@ public interface SystemStatisticsGetter {
    * @return {@link SystemMemoryStatistics} for the Samza container
    */
   SystemMemoryStatistics getSystemMemoryStatistics();
+
+  /**
+   * Returns the {@link ProcessCPUStatistics} for the current Samza container process(includes its child processes). A
+   * 'null' value is returned if no statistics are available.
+   *
+   * @return {@link ProcessCPUStatistics} for the Samza container process
+   */
+  ProcessCPUStatistics getProcessCPUStatistics();
 }
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
index 84f4ec367..b91f3709e 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
@@ -54,7 +54,7 @@ public interface SystemStatisticsMonitor {
      *
      * @param sample the currently sampled statistic.
      */
-    void onUpdate(SystemMemoryStatistics sample);
+    void onUpdate(SystemStatistics sample);
   }
 
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index fa0843ab3..bba782525 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -35,7 +35,7 @@ import org.apache.samza.clustermanager.StandbyTaskUtil
 import org.apache.samza.config.{StreamConfig, _}
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
-import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
+import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatistics, SystemStatisticsMonitor}
 import org.apache.samza.context._
 import org.apache.samza.diagnostics.DiagnosticsManager
 import org.apache.samza.drain.DrainMonitor.DrainCallback
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 138db2a16..dc87475f1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -49,6 +49,7 @@ class SamzaContainerMetrics(
   val executorWorkFactor = newGauge("executor-work-factor", 1.0)
   val physicalMemoryMb = newGauge("physical-memory-mb", 0.0F)
   val physicalMemoryUtilization = newGauge("physical-memory-utilization", 0.0F)
+  val totalProcessCpuUsage = newGauge("total-process-cpu-usage", 0.0)
   val containerThreadPoolSize = newGauge("container-thread-pool-size", 0L)
   val containerActiveThreads = newGauge("container-active-threads", 0L)
 
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
index b38377b2e..5adb114f0 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
@@ -22,7 +22,9 @@ import java.util.Collections;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.host.ProcessCPUStatistics;
 import org.apache.samza.container.host.SystemMemoryStatistics;
+import org.apache.samza.container.host.SystemStatistics;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,10 +38,15 @@ import static org.mockito.Mockito.*;
 public class TestSamzaContainerMonitorListener {
 
   @Mock
-  private SystemMemoryStatistics sample;
+  private ProcessCPUStatistics cpuSample;
+  @Mock
+  private SystemMemoryStatistics memorySample;
   @Mock
   private ThreadPoolExecutor taskThreadPool;
 
+  private SystemStatistics sample;
+
+  private final double cpuUsage = 30.0;
   private final int containerMemoryMb = 2048;
   private final long physicalMemoryBytes = 1024000L;
   private final int activeThreadCount = 2;
@@ -54,15 +61,18 @@ public class TestSamzaContainerMonitorListener {
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
-    when(sample.getPhysicalMemoryBytes()).thenReturn(physicalMemoryBytes);
+    when(cpuSample.getProcessCPUUsagePercentage()).thenReturn(cpuUsage);
+    when(memorySample.getPhysicalMemoryBytes()).thenReturn(physicalMemoryBytes);
     when(taskThreadPool.getActiveCount()).thenReturn(activeThreadCount);
 
+    sample = new SystemStatistics(cpuSample, memorySample);
     samzaContainerMonitorListener = new SamzaContainerMonitorListener(config, containerMetrics, taskThreadPool);
   }
 
   @Test
   public void testOnUpdate() {
     samzaContainerMonitorListener.onUpdate(sample);
+    assertEquals(cpuUsage, containerMetrics.totalProcessCpuUsage().getValue());
     float physicalMemoryMb = physicalMemoryBytes / 1024.0F / 1024.0F;
     assertEquals(physicalMemoryMb, containerMetrics.physicalMemoryMb().getValue());
     assertEquals(physicalMemoryMb / containerMemoryMb, containerMetrics.physicalMemoryUtilization().getValue());
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestDefaultSystemStatisticsGetter.java b/samza-core/src/test/java/org/apache/samza/container/host/TestDefaultSystemStatisticsGetter.java
new file mode 100644
index 000000000..04f9669d0
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestDefaultSystemStatisticsGetter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests that {@link DefaultSystemStatisticsGetter} routes each statistic to the expected underlying getter
+ * and returns that getter's result to the caller unchanged.
+ */
+public class TestDefaultSystemStatisticsGetter {
+
+  @Mock
+  private OshiBasedStatisticsGetter oshiBasedStatisticsGetter;
+  @Mock
+  private PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter;
+  @Mock
+  private SystemMemoryStatistics memoryStatistics;
+
+  private DefaultSystemStatisticsGetter defaultSystemStatisticsGetter;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.defaultSystemStatisticsGetter =
+        new DefaultSystemStatisticsGetter(oshiBasedStatisticsGetter, posixCommandBasedStatisticsGetter);
+  }
+
+  @Test
+  public void testGetSystemMemoryStatistics() {
+    when(posixCommandBasedStatisticsGetter.getSystemMemoryStatistics()).thenReturn(memoryStatistics);
+    // memory statistics must come from the POSIX command based getter and be passed through as-is
+    assertSame(memoryStatistics, defaultSystemStatisticsGetter.getSystemMemoryStatistics());
+    verify(posixCommandBasedStatisticsGetter).getSystemMemoryStatistics();
+  }
+
+  @Test
+  public void testGetProcessCPUStatistics() {
+    ProcessCPUStatistics cpuStatistics = new ProcessCPUStatistics(42.0);
+    when(oshiBasedStatisticsGetter.getProcessCPUStatistics()).thenReturn(cpuStatistics);
+    // cpu statistics must come from the OSHI based getter and be passed through as-is
+    assertSame(cpuStatistics, defaultSystemStatisticsGetter.getProcessCPUStatistics());
+    verify(oshiBasedStatisticsGetter).getProcessCPUStatistics();
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestOshiBasedStatisticsGetter.java b/samza-core/src/test/java/org/apache/samza/container/host/TestOshiBasedStatisticsGetter.java
new file mode 100644
index 000000000..ee31e354a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestOshiBasedStatisticsGetter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import com.google.common.collect.Lists;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import oshi.software.os.OSProcess;
+import oshi.software.os.OperatingSystem;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link OshiBasedStatisticsGetter}, driven by a mocked OSHI {@link OperatingSystem}.
+ */
+public class TestOshiBasedStatisticsGetter {
+  private static final int PID = 10001;
+  private static final int CHILD_PID = 10002;
+  private static final int CPU_COUNT = 2;
+
+  @Mock
+  private OperatingSystem os;
+  // Handles returned by the first poll; the second poll measures load relative to these.
+  @Mock
+  private OSProcess processSnapshot;
+  @Mock
+  private OSProcess childProcessSnapshot;
+  // Handles returned by the second poll.
+  @Mock
+  private OSProcess process;
+  @Mock
+  private OSProcess childProcess;
+
+  private OshiBasedStatisticsGetter oshiBasedStatisticsGetter;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    stubProcessId(processSnapshot, PID);
+    stubProcessId(process, PID);
+    stubProcessId(childProcessSnapshot, CHILD_PID);
+    stubProcessId(childProcess, CHILD_PID);
+    when(os.getProcessId()).thenReturn(PID);
+
+    oshiBasedStatisticsGetter = new OshiBasedStatisticsGetter(os, CPU_COUNT);
+  }
+
+  @Test
+  public void testGetProcessCPUStatistics() {
+    // First poll: there is no prior snapshot yet, so load is requested relative to null.
+    stubProcessTree(processSnapshot, childProcessSnapshot);
+    when(processSnapshot.getProcessCpuLoadBetweenTicks(null)).thenReturn(0.5);
+    when(childProcessSnapshot.getProcessCpuLoadBetweenTicks(null)).thenReturn(0.3);
+    assertEquals(expectedStatistics(0.5, 0.3), oshiBasedStatisticsGetter.getProcessCPUStatistics());
+
+    // Second poll: load is requested relative to the snapshots captured by the first poll.
+    stubProcessTree(process, childProcess);
+    when(process.getProcessCpuLoadBetweenTicks(processSnapshot)).thenReturn(0.2);
+    when(childProcess.getProcessCpuLoadBetweenTicks(childProcessSnapshot)).thenReturn(2d);
+    assertEquals(expectedStatistics(0.2, 2d), oshiBasedStatisticsGetter.getProcessCPUStatistics());
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testGetSystemMemoryStatistics() {
+    oshiBasedStatisticsGetter.getSystemMemoryStatistics();
+  }
+
+  /** Stubs {@code getProcessID()} on the given process mock. */
+  private static void stubProcessId(OSProcess mockProcess, int processId) {
+    when(mockProcess.getProcessID()).thenReturn(processId);
+  }
+
+  /** Makes the mocked OS return {@code parent} for PID and {@code child} as PID's only child process. */
+  private void stubProcessTree(OSProcess parent, OSProcess child) {
+    when(os.getProcess(PID)).thenReturn(parent);
+    when(os.getProcess(CHILD_PID)).thenReturn(child);
+    when(os.getChildProcesses(PID, OperatingSystem.ProcessFiltering.ALL_PROCESSES,
+        OperatingSystem.ProcessSorting.NO_SORTING, 0)).thenReturn(Lists.newArrayList(child));
+  }
+
+  /** Expected result: summed per-process load expressed as a percentage across all logical CPUs. */
+  private static ProcessCPUStatistics expectedStatistics(double parentLoad, double childLoad) {
+    return new ProcessCPUStatistics(100d * (parentLoad + childLoad) / CPU_COUNT);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
index 7d864968a..9700454d4 100644
--- a/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.samza.container.host;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.concurrent.CountDownLatch;
@@ -27,22 +27,27 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.fail;
 
+
 public class TestStatisticsMonitorImpl {
 
   @Test
-  public void testPhysicalMemoryReporting() throws Exception {
+  public void testSystemStatisticsReporting() throws Exception {
     final int numSamplesToCollect = 5;
     final CountDownLatch latch = new CountDownLatch(numSamplesToCollect);
 
-    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter());
+    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new DefaultSystemStatisticsGetter());
     monitor.start();
 
     boolean result = monitor.registerListener(new SystemStatisticsMonitor.Listener() {
 
       @Override
-      public void onUpdate(SystemMemoryStatistics sample) {
+      public void onUpdate(SystemStatistics sample) {
+        SystemMemoryStatistics memorySample = sample.getMemoryStatistics();
         // assert memory is greater than 10 bytes, as a sanity check
-        Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10);
+        Assert.assertTrue(memorySample.getPhysicalMemoryBytes() > 10);
+        ProcessCPUStatistics cpuSample = sample.getCpuStatistics();
+        // cpu usage may legitimately be 0 between closely spaced samples; only check it is non-negative (not NaN)
+        Assert.assertTrue(cpuSample.getProcessCPUUsagePercentage() >= 0);
         latch.countDown();
       }
     });
@@ -57,13 +62,12 @@ public class TestStatisticsMonitorImpl {
     // assert that attempting to register a listener after monitor stop results in failure of registration
     boolean registrationFailsAfterStop = monitor.registerListener(new SystemStatisticsMonitor.Listener() {
       @Override
-      public void onUpdate(SystemMemoryStatistics sample) {
+      public void onUpdate(SystemStatistics sample) { // no-op: registration after stop() is expected to be rejected
       }
     });
     Assert.assertFalse(registrationFailsAfterStop);
   }
 
-
   @Test
   public void testStopBehavior() throws Exception {
 
@@ -71,14 +75,15 @@ public class TestStatisticsMonitorImpl {
     final CountDownLatch latch = new CountDownLatch(1);
     final AtomicInteger numCallbacks = new AtomicInteger(0);
 
-    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter());
+    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new DefaultSystemStatisticsGetter());
 
     monitor.start();
     monitor.registerListener(new SystemStatisticsMonitor.Listener() {
 
       @Override
-      public void onUpdate(SystemMemoryStatistics sample) {
-        Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10);
+      public void onUpdate(SystemStatistics sample) {
+        SystemMemoryStatistics memorySample = sample.getMemoryStatistics();
+        Assert.assertTrue(memorySample.getPhysicalMemoryBytes() > 10);
         if (numCallbacks.incrementAndGet() == numSamplesToCollect) {
           //monitor.stop() is invoked from the same thread. So, there's no race between a stop() call and the
           //callback invocation for the next sample.
@@ -94,6 +99,4 @@ public class TestStatisticsMonitorImpl {
     // Ensure that we only receive as many callbacks
     Assert.assertEquals(numCallbacks.get(), numSamplesToCollect);
   }
-
-
 }