You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/27 22:34:36 UTC

samza git commit: SAMZA-972: holistic physical memory monitoring

Repository: samza
Updated Branches:
  refs/heads/master 2187d6bd9 -> 40445e2c6


SAMZA-972: holistic physical memory monitoring


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40445e2c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40445e2c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40445e2c

Branch: refs/heads/master
Commit: 40445e2c6ce0e3f31d36373633ee5ea215f3024b
Parents: 2187d6b
Author: Jagadish Venkatraman <ja...@gmail.com>
Authored: Wed Jul 27 10:16:30 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Jul 27 10:16:30 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 checkstyle/import-control.xml                   |   1 +
 .../disk/PollingScanDiskSpaceMonitor.java       |  12 +-
 .../host/PosixCommandBasedStatisticsGetter.java |  84 +++++++++
 .../container/host/StatisticsMonitorImpl.java   | 188 +++++++++++++++++++
 .../container/host/SystemMemoryStatistics.java  |  62 ++++++
 .../container/host/SystemStatisticsGetter.java  |  33 ++++
 .../container/host/SystemStatisticsMonitor.java |  60 ++++++
 .../apache/samza/container/SamzaContainer.scala |  34 +++-
 .../samza/container/SamzaContainerMetrics.scala |   2 +
 .../host/TestStatisticsMonitorImpl.java         |  98 ++++++++++
 11 files changed, 571 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ba4a9d1..1d4eb74 100644
--- a/build.gradle
+++ b/build.gradle
@@ -145,6 +145,7 @@ project(":samza-core_$scalaVersion") {
 
   dependencies {
     compile project(':samza-api')
+    compile "com.google.guava:guava:$guavaVersion"
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c85dc94..7e77702 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -144,6 +144,7 @@
         <allow pkg="org.apache.samza.config" />
         <allow pkg="org.apache.samza.container" />
         <allow pkg="org.apache.samza.coordinator.stream" />
+        <allow pkg="com.google.common" />
         <allow pkg="org.apache.samza.util" />
         <allow pkg="junit.framework" />
         <allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
index 50c8500..75e461d 100644
--- a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
@@ -18,6 +18,9 @@
  */
 package org.apache.samza.container.disk;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -45,6 +48,8 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
   private enum State { INIT, RUNNING, STOPPED }
 
   private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
+  private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class);
+
   // Note: we use this as a set where the value is always Boolean.TRUE.
   private final ConcurrentMap<Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
 
@@ -184,7 +189,12 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
   private void updateSample() {
     long totalBytes = getSpaceUsed(watchPaths);
     for (Listener listener : listenerSet.keySet()) {
-      listener.onUpdate(totalBytes);
+      try {
+        listener.onUpdate(totalBytes);
+      } catch (Throwable e) {
+        // catch an exception thrown by one listener so that it does not impact other listeners.
+        log.error("Exception thrown by a listener ", e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
new file mode 100644
index 0000000..dbcd370
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * An implementation of {@link SystemStatisticsGetter} that relies on using Posix commands like ps.
+ */
+public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter {
+
+  private static final Logger log = LoggerFactory.getLogger(PosixCommandBasedStatisticsGetter.class);
+
+  /**
+   * A convenience method to execute shell commands and return the first line of their output.
+   *
+   * @param cmdArray the command to run
+   * @return the first line of the output.
+   * @throws IOException if the command cannot be started or its output cannot be read
+   */
+  private String getCommandOutput(String [] cmdArray) throws IOException {
+    Process executable = Runtime.getRuntime().exec(cmdArray);
+    BufferedReader processReader = null;
+    String psOutput = null;
+
+    try {
+      processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
+      psOutput = processReader.readLine();
+    } finally {
+      if (processReader != null) {
+        processReader.close();
+      }
+    }
+    return psOutput;
+  }
+
+  private long getPhysicalMemory() throws IOException {
+
+    // returns a single long value that represents the rss memory of the process.
+    String commandOutput = getCommandOutput(new String[]{"sh", "-c", "ps -o rss= -p $PPID"});
+
+    // this should never happen.
+    if (commandOutput == null) {
+      throw new IOException("ps returned unexpected output: " + commandOutput);
+    }
+
+    long rssMemoryKb = Long.parseLong(commandOutput.trim());
+    //convert to bytes
+    return rssMemoryKb * 1024;
+  }
+
+
+  @Override
+  public SystemMemoryStatistics getSystemMemoryStatistics() {
+    try {
+      long memory = getPhysicalMemory();
+      return new SystemMemoryStatistics(memory);
+    } catch (Exception e) {
+      log.warn("Error when running ps: ", e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
new file mode 100644
index 0000000..3dfdf36
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An implementation of {@link SystemStatisticsMonitor} for unix and mac platforms. Users can implement their own
+ * ways of getting {@link SystemMemoryStatistics} and provide a {@link SystemStatisticsGetter} implementation. The default
+ * behavior is to rely on unix commands like ps to obtain {@link SystemMemoryStatistics}
+ *
+ * All callback invocations are from the same thread - hence, are guaranteed to be serialized. A listener whose
+ * callback throws an exception is removed and receives no further callbacks. If the execution of a
+ * {@link org.apache.samza.container.host.SystemStatisticsMonitor.Listener} callback takes longer than the polling
+ * interval, subsequent callback invocations may start late but will not be invoked concurrently.
+ *
+ * This class is thread-safe.
+ */
+public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
+
+  private static final ThreadFactory THREAD_FACTORY = new StatisticsMonitorThreadFactory();
+  private static final Logger LOG = LoggerFactory.getLogger(StatisticsMonitorImpl.class);
+
+  /**
+   * Polling interval of this monitor. The monitor will report host statistics periodically via a callback
+   * after pollingIntervalMillis, pollingIntervalMillis * 2, pollingIntervalMillis * 3 and so on.
+   *
+   */
+  private final long pollingIntervalMillis;
+
+
+  // Use a private lock instead of synchronized because an instance of StatisticsMonitorImpl could be used as a
+  // lock elsewhere.
+  private final Object lock = new Object();
+
+  // Single threaded executor to handle callback invocations.
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+  // Use this as a set with value always set to True
+  private final ConcurrentMap<StatisticsMonitorImpl.Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
+  private final SystemStatisticsGetter statisticsGetter;
+
+  /**
+   * Tracks the state of the monitor. The typical state transition is from INIT (when the monitor is created) to
+   * RUNNING (when start is invoked on the monitor) to STOPPED (when stop is invoked).
+   */
+  private enum State { INIT, RUNNING, STOPPED }
+  private volatile State currentState;
+
+
+  /**
+   * Creates a new {@link StatisticsMonitorImpl} that reports statistics every 60 seconds
+   *
+   */
+  public StatisticsMonitorImpl() {
+    this(60000, new PosixCommandBasedStatisticsGetter());
+  }
+
+  /**
+   * Creates a new {@link StatisticsMonitorImpl} that reports statistics periodically.
+   *
+   * @param pollingIntervalMillis The polling interval to report statistics.
+   * @param statisticsGetter the getter to gather system stats info
+   */
+  public StatisticsMonitorImpl(long pollingIntervalMillis, SystemStatisticsGetter statisticsGetter) {
+    this.pollingIntervalMillis = pollingIntervalMillis;
+    this.statisticsGetter = statisticsGetter;
+    currentState = State.INIT;
+  }
+
+  @Override
+  public void start() {
+    synchronized (lock) {
+      switch (currentState) {
+        case INIT:
+          currentState = State.RUNNING;
+          schedulerService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+              sampleStatistics();
+            }
+          }, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
+          break;
+
+        case RUNNING:
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("Attempting to start an already stopped StatisticsMonitor");
+      }
+    }
+  }
+
+  private void sampleStatistics() {
+    SystemMemoryStatistics statistics = null;
+    try {
+      statistics = statisticsGetter.getSystemMemoryStatistics();
+    } catch (Throwable e) {
+      LOG.error("Error during obtaining statistics: ", e);
+    }
+
+    for (Listener listener : listenerSet.keySet()) {
+      if (statistics != null) {
+        try {
+          // catch all exceptions to shield one listener from exceptions thrown by others.
+          listener.onUpdate(statistics);
+        } catch (Throwable e) {
+          // delete this listener so that it does not receive future callbacks.
+          listenerSet.remove(listener);
+          LOG.error("Listener threw an exception: ", e);
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Stops the monitor. Once the monitor is stopped, no new samples will be delivered to the listeners. If stop is
+   * invoked while a {@link org.apache.samza.container.host.SystemStatisticsMonitor.Listener} callback is
+   * executing, the callback may be interrupted.
+   */
+
+  @Override
+  public void stop() {
+    synchronized (lock) {
+      schedulerService.shutdownNow();
+      listenerSet.clear();
+      currentState = State.STOPPED;
+    }
+  }
+
+  /**
+   * @see org.apache.samza.container.host.SystemStatisticsMonitor#registerListener(Listener)
+   */
+  @Override
+  public boolean registerListener(Listener listener) {
+    synchronized (lock) {
+      if (currentState == State.STOPPED) {
+        LOG.error("Attempting to register a listener after monitor was stopped.");
+        return false;
+      } else {
+        if (listenerSet.containsKey(listener)) {
+          LOG.error("Attempting to register an already registered listener");
+          return false;
+        }
+        listenerSet.put(listener, Boolean.TRUE);
+        return true;
+      }
+    }
+  }
+
+  // A convenience class that provides named threads
+  private static class StatisticsMonitorThreadFactory implements ThreadFactory {
+    private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
+    private static final String PREFIX = "Samza-StatisticsMonitor-Thread-";
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      return new Thread(runnable, PREFIX + INSTANCE_COUNT.getAndIncrement());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java
new file mode 100644
index 0000000..3dd769f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+/**
+ * A {@link SystemMemoryStatistics} object represents information about the physical process that runs the
+ * {@link org.apache.samza.container.SamzaContainer}.
+ */
+public class SystemMemoryStatistics {
+
+  /**
+   * The physical memory used by the Samza container process (native + on heap) in bytes.
+   */
+  private final long physicalMemoryBytes;
+
+  SystemMemoryStatistics(long physicalMemoryBytes) {
+    this.physicalMemoryBytes = physicalMemoryBytes;
+  }
+
+  @Override
+  public String toString() {
+    return "SystemStatistics{" +
+        "containerPhysicalMemory=" + physicalMemoryBytes +
+        '}';
+  }
+
+  public long getPhysicalMemoryBytes() {
+    return physicalMemoryBytes;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    SystemMemoryStatistics that = (SystemMemoryStatistics) o;
+
+    return physicalMemoryBytes == that.physicalMemoryBytes;
+
+  }
+
+  @Override
+  public int hashCode() {
+    return (int) (physicalMemoryBytes ^ (physicalMemoryBytes >>> 32));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
new file mode 100644
index 0000000..541c2fb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+/**
+ * An object that returns {@link SystemMemoryStatistics} for the {@link org.apache.samza.container.SamzaContainer}.
+ */
+public interface SystemStatisticsGetter {
+
+  /**
+   * Returns the {@link SystemMemoryStatistics} for the current Samza container process. A 'null' value is
+   * returned if no statistics are available.
+   *
+   * @return {@link SystemMemoryStatistics} for the Samza container
+   */
+  SystemMemoryStatistics getSystemMemoryStatistics();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
new file mode 100644
index 0000000..84f4ec3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+/**
+ * An object that monitors per-process system statistics like memory utilization and reports this via a
+ * {@link SystemStatisticsMonitor.Listener}.
+ */
+public interface SystemStatisticsMonitor {
+  /**
+   * Starts the system statistics monitor.
+   */
+  void start();
+
+  /**
+   * Stops the memory usage monitor. Once shutdown is complete listeners will no longer receive
+   * new samples. A stopped monitor cannot be restarted with {@link #start()}.
+   */
+  void stop();
+
+  /**
+   * Registers the specified listener with this monitor. The listener will be called
+   * when the monitor has a new sample. The update interval is implementation specific.
+   *
+   * @param listener the listener to register
+   * @return {@code true} if the registration was successful and {@code false} if not. Registration
+   * can fail if the monitor has been stopped or if the listener was already registered.
+   */
+  boolean registerListener(Listener listener);
+
+  /**
+   * A listener that is notified when the monitor has sampled a new statistic.
+   * Register this listener with {@link #registerListener(Listener)} to receive updates.
+   */
+  interface Listener {
+    /**
+     * Invoked with new samples as they become available.
+     *
+     * @param sample the currently sampled statistic.
+     */
+    void onUpdate(SystemMemoryStatistics sample);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 90d7279..a37f353 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -20,9 +20,6 @@
 package org.apache.samza.container
 
 import java.io.File
-import java.lang.Thread.UncaughtExceptionHandler
-import java.net.URL
-import java.net.UnknownHostException
 import java.nio.file.Path
 import java.util
 import java.util.concurrent.ExecutorService
@@ -47,6 +44,7 @@ import org.apache.samza.container.disk.DiskSpaceMonitor
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory
 import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor
+import org.apache.samza.container.host.{SystemMemoryStatistics, SystemStatisticsMonitor, StatisticsMonitorImpl}
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
@@ -556,6 +554,16 @@ object SamzaContainer extends Logging {
     val executor = new ThrottlingExecutor(
       config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)))
 
+    val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
+    memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {
+      override def onUpdate(sample: SystemMemoryStatistics): Unit = {
+        val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes
+        val physicalMemoryMb : Double = physicalMemoryBytes / (1024.0 * 1024.0)
+        logger.debug("Container physical memory utilization (mb): " + physicalMemoryMb)
+        samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb)
+      }
+    })
+
     val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue)
     samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes)
 
@@ -606,6 +614,7 @@ object SamzaContainer extends Logging {
       jvm = jvm,
       jmxServer = jmxServer,
       diskSpaceMonitor = diskSpaceMonitor,
+      hostStatisticsMonitor = memoryStatisticsMonitor,
       taskThreadPool = taskThreadPool)
   }
 }
@@ -619,6 +628,7 @@ class SamzaContainer(
   metrics: SamzaContainerMetrics,
   jmxServer: JmxServer,
   diskSpaceMonitor: DiskSpaceMonitor = null,
+  hostStatisticsMonitor: SystemStatisticsMonitor = null,
   offsetManager: OffsetManager = new OffsetManager,
   localityManager: LocalityManager = null,
   securityManager: SecurityManager = null,
@@ -637,6 +647,7 @@ class SamzaContainer(
       startLocalityManager
       startStores
       startDiskSpaceMonitor
+      startHostStatisticsMonitor
       startProducers
       startTask
       startConsumers
@@ -656,6 +667,7 @@ class SamzaContainer(
       shutdownTask
       shutdownStores
       shutdownDiskSpaceMonitor
+      shutdownHostStatisticsMonitor
       shutdownProducers
       shutdownLocalityManager
       shutdownOffsetManager
@@ -673,6 +685,13 @@ class SamzaContainer(
     }
   }
 
+  def startHostStatisticsMonitor: Unit = {
+    if (hostStatisticsMonitor != null) {
+      info("Starting host statistics monitor")
+      hostStatisticsMonitor.start()
+    }
+  }
+
   def startMetrics {
     info("Registering task instances with metrics.")
 
@@ -867,4 +886,13 @@ class SamzaContainer(
       diskSpaceMonitor.stop()
     }
   }
+
+  def shutdownHostStatisticsMonitor: Unit = {
+    if (hostStatisticsMonitor != null) {
+      info("Shutting down host statistics monitor.")
+      hostStatisticsMonitor.stop()
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index e3891cf..1e7515e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -43,10 +43,12 @@ class SamzaContainerMetrics(
   val diskUsageBytes = newGauge("disk-usage-bytes", 0L)
   val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue)
   val executorWorkFactor = newGauge("executor-work-factor", 1.0)
+  val physicalMemoryMb = newGauge[Double]("physical-memory-mb", 0.0F)
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
 
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
new file mode 100644
index 0000000..b01fdca
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+public class TestStatisticsMonitorImpl {
+
+  @Test
+  public void testPhysicalMemoryReporting() throws Exception {
+    final int numSamplesToCollect = 5;
+    final CountDownLatch latch = new CountDownLatch(numSamplesToCollect);
+
+    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter());
+    monitor.start();
+
+    boolean result = monitor.registerListener(new SystemStatisticsMonitor.Listener() {
+
+      @Override
+      public void onUpdate(SystemMemoryStatistics sample) {
+        // assert memory is greater than 10 bytes, as a sanity check
+        Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10);
+        latch.countDown();
+      }
+    });
+
+    if (!latch.await(5, TimeUnit.SECONDS)) {
+      fail(String.format("Timed out waiting for listener to be give %d updates", numSamplesToCollect));
+    }
+    // assert that the registration for the listener was successful
+    Assert.assertTrue(result);
+    monitor.stop();
+
+    // assert that attempting to register a listener after monitor stop results in failure of registration
+    boolean registrationFailsAfterStop = monitor.registerListener(new SystemStatisticsMonitor.Listener() {
+      @Override
+      public void onUpdate(SystemMemoryStatistics sample) {
+      }
+    });
+    Assert.assertFalse(registrationFailsAfterStop);
+  }
+
+  @Test
+  public void testStopBehavior() throws Exception {
+
+    final int numSamplesToCollect = 5;
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicInteger numCallbacks = new AtomicInteger(0);
+
+    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter());
+
+    monitor.start();
+    monitor.registerListener(new SystemStatisticsMonitor.Listener() {
+
+      @Override
+      public void onUpdate(SystemMemoryStatistics sample) {
+        Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10);
+        if (numCallbacks.incrementAndGet() == numSamplesToCollect) {
+          //monitor.stop() is invoked from the same thread. So, there's no race between a stop() call and the
+          //callback invocation for the next sample.
+          monitor.stop();
+          latch.countDown();
+        }
+      }
+    });
+
+    if (!latch.await(5, TimeUnit.SECONDS)) {
+      fail(String.format("Timed out waiting for listener to be give %d updates", numSamplesToCollect));
+    }
+    // Ensure that we receive exactly numSamplesToCollect callbacks and no more after stop()
+    Assert.assertEquals(numCallbacks.get(), numSamplesToCollect);
+  }
+
+
+}