You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/06 16:13:10 UTC

svn commit: r1143428 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/

Author: jbellis
Date: Wed Jul  6 14:13:10 2011
New Revision: 1143428

URL: http://svn.apache.org/viewvc?rev=1143428&view=rev
Log:
add MessagingService.get[Recently]DroppedMessages and StorageService.getExceptionCount
patch by jbellis and Ryan King; reviewed by slebresne for CASSANDRA-2804

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jul  6 14:13:10 2011
@@ -16,6 +16,8 @@
  * fix CLI perpetuating obsolete KsDef.replication_factor (CASSANDRA-2846)
  * improve cli treatment of multiline comments (CASSANDRA-2852)
  * handle row tombstones correctly in EchoedRow (CASSANDRA-2786)
+ * add MessagingService.get[Recently]DroppedMessages and
+   StorageService.getExceptionCount (CASSANDRA-2804)
 
 
 0.8.1

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Wed Jul  6 14:13:10 2011
@@ -42,33 +42,20 @@ public class MessageDeliveryTask impleme
     public void run()
     { 
         StorageService.Verb verb = message.getVerb();
-        switch (verb)
+        if (MessagingService.DROPPABLE_VERBS.contains(verb)
+            && System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
         {
-            case BINARY:
-            case MUTATION:
-            case READ:
-            case RANGE_SLICE:
-            case READ_REPAIR:
-            case REQUEST_RESPONSE:
-                if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
-                {
-                    MessagingService.instance().incrementDroppedMessages(verb);
-                    return;
-                }
-                break;
-            
-            // don't bother.
-            case UNUSED_1:
-            case UNUSED_2:
-            case UNUSED_3:
-                return;
-            
-            default:
-                break;
+            MessagingService.instance().incrementDroppedMessages(verb);
+            return;
         }
 
         IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
-        assert verbHandler != null : "unknown verb " + verb;
+        if (verbHandler == null)
+        {
+            logger_.debug("Unknown verb {}", verb);
+            return;
+        }
+
         verbHandler.doVerb(message, id);
     }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Wed Jul  6 14:13:10 2011
@@ -57,6 +57,8 @@ import org.cliffc.high_scale_lib.NonBloc
 
 public final class MessagingService implements MessagingServiceMBean
 {
+    public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
+
     public static final int VERSION_07 = 1;
     public static final int version_ = 2;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
@@ -81,13 +83,33 @@ public final class MessagingService impl
 
     private SocketThread socketThread;
     private final SimpleCondition listenGate;
+
+    /**
+     * Verbs it's okay to drop if the request has been queued longer than RPC_TIMEOUT.  These
+     * all correspond to client requests or something triggered by them; we don't want to
+     * drop internal messages like bootstrap or repair notifications.
+     */
+    public static final EnumSet<StorageService.Verb> DROPPABLE_VERBS = EnumSet.of(StorageService.Verb.BINARY,
+                                                                                  StorageService.Verb.MUTATION,
+                                                                                  StorageService.Verb.READ_REPAIR,
+                                                                                  StorageService.Verb.READ,
+                                                                                  StorageService.Verb.RANGE_SLICE,
+                                                                                  StorageService.Verb.REQUEST_RESPONSE);
+
+    // total dropped message counts for server lifetime
     private final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+    // dropped count when last requested for the Recent api.  high concurrency isn't necessary here.
+    private final Map<StorageService.Verb, Integer> lastDropped = Collections.synchronizedMap(new EnumMap<StorageService.Verb, Integer>(StorageService.Verb.class));
+
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
     private static final long DEFAULT_CALLBACK_TIMEOUT = (long) (1.1 * DatabaseDescriptor.getRpcTimeout());
 
     {
-        for (StorageService.Verb verb : StorageService.Verb.values())
+        for (StorageService.Verb verb : DROPPABLE_VERBS)
+        {
             droppedMessages.put(verb, new AtomicInteger());
+            lastDropped.put(verb, 0);
+        }
     }
 
     private static class MSHandle
@@ -127,7 +149,7 @@ public final class MessagingService impl
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.net:type=MessagingService"));
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
         }
         catch (Exception e)
         {
@@ -546,9 +568,10 @@ public final class MessagingService impl
         return buffer;
     }
 
-    public int incrementDroppedMessages(StorageService.Verb verb)
+    public void incrementDroppedMessages(StorageService.Verb verb)
     {
-        return droppedMessages.get(verb).incrementAndGet();
+        assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
+        droppedMessages.get(verb).incrementAndGet();
     }
 
     private void logDroppedMessages()
@@ -560,10 +583,9 @@ public final class MessagingService impl
             if (dropped.get() > 0)
             {
                 logTpstats = true;
-                logger_.warn("Dropped {} {} messages in the last {}ms",
-                             new Object[] {dropped, entry.getKey(), LOG_DROPPED_INTERVAL_IN_MS});
+                logger_.info("{} {} messages dropped in server lifetime",
+                             dropped, entry.getKey());
             }
-            dropped.set(0);
         }
 
         if (logTpstats)
@@ -644,4 +666,26 @@ public final class MessagingService impl
     {
         return DEFAULT_CALLBACK_TIMEOUT;
     }
+
+    public Map<String, Integer> getDroppedMessages()
+    {
+        Map<String, Integer> map = new HashMap<String, Integer>();
+        for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())
+            map.put(entry.getKey().toString(), entry.getValue().get());
+        return map;
+    }
+
+    public Map<String, Integer> getRecentlyDroppedMessages()
+    {
+        Map<String, Integer> map = new HashMap<String, Integer>();
+        for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())
+        {
+            StorageService.Verb verb = entry.getKey();
+            Integer dropped = entry.getValue().get();
+            Integer recentlyDropped = dropped - lastDropped.get(verb);
+            map.put(verb.toString(), recentlyDropped);
+            lastDropped.put(verb, dropped);
+        }
+        return map;
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Wed Jul  6 14:13:10 2011
@@ -49,4 +49,14 @@ public interface MessagingServiceMBean
      * Completed tasks for Response(GOSSIP & RESPONSE) TCP Connections
      */
     public Map<String, Long> getResponseCompletedTasks();
+
+    /**
+     * dropped message counts for server lifetime
+     */
+    public Map<String, Integer> getDroppedMessages();
+
+    /**
+     * dropped message counts since last called
+     */
+    public Map<String, Integer> getRecentlyDroppedMessages();
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Wed Jul  6 14:13:10 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -28,6 +29,10 @@ import java.util.concurrent.RejectedExec
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.log4j.PropertyConfigurator;
@@ -79,6 +84,8 @@ public abstract class AbstractCassandraD
     }
 
     private static Logger logger = LoggerFactory.getLogger(AbstractCassandraDaemon.class);
+
+    static final AtomicInteger exceptions = new AtomicInteger();
     
     protected InetAddress listenAddr;
     protected int listenPort;
@@ -99,18 +106,18 @@ public abstract class AbstractCassandraD
 
         listenPort = DatabaseDescriptor.getRpcPort();
         listenAddr = DatabaseDescriptor.getRpcAddress();
-        
         /* 
          * If ThriftAddress was left completely unconfigured, then assume
          * the same default as ListenAddress
          */
         if (listenAddr == null)
             listenAddr = FBUtilities.getLocalAddress();
-        
+
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
         {
             public void uncaughtException(Thread t, Throwable e)
             {
+                exceptions.incrementAndGet();
                 logger.error("Fatal exception in thread " + t, e);
                 if (e instanceof OutOfMemoryError)
                 {

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java Wed Jul  6 14:13:10 2011
@@ -104,7 +104,7 @@ public class GCInspector
                 continue;
 
             Long previous = gctimes.get(gcw.getName());
-            if (previous != null && previous.longValue() == gcw.getCollectionTime().longValue())            
+            if (previous != null && previous.longValue() == gcw.getCollectionTime().longValue())
                 continue;
             gctimes.put(gcw.getName(), gcw.getCollectionTime());
 
@@ -124,7 +124,7 @@ public class GCInspector
 
             String st = String.format("GC for %s: %s ms, %s reclaimed leaving %s used; max is %s",
                                       gcw.getName(), gcw.getDuration(), previousMemoryUsed - memoryUsed, memoryUsed, memoryMax);
-            if (gcw.getDuration() > MIN_DURATION)                          
+            if (gcw.getDuration() > MIN_DURATION)
                 logger.info(st);
             else if (logger.isDebugEnabled())
                 logger.debug(st);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Wed Jul  6 14:13:10 2011
@@ -2472,4 +2472,9 @@ public class StorageService implements I
             throw new RuntimeException(e);
         }
     }
+
+    public int getExceptionCount()
+    {
+        return AbstractCassandraDaemon.exceptions.get();
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Jul  6 14:13:10 2011
@@ -291,6 +291,8 @@ public interface StorageServiceMBean
     public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException;
     public boolean isJoined();
 
+    public int getExceptionCount();
+
     public void setCompactionThroughputMbPerSec(int value);
 
     public void bulkLoad(String directory);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java Wed Jul  6 14:13:10 2011
@@ -31,19 +31,18 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.config.ConfigurationException;
-
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.cache.InstrumentingCacheMBean;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.Pair;
 
 public class NodeCmd
 {
@@ -238,6 +237,10 @@ public class NodeCmd
                         threadPoolProxy.getCurrentlyBlockedTasks(),
                         threadPoolProxy.getTotalBlockedTasks());
         }
+
+        outs.printf("%n%-20s%10s%n", "Message type", "Dropped");
+        for (Entry<String, Integer> entry : probe.getDroppedMessages().entrySet())
+            outs.printf("%-20s%10s%n", entry.getKey(), entry.getValue());
     }
 
     /**
@@ -248,7 +251,7 @@ public class NodeCmd
     public void printInfo(PrintStream outs)
     {
         boolean gossipInitialized = probe.isInitialized();
-        outs.println(probe.getToken());
+        outs.printf("%-17s: %s%n", "Token", probe.getToken());
         outs.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
         outs.printf("%-17s: %s%n", "Load", probe.getLoadString());
         if (gossipInitialized)
@@ -269,6 +272,9 @@ public class NodeCmd
         // Data Center/Rack
         outs.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
         outs.printf("%-17s: %s%n", "Rack", probe.getRack());
+
+        // Exceptions
+        outs.printf("%-17s: %s%n", "Exceptions", probe.getExceptionCount());
     }
 
     public void printReleaseVersion(PrintStream outs)
@@ -329,7 +335,7 @@ public class NodeCmd
             }
         }
 
-        MessagingServiceMBean ms = probe.getMsProxy();
+        MessagingServiceMBean ms = probe.msProxy;
         outs.printf("%-25s", "Pool Name");
         outs.printf("%10s", "Active");
         outs.printf("%10s", "Pending");

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Jul  6 14:13:10 2011
@@ -26,7 +26,6 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.RuntimeMXBean;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
@@ -43,23 +42,19 @@ import com.google.common.collect.Iterabl
 
 import org.apache.cassandra.cache.InstrumentingCacheMBean;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamingService;
 import org.apache.cassandra.streaming.StreamingServiceMBean;
 import org.apache.cassandra.thrift.UnavailableException;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 /**
  * JMX client operations for Cassandra.
  */
@@ -80,7 +75,8 @@ public class NodeProbe
     private MemoryMXBean memProxy;
     private RuntimeMXBean runtimeProxy;
     private StreamingServiceMBean streamProxy;
-    
+    public MessagingServiceMBean msProxy;
+
     /**
      * Creates a NodeProbe using the specified JMX host, port, username, and password.
      *
@@ -148,6 +144,8 @@ public class NodeProbe
         {
             ObjectName name = new ObjectName(ssObjName);
             ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
+            name = new ObjectName(MessagingService.MBEAN_NAME);
+            msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class);
             name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME);
             streamProxy = JMX.newMBeanProxy(mbeanServerConn, name, StreamingServiceMBean.class);
             name = new ObjectName(CompactionManager.MBEAN_OBJECT_NAME);
@@ -502,18 +500,6 @@ public class NodeProbe
         }
     }
     
-    public MessagingServiceMBean getMsProxy()
-    {
-        try
-        {
-            return JMX.newMBeanProxy(mbeanServerConn, new ObjectName("org.apache.cassandra.net:type=MessagingService"), MessagingServiceMBean.class);
-        }
-        catch (MalformedObjectNameException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-    
     public ColumnFamilyStoreMBean getCfsProxy(String ks, String cf)
     {
         ColumnFamilyStoreMBean cfsProxy = null;
@@ -595,6 +581,16 @@ public class NodeProbe
     {
         ssProxy.setCompactionThroughputMbPerSec(value);
     }
+
+    public int getExceptionCount()
+    {
+        return ssProxy.getExceptionCount();
+    }
+
+    public Map<String, Integer> getDroppedMessages()
+    {
+        return msProxy.getDroppedMessages();
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>