You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/03/31 01:15:59 UTC

[2/2] git commit: Dsnitch uses 'severity', latency, and time since last reply for scores. Patch by vijay and brandonwilliams, reviewed by vijay for CASSANDRA-3722

Dsnitch uses 'severity', latency, and time since last reply for scores.
Patch by vijay and brandonwilliams, reviewed by vijay for CASSANDRA-3722


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98a70bde
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98a70bde
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98a70bde

Branch: refs/heads/trunk
Commit: 98a70bdebb26955ea629fe3a21212253fc7b7d17
Parents: 08345fa
Author: Brandon Williams <br...@apache.org>
Authored: Fri Mar 30 16:44:35 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Mar 30 18:15:50 2012 -0500

----------------------------------------------------------------------
 .../cassandra/db/compaction/CompactionInfo.java    |   21 +++++++
 .../cassandra/db/compaction/CompactionManager.java |    4 ++
 .../org/apache/cassandra/gms/ApplicationState.java |    1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |    2 +-
 .../org/apache/cassandra/gms/VersionedValue.java   |    5 ++
 .../cassandra/locator/DynamicEndpointSnitch.java   |   43 ++++++++++++++-
 .../locator/DynamicEndpointSnitchMBean.java        |    7 +++
 .../org/apache/cassandra/net/MessagingService.java |    6 ++
 .../apache/cassandra/service/StorageService.java   |   23 ++++++++-
 .../locator/DynamicEndpointSnitchTest.java         |   11 ++++-
 10 files changed, 117 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index fda8dd3..594b639 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.service.StorageService;
+
 /** Implements serializable to allow structured info to be returned via JMX. */
 public final class CompactionInfo implements Serializable
 {
@@ -103,6 +105,8 @@ public final class CompactionInfo implements Serializable
     {
         private volatile boolean isStopped = false;
         public abstract CompactionInfo getCompactionInfo();
+        double load = StorageService.instance.getLoad();
+        boolean reportedSeverity = false;
 
         public void stop()
         {
@@ -113,5 +117,22 @@ public final class CompactionInfo implements Serializable
         {
             return isStopped;
         }
+        /**
+         * report event on the size of the compaction.
+         */
+        public void started()
+        {
+            reportedSeverity = StorageService.instance.reportSeverity(getCompactionInfo().getTotalBytes()/load);
+        }
+
+        /**
+         * remove the event complete
+         */
+        public void finished()
+        {
+            if (reportedSeverity)
+                StorageService.instance.reportSeverity(-(getCompactionInfo().getTotalBytes()/load));            
+            reportedSeverity = false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 848abee..36f05c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1017,11 +1017,15 @@ public class CompactionManager implements CompactionManagerMBean
 
         public void beginCompaction(CompactionInfo.Holder ci)
         {
+            // notify
+            ci.started();
             compactions.add(ci);
         }
 
         public void finishCompaction(CompactionInfo.Holder ci)
         {
+            // notify
+            ci.finished();
             compactions.remove(ci);
             totalBytesCompacted += ci.getCompactionInfo().getTotalBytes();
             totalCompactionsCompleted += 1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/ApplicationState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 48e8d84..4520426 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -28,6 +28,7 @@ public enum ApplicationState
     REMOVAL_COORDINATOR,
     INTERNAL_IP,
     RPC_ADDRESS,
+    SEVERITY,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e6a2a3c..2a948a6 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1160,7 +1160,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public boolean isEnabled()
     {
-        return !scheduledGossipTask.isCancelled();
+        return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index eccca1f..36ff1d9 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -178,6 +178,11 @@ public class VersionedValue implements Comparable<VersionedValue>
         {
             return new VersionedValue(private_ip);
         }
+        
+        public VersionedValue severity(double value)
+        {
+            return new VersionedValue(String.valueOf(value));
+        }
     }
 
     private static class VersionedValueSerializer implements IVersionedSerializer<VersionedValue>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 0f3aaca..3b80e67 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -49,6 +49,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
     private boolean registered = false;
 
     private final ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap<InetAddress, Double>();
+    private final ConcurrentHashMap<InetAddress, Long> lastReceived = new ConcurrentHashMap<InetAddress, Long>();
     private final ConcurrentHashMap<InetAddress, BoundedStatsDeque> windows = new ConcurrentHashMap<InetAddress, BoundedStatsDeque>();
     private final AtomicInteger intervalupdates = new AtomicInteger(0);
 
@@ -56,7 +57,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
 
     public DynamicEndpointSnitch(IEndpointSnitch snitch)
     {
-        mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch,instance="+hashCode();
+        this(snitch, null);
+    }
+    public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
+    {
+        mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
+        if (instance != null)
+            mbeanName += ",instance=" + instance;
         subsnitch = snitch;
         Runnable update = new Runnable()
         {
@@ -195,12 +202,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
 
     public void receiveTiming(InetAddress host, Double latency) // this is cheap
     {
+        lastReceived.put(host, System.currentTimeMillis());
         if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
             return;
         BoundedStatsDeque deque = windows.get(host);
         if (deque == null)
         {
-            BoundedStatsDeque maybeNewDeque  = new BoundedStatsDeque(WINDOW_SIZE);
+            BoundedStatsDeque maybeNewDeque = new BoundedStatsDeque(WINDOW_SIZE);
             deque = windows.putIfAbsent(host, maybeNewDeque);
             if (deque == null)
                 deque = maybeNewDeque;
@@ -222,13 +230,32 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
             }
 
         }
+        double maxLatency = 1;
+        long maxPenalty = 1;
+        HashMap<InetAddress, Long> penalties = new HashMap<InetAddress, Long>();
+        for (Map.Entry<InetAddress, BoundedStatsDeque> entry : windows.entrySet())
+        {
+            double mean = entry.getValue().mean();
+            if (mean > maxLatency)
+                maxLatency = mean;
+            long timePenalty = lastReceived.containsKey(entry.getKey()) ? lastReceived.get(entry.getKey()) : System.currentTimeMillis();
+            timePenalty = System.currentTimeMillis() - timePenalty;
+            timePenalty = timePenalty > UPDATE_INTERVAL_IN_MS ? UPDATE_INTERVAL_IN_MS : timePenalty;
+            penalties.put(entry.getKey(), timePenalty);
+            if (timePenalty > maxPenalty)
+                maxPenalty = timePenalty;
+        }
         for (Map.Entry<InetAddress, BoundedStatsDeque> entry: windows.entrySet())
         {
-            scores.put(entry.getKey(), entry.getValue().mean());
+            double score = entry.getValue().mean() / maxLatency;
+            score += penalties.get(entry.getKey()) / maxPenalty;
+            score += StorageService.instance.getSeverity(entry.getKey());
+            scores.put(entry.getKey(), score);            
         }
         intervalupdates.set(0);
     }
 
+
     private void reset()
     {
         for (BoundedStatsDeque deque : windows.values())
@@ -274,4 +301,14 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         return timings;
     }
 
+    public void setSeverity(double severity)
+    {
+        StorageService.instance.reportSeverity(severity);
+    }
+
+    public double getSeverity()
+    {
+        return StorageService.instance.getSeverity(FBUtilities.getBroadcastAddress());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
index 7c423c8..becbacf 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
@@ -29,4 +29,11 @@ public interface DynamicEndpointSnitchMBean {
     public double getBadnessThreshold();
     public String getSubsnitchClassName();
     public List<Double> dumpTimings(String hostname) throws UnknownHostException;
+    /**
+     * Use this if you want to specify a severity it can be -ve
+     * Example: Page cache is cold and you want data to be sent 
+     *          though it is not preferred one.
+     */
+    public void setSeverity(double severity);
+    public double getSeverity();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index f5379de..f3e2600 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -715,6 +715,12 @@ public final class MessagingService implements MessagingServiceMBean
         return pendingTasks;
     }
 
+    public int getCommandPendingTasks(InetAddress address)
+    {
+        OutboundTcpConnectionPool connection = connectionManagers.get(address);
+        return connection == null ? 0 : connection.cmdCon.getPendingMessages();
+    }
+    
     public Map<String, Long> getCommandCompletedTasks()
     {
         Map<String, Long> completedTasks = new HashMap<String, Long>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4e43964..4b12383 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.SSTableLoader;
@@ -785,6 +784,28 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     }
 
     /**
+     * Gossip about the known severity of the events in this node
+     */
+    public synchronized boolean reportSeverity(double incr)
+    {
+        if (!Gossiper.instance.isEnabled())
+            return false;
+        double update = getSeverity(FBUtilities.getBroadcastAddress()) + incr;
+        VersionedValue updated = StorageService.instance.valueFactory.severity(update);
+        Gossiper.instance.addLocalApplicationState(ApplicationState.SEVERITY, updated);
+        return true;
+    }
+    
+    public double getSeverity(InetAddress endpoint)
+    {
+        VersionedValue event;
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (state != null && (event = state.getApplicationState(ApplicationState.SEVERITY)) != null)
+            return Double.parseDouble(event.value);
+        return 0.0;
+    }
+
+    /**
      * for a keyspace, return the ranges and corresponding listen addresses.
      * @param keyspace
      * @return

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98a70bde/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index 7d44f27..decd59a 100644
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@ -37,7 +37,8 @@ public class DynamicEndpointSnitchTest
         // do this because SS needs to be initialized before DES can work properly.
         StorageService.instance.initClient(0);
         int sleeptime = 150;
-        DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch());
+        SimpleSnitch ss = new SimpleSnitch();
+        DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
         InetAddress self = FBUtilities.getBroadcastAddress();
         ArrayList<InetAddress> order = new ArrayList<InetAddress>();
         InetAddress host1 = InetAddress.getByName("127.0.0.1");
@@ -61,6 +62,8 @@ public class DynamicEndpointSnitchTest
 
         // make host1 a little worse
         dsnitch.receiveTiming(host1, 2.0);
+        dsnitch.receiveTiming(host2, 1.0);
+        dsnitch.receiveTiming(host3, 1.0);
         Thread.sleep(sleeptime);
 
         order.clear();
@@ -71,6 +74,8 @@ public class DynamicEndpointSnitchTest
 
         // make host2 as bad as host1
         dsnitch.receiveTiming(host2, 2.0);
+        dsnitch.receiveTiming(host1, 1.0);
+        dsnitch.receiveTiming(host3, 1.0);
         Thread.sleep(sleeptime);
 
         order.clear();
@@ -82,6 +87,8 @@ public class DynamicEndpointSnitchTest
         // make host3 the worst
         for (int i = 0; i < 2; i++)
         {
+            dsnitch.receiveTiming(host1, 1.0);
+            dsnitch.receiveTiming(host2, 1.0);
             dsnitch.receiveTiming(host3, 2.0);
         }
         Thread.sleep(sleeptime);
@@ -95,6 +102,8 @@ public class DynamicEndpointSnitchTest
         // make host3 equal to the others
         for (int i = 0; i < 2; i++)
         {
+            dsnitch.receiveTiming(host1, 1.0);
+            dsnitch.receiveTiming(host2, 1.0);
             dsnitch.receiveTiming(host3, 1.0);
         }
         Thread.sleep(sleeptime);