You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/07/27 22:34:36 UTC
samza git commit: SAMZA-972: holistic physical memory monitoring
Repository: samza
Updated Branches:
refs/heads/master 2187d6bd9 -> 40445e2c6
SAMZA-972: holistic physical memory monitoring
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40445e2c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40445e2c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40445e2c
Branch: refs/heads/master
Commit: 40445e2c6ce0e3f31d36373633ee5ea215f3024b
Parents: 2187d6b
Author: Jagadish Venkatraman <ja...@gmail.com>
Authored: Wed Jul 27 10:16:30 2016 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Jul 27 10:16:30 2016 -0700
----------------------------------------------------------------------
build.gradle | 1 +
checkstyle/import-control.xml | 1 +
.../disk/PollingScanDiskSpaceMonitor.java | 12 +-
.../host/PosixCommandBasedStatisticsGetter.java | 84 +++++++++
.../container/host/StatisticsMonitorImpl.java | 188 +++++++++++++++++++
.../container/host/SystemMemoryStatistics.java | 62 ++++++
.../container/host/SystemStatisticsGetter.java | 33 ++++
.../container/host/SystemStatisticsMonitor.java | 60 ++++++
.../apache/samza/container/SamzaContainer.scala | 34 +++-
.../samza/container/SamzaContainerMetrics.scala | 2 +
.../host/TestStatisticsMonitorImpl.java | 98 ++++++++++
11 files changed, 571 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index ba4a9d1..1d4eb74 100644
--- a/build.gradle
+++ b/build.gradle
@@ -145,6 +145,7 @@ project(":samza-core_$scalaVersion") {
dependencies {
compile project(':samza-api')
+ compile "com.google.guava:guava:$guavaVersion"
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c85dc94..7e77702 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -144,6 +144,7 @@
<allow pkg="org.apache.samza.config" />
<allow pkg="org.apache.samza.container" />
<allow pkg="org.apache.samza.coordinator.stream" />
+ <allow pkg="com.google.common" />
<allow pkg="org.apache.samza.util" />
<allow pkg="junit.framework" />
<allow class="org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager" />
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
index 50c8500..75e461d 100644
--- a/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/disk/PollingScanDiskSpaceMonitor.java
@@ -18,6 +18,9 @@
*/
package org.apache.samza.container.disk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@@ -45,6 +48,8 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
private enum State { INIT, RUNNING, STOPPED }
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
+ private static final Logger log = LoggerFactory.getLogger(PollingScanDiskSpaceMonitor.class);
+
// Note: we use this as a set where the value is always Boolean.TRUE.
private final ConcurrentMap<Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
@@ -184,7 +189,12 @@ public class PollingScanDiskSpaceMonitor implements DiskSpaceMonitor {
private void updateSample() {
long totalBytes = getSpaceUsed(watchPaths);
for (Listener listener : listenerSet.keySet()) {
- listener.onUpdate(totalBytes);
+ try {
+ listener.onUpdate(totalBytes);
+ } catch (Throwable e) {
+ // catch an exception thrown by one listener so that it does not impact other listeners.
+ log.error("Exception thrown by a listener ", e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
new file mode 100644
index 0000000..dbcd370
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * An implementation of {@link SystemStatisticsGetter} that relies on using Posix commands like ps.
+ */
+public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter {
+
+  private static final Logger log = LoggerFactory.getLogger(PosixCommandBasedStatisticsGetter.class);
+
+  /**
+   * Executes a command and returns the first line it writes to standard output. The child process
+   * is destroyed before this method returns, whether or not reading succeeded.
+   *
+   * @param cmdArray the command and its arguments, handed to {@link Runtime#exec(String[])}
+   * @return the first line of the output, or {@code null} if the command printed nothing
+   * @throws IOException if the command cannot be started or its output cannot be read
+   */
+  private String getCommandOutput(String [] cmdArray) throws IOException {
+    Process executable = Runtime.getRuntime().exec(cmdArray);
+    try (BufferedReader processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()))) {
+      return processReader.readLine();
+    } finally {
+      // Only stdout is consumed here. Destroy the child so that it cannot linger (for example
+      // while blocked on an unread stderr pipe) each time the monitor polls.
+      executable.destroy();
+    }
+  }
+
+  /**
+   * Asks ps for the resident set size of the parent of the spawned shell, i.e. this JVM.
+   *
+   * @return the resident set size in bytes (the kilobyte figure printed by ps times 1024)
+   * @throws IOException if ps printed nothing or something that is not a number
+   */
+  private long getPhysicalMemory() throws IOException {
+    // $PPID is expanded by sh to the pid of its parent, which is the process that called exec.
+    String commandOutput = getCommandOutput(new String[]{"sh", "-c", "ps -o rss= -p $PPID"});
+    if (commandOutput == null || commandOutput.trim().isEmpty()) {
+      throw new IOException("ps returned no output");
+    }
+    try {
+      return Long.parseLong(commandOutput.trim()) * 1024;
+    } catch (NumberFormatException e) {
+      throw new IOException("ps returned unexpected output: " + commandOutput, e);
+    }
+  }
+
+  /** {@inheritDoc} Returns {@code null}, after logging a warning, when sampling fails. */
+  @Override
+  public SystemMemoryStatistics getSystemMemoryStatistics() {
+    try {
+      long memory = getPhysicalMemory();
+      return new SystemMemoryStatistics(memory);
+    } catch (Exception e) {
+      log.warn("Error when running ps: ", e);
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
new file mode 100644
index 0000000..3dfdf36
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link SystemStatisticsMonitor} that polls a {@link SystemStatisticsGetter} at a fixed interval and hands
+ * every non-null {@link SystemMemoryStatistics} sample to the registered listeners. The no-argument constructor
+ * uses a {@link PosixCommandBasedStatisticsGetter} and a 60 second interval.
+ *
+ * All callbacks are issued from the single scheduler thread, one at a time. A listener whose callback throws is
+ * logged and unregistered, so it receives no further samples; other listeners are unaffected. If a
+ * {@link org.apache.samza.container.host.SystemStatisticsMonitor.Listener} callback takes longer than the polling
+ * interval, subsequent callback invocations may start late but will not be invoked concurrently.
+ *
+ * This class is thread-safe.
+ */
+public class StatisticsMonitorImpl implements SystemStatisticsMonitor {
+
+  private static final ThreadFactory THREAD_FACTORY = new StatisticsMonitorThreadFactory();
+  private static final Logger LOG = LoggerFactory.getLogger(StatisticsMonitorImpl.class);
+
+  /**
+   * Delay before the first sample and period between samples, in milliseconds: listeners are called
+   * back after pollingIntervalMillis, pollingIntervalMillis * 2, pollingIntervalMillis * 3 and so on.
+   *
+   */
+  private final long pollingIntervalMillis;
+
+
+  // Guards state transitions and listener registration. A dedicated object is used rather than "this"
+  // because an instance of StatisticsMonitorImpl could itself be used as a lock elsewhere.
+  private final Object lock = new Object();
+
+  // Runs the periodic sampling task, and therefore every listener callback, on one thread.
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+  // Backing store for the set of listeners; only the keys matter, the value is always Boolean.TRUE.
+  private final ConcurrentMap<StatisticsMonitorImpl.Listener, Boolean> listenerSet = new ConcurrentHashMap<>();
+  private final SystemStatisticsGetter statisticsGetter;
+
+  /**
+   * Lifecycle of the monitor: INIT on construction, RUNNING once {@link #start()} has been called and
+   * STOPPED after {@link #stop()}.
+   */
+  private enum State { INIT, RUNNING, STOPPED }
+  private volatile State currentState;
+
+
+  /**
+   * Creates a new {@link StatisticsMonitorImpl} that samples every 60 seconds using a
+   * {@link PosixCommandBasedStatisticsGetter}.
+   */
+  public StatisticsMonitorImpl() {
+    this(60000, new PosixCommandBasedStatisticsGetter());
+  }
+
+  /**
+   * Creates a new {@link StatisticsMonitorImpl} that reports statistics periodically.
+   *
+   * @param pollingIntervalMillis the interval, in milliseconds, between two samples
+   * @param statisticsGetter the getter used to obtain each sample
+   */
+  public StatisticsMonitorImpl(long pollingIntervalMillis, SystemStatisticsGetter statisticsGetter) {
+    this.pollingIntervalMillis = pollingIntervalMillis;
+    this.statisticsGetter = statisticsGetter;
+    this.currentState = State.INIT;
+  }
+
+  /**
+   * Schedules the periodic sampling task. Calling this on a monitor that is already running is a no-op.
+   * @throws IllegalStateException if the monitor has already been stopped
+   */
+  @Override
+  public void start() {
+    synchronized (lock) {
+      if (currentState == State.STOPPED) {
+        throw new IllegalStateException("Attempting to start an already stopped StatisticsMonitor");
+      }
+      if (currentState == State.RUNNING) {
+        return;
+      }
+      currentState = State.RUNNING;
+      Runnable sampler = new Runnable() {
+        @Override
+        public void run() {
+          sampleStatistics();
+        }
+      };
+      schedulerService.scheduleAtFixedRate(sampler, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Fetches one sample and delivers it to every registered listener. Nothing is delivered when the
+   * getter throws or returns null.
+   */
+  private void sampleStatistics() {
+    SystemMemoryStatistics statistics = null;
+    try {
+      statistics = statisticsGetter.getSystemMemoryStatistics();
+    } catch (Throwable e) {
+      LOG.error("Error during obtaining statistics: ", e);
+    }
+    if (statistics == null) {
+      return;
+    }
+    for (Listener listener : listenerSet.keySet()) {
+      try {
+        listener.onUpdate(statistics);
+      } catch (Throwable e) {
+        // A failing listener must not affect the others; drop it so it gets no further callbacks.
+        listenerSet.remove(listener);
+        LOG.error("Listener threw an exception: ", e);
+      }
+    }
+  }
+
+  /**
+   * Stops the monitor. Once the monitor is stopped, no new samples will be delivered to the listeners. If stop is
+   * invoked while a {@link org.apache.samza.container.host.SystemStatisticsMonitor.Listener} callback is
+   * executing, that callback may be interrupted.
+   */
+  @Override
+  public void stop() {
+    synchronized (lock) {
+      schedulerService.shutdownNow();
+      listenerSet.clear();
+      currentState = State.STOPPED;
+    }
+  }
+
+  /**
+   * @see org.apache.samza.container.host.SystemStatisticsMonitor#registerListener(Listener)
+   */
+  @Override
+  public boolean registerListener(Listener listener) {
+    synchronized (lock) {
+      if (currentState == State.STOPPED) {
+        LOG.error("Attempting to register a listener after monitor was stopped.");
+        return false;
+      }
+      if (listenerSet.putIfAbsent(listener, Boolean.TRUE) != null) {
+        LOG.error("Attempting to register an already registered listener");
+        return false;
+      }
+      return true;
+    }
+  }
+
+  // Thread factory that gives the scheduler threads a recognizable, numbered name.
+  private static class StatisticsMonitorThreadFactory implements ThreadFactory {
+    private static final String PREFIX = "Samza-StatisticsMonitor-Thread-";
+    private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      return new Thread(runnable, PREFIX + INSTANCE_COUNT.getAndIncrement());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java
new file mode 100644
index 0000000..3dd769f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemMemoryStatistics.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+/**
+ * An immutable snapshot of the physical memory used by the process that runs the
+ * {@link org.apache.samza.container.SamzaContainer}.
+ */
+public class SystemMemoryStatistics {
+
+  /**
+   * The physical memory used by the Samza container process (native + on heap) in bytes.
+   */
+  private final long physicalMemoryBytes;
+
+  SystemMemoryStatistics(long physicalMemoryBytes) {
+    this.physicalMemoryBytes = physicalMemoryBytes;
+  }
+
+  /** Returns the physical memory used by the container process, in bytes. */
+  public long getPhysicalMemoryBytes() {
+    return physicalMemoryBytes;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    return physicalMemoryBytes == ((SystemMemoryStatistics) o).physicalMemoryBytes;
+  }
+
+  @Override
+  public int hashCode() {
+    // Same value as folding the upper and lower 32 bits together with XOR.
+    return Long.valueOf(physicalMemoryBytes).hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "SystemStatistics{containerPhysicalMemory=" + physicalMemoryBytes + '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
new file mode 100644
index 0000000..541c2fb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+/**
+ * A source of {@link SystemMemoryStatistics} for the {@link org.apache.samza.container.SamzaContainer}.
+ */
+public interface SystemStatisticsGetter {
+
+ /**
+ * Returns the {@link SystemMemoryStatistics} for the current Samza container process. {@code null} is
+ * returned if no statistics are available, so callers must check the result before using it.
+ *
+ * @return {@link SystemMemoryStatistics} for the Samza container, or {@code null} if unavailable
+ */
+ SystemMemoryStatistics getSystemMemoryStatistics();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
new file mode 100644
index 0000000..84f4ec3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsMonitor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+/**
+ * Monitors per-process system statistics, such as memory utilization, and reports each sample to the
+ * registered {@link SystemStatisticsMonitor.Listener}s.
+ */
+public interface SystemStatisticsMonitor {
+ /**
+ * Starts the system statistics monitor.
+ */
+ void start();
+
+ /**
+ * Stops the system statistics monitor. Once shutdown is complete, listeners will no longer receive
+ * new samples. A stopped monitor cannot be restarted with {@link #start()}.
+ */
+ void stop();
+
+ /**
+ * Registers the specified listener with this monitor. The listener will be called
+ * when the monitor has a new sample. The update interval is implementation specific.
+ *
+ * @param listener the listener to register
+ * @return {@code true} if the registration was successful and {@code false} if not. Registration
+ * can fail if the monitor has been stopped or if the listener was already registered.
+ */
+ boolean registerListener(Listener listener);
+
+ /**
+ * A listener that is notified when the monitor has sampled a new statistic.
+ * Register this listener with {@link #registerListener(Listener)} to receive updates.
+ */
+ interface Listener {
+ /**
+ * Invoked with each new sample as it becomes available.
+ *
+ * @param sample the statistic that was just sampled.
+ */
+ void onUpdate(SystemMemoryStatistics sample);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 90d7279..a37f353 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -20,9 +20,6 @@
package org.apache.samza.container
import java.io.File
-import java.lang.Thread.UncaughtExceptionHandler
-import java.net.URL
-import java.net.UnknownHostException
import java.nio.file.Path
import java.util
import java.util.concurrent.ExecutorService
@@ -47,6 +44,7 @@ import org.apache.samza.container.disk.DiskSpaceMonitor
import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory
import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor
+import org.apache.samza.container.host.{SystemMemoryStatistics, SystemStatisticsMonitor, StatisticsMonitorImpl}
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
@@ -556,6 +554,16 @@ object SamzaContainer extends Logging {
val executor = new ThrottlingExecutor(
config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)))
+ val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
+ memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {
+ override def onUpdate(sample: SystemMemoryStatistics): Unit = {
+ val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes
+ val physicalMemoryMb : Double = physicalMemoryBytes / (1024.0 * 1024.0)
+ logger.debug("Container physical memory utilization (mb): " + physicalMemoryMb)
+ samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb)
+ }
+ })
+
val diskQuotaBytes = config.getLong("container.disk.quota.bytes", Long.MaxValue)
samzaContainerMetrics.diskQuotaBytes.set(diskQuotaBytes)
@@ -606,6 +614,7 @@ object SamzaContainer extends Logging {
jvm = jvm,
jmxServer = jmxServer,
diskSpaceMonitor = diskSpaceMonitor,
+ hostStatisticsMonitor = memoryStatisticsMonitor,
taskThreadPool = taskThreadPool)
}
}
@@ -619,6 +628,7 @@ class SamzaContainer(
metrics: SamzaContainerMetrics,
jmxServer: JmxServer,
diskSpaceMonitor: DiskSpaceMonitor = null,
+ hostStatisticsMonitor: SystemStatisticsMonitor = null,
offsetManager: OffsetManager = new OffsetManager,
localityManager: LocalityManager = null,
securityManager: SecurityManager = null,
@@ -637,6 +647,7 @@ class SamzaContainer(
startLocalityManager
startStores
startDiskSpaceMonitor
+ startHostStatisticsMonitor
startProducers
startTask
startConsumers
@@ -656,6 +667,7 @@ class SamzaContainer(
shutdownTask
shutdownStores
shutdownDiskSpaceMonitor
+ shutdownHostStatisticsMonitor
shutdownProducers
shutdownLocalityManager
shutdownOffsetManager
@@ -673,6 +685,13 @@ class SamzaContainer(
}
}
+ def startHostStatisticsMonitor: Unit = {
+ if (hostStatisticsMonitor != null) {
+ info("Starting host statistics monitor")
+ hostStatisticsMonitor.start()
+ }
+ }
+
def startMetrics {
info("Registering task instances with metrics.")
@@ -867,4 +886,13 @@ class SamzaContainer(
diskSpaceMonitor.stop()
}
}
+
+ def shutdownHostStatisticsMonitor: Unit = {
+ if (hostStatisticsMonitor != null) {
+ info("Shutting down host statistics monitor.")
+ hostStatisticsMonitor.stop()
+ }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index e3891cf..1e7515e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -43,10 +43,12 @@ class SamzaContainerMetrics(
val diskUsageBytes = newGauge("disk-usage-bytes", 0L)
val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue)
val executorWorkFactor = newGauge("executor-work-factor", 1.0)
+ val physicalMemoryMb = newGauge[Double]("physical-memory-mb", 0.0F)
val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/40445e2c/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
new file mode 100644
index 0000000..b01fdca
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestStatisticsMonitorImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+public class TestStatisticsMonitorImpl {
+
+  @Test
+  public void testPhysicalMemoryReporting() throws Exception {
+    final int numSamplesToCollect = 5;
+    final CountDownLatch latch = new CountDownLatch(numSamplesToCollect);
+
+    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter());
+    monitor.start();
+    try {
+      boolean result = monitor.registerListener(new SystemStatisticsMonitor.Listener() {
+        @Override
+        public void onUpdate(SystemMemoryStatistics sample) {
+          // assert memory is greater than 10 bytes, as a sanity check
+          Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10);
+          latch.countDown();
+        }
+      });
+      // assert that the registration for the listener was successful
+      Assert.assertTrue(result);
+      if (!latch.await(5, TimeUnit.SECONDS)) {
+        fail(String.format("Timed out waiting for listener to be given %d updates", numSamplesToCollect));
+      }
+    } finally {
+      // always stop the monitor so that a failed assertion does not leave the polling thread running
+      monitor.stop();
+    }
+
+    // assert that attempting to register a listener after monitor stop results in failure of registration
+    boolean registrationFailsAfterStop = monitor.registerListener(new SystemStatisticsMonitor.Listener() {
+      @Override
+      public void onUpdate(SystemMemoryStatistics sample) {
+      }
+    });
+    Assert.assertFalse(registrationFailsAfterStop);
+  }
+
+  @Test
+  public void testStopBehavior() throws Exception {
+    final int numSamplesToCollect = 5;
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicInteger numCallbacks = new AtomicInteger(0);
+
+    final StatisticsMonitorImpl monitor = new StatisticsMonitorImpl(10, new PosixCommandBasedStatisticsGetter());
+    monitor.start();
+    try {
+      monitor.registerListener(new SystemStatisticsMonitor.Listener() {
+        @Override
+        public void onUpdate(SystemMemoryStatistics sample) {
+          Assert.assertTrue(sample.getPhysicalMemoryBytes() > 10);
+          if (numCallbacks.incrementAndGet() == numSamplesToCollect) {
+            // monitor.stop() is invoked from the monitor's own thread, so there is no race between
+            // the stop() call and the callback invocation for the next sample.
+            monitor.stop();
+            latch.countDown();
+          }
+        }
+      });
+      if (!latch.await(5, TimeUnit.SECONDS)) {
+        fail(String.format("Timed out waiting for listener to be given %d updates", numSamplesToCollect));
+      }
+      // Ensure that no callbacks were delivered after stop(): expected value first, then actual
+      Assert.assertEquals(numSamplesToCollect, numCallbacks.get());
+    } finally {
+      // stop() may run a second time here; it only shuts down the executor and clears listeners
+      monitor.stop();
+    }
+  }
+}