You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2013/05/03 11:13:34 UTC
git commit: Improve Dsnitch Severity with iowait monitor patch by
Vijay; reviewed by jbellis for CASSANDRA-5521
Updated Branches:
refs/heads/trunk c33ccd9e3 -> bdf08364a
Improve Dsnitch Severity with iowait monitor
patch by Vijay; reviewed by jbellis for CASSANDRA-5521
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdf08364
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdf08364
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdf08364
Branch: refs/heads/trunk
Commit: bdf08364a60d9b05d0b45cb47ddd73adb08b4196
Parents: c33ccd9
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Fri May 3 02:03:10 2013 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Fri May 3 02:03:10 2013 -0700
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionInfo.java | 3 +-
.../apache/cassandra/service/StorageService.java | 18 +--
.../cassandra/utils/BackgroundActivityMonitor.java | 149 +++++++++++++++
3 files changed, 156 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index da67bab..d086eef 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -139,8 +139,7 @@ public final class CompactionInfo implements Serializable
public void started()
{
reportedSeverity = getCompactionInfo().getTotal() / load;
- if (!StorageService.instance.reportSeverity(reportedSeverity))
- reportedSeverity = 0d;
+ StorageService.instance.reportSeverity(reportedSeverity);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a86d856..3c508ee 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -186,6 +186,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
+ private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor();
+
private final ObjectName jmxObjectName;
public void finishBootstrapping()
@@ -927,24 +929,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Gossip about the known severity of the events in this node
+ * Increment the known compaction severity of this node
*/
- public boolean reportSeverity(double incr)
+ public void reportSeverity(double incr)
{
- if (!Gossiper.instance.isEnabled())
- return false;
- VersionedValue updated = StorageService.instance.valueFactory.severity(severity.addAndGet(incr));
- Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated);
- return true;
+ bgMonitor.incrCompactionSeverity(incr);
}
public double getSeverity(InetAddress endpoint)
{
- VersionedValue event;
- EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null)
- return Double.parseDouble(event.value);
- return 0.0;
+ return bgMonitor.getSeverity(endpoint);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
new file mode 100644
index 0000000..10215a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
@@ -0,0 +1,149 @@
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.StringTokenizer;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+/**
+ * Samples background activity on this node and periodically gossips it as the node's
+ * {@link ApplicationState#SEVERITY}. The reported value is the iowait percentage computed from
+ * successive readings of /proc/stat; when that file cannot be used, the accumulated compaction
+ * severity is reported instead.
+ */
+public class BackgroundActivityMonitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(BackgroundActivityMonitor.class);
+
+    // Indexes into the array returned by readAndCompute(): the counters that follow the
+    // "cpu" label on the first line of /proc/stat.
+    public static final int USER_INDEX = 0;
+    public static final int NICE_INDEX = 1;
+    public static final int SYS_INDEX = 2;
+    public static final int IDLE_INDEX = 3;
+    public static final int IOWAIT_INDEX = 4;
+    public static final int IRQ_INDEX = 5;
+    public static final int SOFTIRQ_INDEX = 6;
+
+    // Locale.ROOT keeps the lower-casing locale independent (e.g. "AIX" under a Turkish default locale).
+    private static final String OPERATING_SYSTEM = System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
+    private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
+    private static final String PROC_STAT_PATH = "/proc/stat";
+
+    private final AtomicDouble compaction_severity = new AtomicDouble();
+    private final ScheduledExecutorService reportThread = new DebuggableScheduledThreadPoolExecutor("Background_Reporter");
+
+    // null whenever PROC_STAT_PATH could not be opened or read; getIOWait() then returns -1
+    private RandomAccessFile statsFile;
+    private long[] lastReading;
+
+    /**
+     * Opens the stats file, takes a baseline reading and schedules a
+     * {@link BackgroundActivityReporter} to run once per second, starting after one second.
+     * Failure to open or read the file is logged on unix-like systems and is not fatal.
+     */
+    public BackgroundActivityMonitor()
+    {
+        try
+        {
+            statsFile = new RandomAccessFile(PROC_STAT_PATH, "r");
+            lastReading = readAndCompute();
+        }
+        catch (IOException ex)
+        {
+            if (isUnix())
+                logger.warn("Couldn't open " + PROC_STAT_PATH, ex);
+            // the file may have been opened before the baseline read failed; don't leak it
+            closeStatsFile();
+        }
+        reportThread.scheduleAtFixedRate(new BackgroundActivityReporter(), 1, 1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * @return true if the os.name system property contains "nix", "nux" or "aix" (case-insensitive)
+     */
+    public static boolean isUnix()
+    {
+        return OPERATING_SYSTEM.contains("nix") || OPERATING_SYSTEM.contains("nux") || OPERATING_SYSTEM.contains("aix");
+    }
+
+    /**
+     * Closes the stats file if it was opened and clears the field, so that
+     * {@link #getIOWait()} reports the file as unavailable from now on.
+     */
+    private void closeStatsFile()
+    {
+        try
+        {
+            if (statsFile != null)
+                statsFile.close();
+        }
+        catch (IOException ignored)
+        {
+            // nothing useful can be done about a failed close
+        }
+        statsFile = null;
+    }
+
+    /**
+     * Re-reads the first line of the stats file and parses every counter that follows the
+     * leading "cpu" label.
+     *
+     * @return one value per counter, in file order
+     * @throws IOException if the line cannot be read, is blank, or holds a non-numeric counter
+     */
+    private long[] readAndCompute() throws IOException
+    {
+        statsFile.seek(0);
+        String line = statsFile.readLine();
+        StringTokenizer tokenizer = new StringTokenizer(line == null ? "" : line);
+        if (!tokenizer.hasMoreTokens())
+            throw new IOException("No cpu line found in " + PROC_STAT_PATH);
+        String name = tokenizer.nextToken();
+        assert name.equalsIgnoreCase("cpu");
+        long[] returned = new long[tokenizer.countTokens()];
+        try
+        {
+            // countTokens() shrinks with every nextToken(), so the loop must be bounded by
+            // the array length; bounding it by countTokens() parses only half of the counters
+            for (int i = 0; i < returned.length; i++)
+                returned[i] = Long.parseLong(tokenizer.nextToken());
+        }
+        catch (NumberFormatException e)
+        {
+            // surface as IOException so callers handle it like any other failed read
+            throw new IOException("Malformed cpu line in " + PROC_STAT_PATH + ": " + line, e);
+        }
+        return returned;
+    }
+
+    /**
+     * Computes how much of the time elapsed between two readings was spent in one counter.
+     * Only the counters up to and including {@link #SOFTIRQ_INDEX} contribute to the total.
+     *
+     * @param reading1 the earlier reading
+     * @param reading2 the later reading
+     * @param index the counter of interest
+     * @return the counter's share of the total delta as a percentage, or 0 when the counter is
+     *         missing from a reading, did not change, or the total did not advance
+     */
+    private float compareAtIndex(long[] reading1, long[] reading2, int index)
+    {
+        int fields = Math.min(SOFTIRQ_INDEX + 1, Math.min(reading1.length, reading2.length));
+        if (index >= fields)
+            return 0f;
+
+        long total1 = 0, total2 = 0;
+        for (int i = 0; i < fields; i++)
+        {
+            total1 += reading1[i];
+            total2 += reading2[i];
+        }
+        float totalDiff = total2 - total1;
+        float diff = reading2[index] - reading1[index];
+        // a non-positive total would otherwise produce Infinity or a negative percentage
+        if (diff == 0 || totalDiff <= 0)
+            return 0f;
+        return (diff / totalDiff) * 100; // yes it is hard coded to 100 [update unit?]
+    }
+
+    /**
+     * Adds {@code sev} to the accumulated compaction severity.
+     *
+     * @param sev the amount to add; may be negative
+     */
+    public void incrCompactionSeverity(double sev)
+    {
+        compaction_severity.addAndGet(sev);
+    }
+
+    /**
+     * Takes a new reading and compares it with the previous one, which it then replaces.
+     *
+     * @return the iowait percentage since the previous reading, or -1 when the stats file is unavailable
+     * @throws IOException if the stats file cannot be read or parsed
+     */
+    public double getIOWait() throws IOException
+    {
+        if (statsFile == null)
+            return -1d;
+        long[] newComp = readAndCompute();
+        double value = compareAtIndex(lastReading, newComp, IOWAIT_INDEX);
+        lastReading = newComp;
+        return value;
+    }
+
+    /**
+     * @return the system load average divided by the number of available processors; negative
+     *         when the platform does not provide a load average
+     */
+    public double getNormalizedLoadAvg()
+    {
+        double avg = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
+        return avg / NUM_CPUS;
+    }
+
+    /**
+     * @param endpoint the node to look up
+     * @return the severity found in the gossip state of {@code endpoint}, or 0.0 when there is none
+     */
+    public double getSeverity(InetAddress endpoint)
+    {
+        VersionedValue event;
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null)
+            return Double.parseDouble(event.value);
+        return 0.0;
+    }
+
+    /**
+     * Publishes this node's severity through gossip: the iowait percentage when it can be
+     * sampled, otherwise the accumulated compaction severity. Nothing is published while
+     * gossip is disabled.
+     */
+    public class BackgroundActivityReporter implements Runnable
+    {
+        public void run()
+        {
+            double report = -1;
+            try
+            {
+                report = getIOWait();
+            }
+            catch (IOException e)
+            {
+                // best effort: fall back to compaction severity below. Only the message is
+                // logged because this runs every second.
+                if (isUnix())
+                    logger.warn("Couldn't read {}: {}", PROC_STAT_PATH, e.getMessage());
+            }
+            if (report == -1d)
+                report = compaction_severity.get();
+
+            if (!Gossiper.instance.isEnabled())
+                return;
+            VersionedValue updated = StorageService.instance.valueFactory.severity(report);
+            Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated);
+        }
+    }
+}