You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2013/05/03 11:13:34 UTC

git commit: Improve Dsnitch Severity with iowait monitor patch by Vijay; reviewed by jbellis for CASSANDRA-5521

Updated Branches:
  refs/heads/trunk c33ccd9e3 -> bdf08364a


Improve Dsnitch Severity with iowait monitor
patch by Vijay; reviewed by jbellis for CASSANDRA-5521

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdf08364
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdf08364
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdf08364

Branch: refs/heads/trunk
Commit: bdf08364a60d9b05d0b45cb47ddd73adb08b4196
Parents: c33ccd9
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Fri May 3 02:03:10 2013 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Fri May 3 02:03:10 2013 -0700

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionInfo.java    |    3 +-
 .../apache/cassandra/service/StorageService.java   |   18 +--
 .../cassandra/utils/BackgroundActivityMonitor.java |  149 +++++++++++++++
 3 files changed, 156 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index da67bab..d086eef 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -139,8 +139,7 @@ public final class CompactionInfo implements Serializable
         public void started()
         {
             reportedSeverity = getCompactionInfo().getTotal() / load;
-            if (!StorageService.instance.reportSeverity(reportedSeverity))
-                reportedSeverity = 0d;
+            StorageService.instance.reportSeverity(reportedSeverity);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a86d856..3c508ee 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -186,6 +186,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<IEndpointLifecycleSubscriber>();
 
+    private static final BackgroundActivityMonitor bgMonitor = new BackgroundActivityMonitor();
+
     private final ObjectName jmxObjectName;
 
     public void finishBootstrapping()
@@ -927,24 +929,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Gossip about the known severity of the events in this node
+     * Increment the known compaction severity of the events in this node
      */
-    public boolean reportSeverity(double incr)
+    public void reportSeverity(double incr)
     {
-        if (!Gossiper.instance.isEnabled())
-            return false;
-        VersionedValue updated = StorageService.instance.valueFactory.severity(severity.addAndGet(incr));
-        Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated);
-        return true;
+        bgMonitor.incrCompactionSeverity(incr);
     }
 
     public double getSeverity(InetAddress endpoint)
     {
-        VersionedValue event;
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null)
-            return Double.parseDouble(event.value);
-        return 0.0;
+        return bgMonitor.getSeverity(endpoint);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdf08364/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
new file mode 100644
index 0000000..10215a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/BackgroundActivityMonitor.java
@@ -0,0 +1,149 @@
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.StringTokenizer;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+/**
+ * Gossips this node's SEVERITY every second: the CPU iowait percentage from /proc/stat when readable, else the accumulated compaction severity.
+ */
+public class BackgroundActivityMonitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(BackgroundActivityMonitor.class);
+
+    public static final int USER_INDEX = 0; // *_INDEX: position of each counter on the "cpu" line of /proc/stat, after the label
+    public static final int NICE_INDEX = 1;
+    public static final int SYS_INDEX = 2;
+    public static final int IDLE_INDEX = 3;
+    public static final int IOWAIT_INDEX = 4;
+    public static final int IRQ_INDEX = 5;
+    public static final int SOFTIRQ_INDEX = 6;
+
+    private static final String OPERATING_SYSTEM = System.getProperty("os.name").toLowerCase();
+    private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors();
+    private static final String PROC_STAT_PATH = "/proc/stat";
+
+    private final AtomicDouble compaction_severity = new AtomicDouble();
+    private final ScheduledExecutorService reportThread = new DebuggableScheduledThreadPoolExecutor("Background_Reporter");
+    private RandomAccessFile statsFile; // null whenever /proc/stat could not be opened or read
+    private long[] lastReading;
+
+    /** Opens /proc/stat if possible, takes a baseline reading and schedules the reporter to run every second. */
+    public BackgroundActivityMonitor()
+    {
+        try
+        {
+            statsFile = new RandomAccessFile(PROC_STAT_PATH, "r");
+            lastReading = readAndCompute();
+        }
+        catch (IOException ex)
+        {
+            if (isUnix())
+                logger.warn("Couldn't open " + PROC_STAT_PATH, ex);
+            statsFile = null;
+        }
+        reportThread.scheduleAtFixedRate(new BackgroundActivityReporter(), 1, 1, TimeUnit.SECONDS);
+    }
+
+    /** @return true when the os.name system property contains "nix", "nux" or "aix". */
+    public static boolean isUnix()
+    {
+        return OPERATING_SYSTEM.contains("nix") || OPERATING_SYSTEM.contains("nux") || OPERATING_SYSTEM.contains("aix");
+    }
+
+    /** Re-reads the first line of the stats file and returns every counter after the "cpu" label; the loop is bounded by the array because countTokens() shrinks as tokens are consumed. */
+    private long[] readAndCompute() throws IOException
+    {
+        statsFile.seek(0);
+        String line = statsFile.readLine();
+        if (line == null || line.trim().isEmpty())
+            throw new IOException(PROC_STAT_PATH + " has no cpu line");
+        StringTokenizer tokenizer = new StringTokenizer(line);
+        String name = tokenizer.nextToken();
+        assert name.equalsIgnoreCase("cpu");
+        long[] returned = new long[tokenizer.countTokens()];
+        for (int i = 0; i < returned.length; i++)
+            returned[i] = Long.parseLong(tokenizer.nextToken());
+        return returned;
+    }
+
+    /** @return the percentage of the total counter increase between the two readings that belongs to the counter at index; 0 when the total did not advance or the index is absent. */
+    private float compareAtIndex(long[] reading1, long[] reading2, int index)
+    {
+        int fields = Math.min(SOFTIRQ_INDEX + 1, Math.min(reading1.length, reading2.length));
+        long totalDiff = 0;
+        for (int i = 0; i < fields; i++)
+            totalDiff += reading2[i] - reading1[i];
+        if (index >= fields || totalDiff <= 0)
+            return 0f;
+        return ((float) (reading2[index] - reading1[index]) / totalDiff) * 100;
+    }
+
+    /** Adds sev to the compaction severity that is reported when iowait cannot be read. */
+    public void incrCompactionSeverity(double sev)
+    {
+        compaction_severity.addAndGet(sev);
+    }
+
+    /** @return the iowait percentage since the previous reading, or -1 when the stats file is not available. */
+    public double getIOWait() throws IOException
+    {
+        if (statsFile == null)
+            return -1d;
+        long[] previous = lastReading;
+        lastReading = readAndCompute();
+        return compareAtIndex(previous, lastReading, IOWAIT_INDEX);
+    }
+
+    /** @return the system load average divided by the number of available processors. */
+    public double getNormalizedLoadAvg()
+    {
+        return ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage() / NUM_CPUS;
+    }
+
+    /** @return the SEVERITY application state known for endpoint, parsed as a double, or 0.0 when there is none. */
+    public double getSeverity(InetAddress endpoint)
+    {
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        VersionedValue event = state == null ? null : state.getApplicationState(ApplicationState.SEVERITY);
+        return event == null ? 0.0 : Double.parseDouble(event.value);
+    }
+
+    /** Periodic task that publishes the current severity as local gossip state, provided gossip is enabled. */
+    public class BackgroundActivityReporter implements Runnable
+    {
+        @Override
+        public void run()
+        {
+            double report = -1;
+            try
+            {
+                report = getIOWait();
+            }
+            catch (IOException e)
+            {
+                if (isUnix())
+                    logger.warn("Couldn't read " + PROC_STAT_PATH, e);
+            }
+            if (report == -1d)
+                report = compaction_severity.get();
+            if (Gossiper.instance.isEnabled())
+                Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, StorageService.instance.valueFactory.severity(report));
+        }
+    }
+}