You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/06 16:13:10 UTC
svn commit: r1143428 - in /cassandra/branches/cassandra-0.8: ./
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/tools/
Author: jbellis
Date: Wed Jul 6 14:13:10 2011
New Revision: 1143428
URL: http://svn.apache.org/viewvc?rev=1143428&view=rev
Log:
add MessagingService.get[Recently]DroppedMessages and StorageService.getExceptionCount
patch by jbellis and Ryan King; reviewed by slebresne for CASSANDRA-2804
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jul 6 14:13:10 2011
@@ -16,6 +16,8 @@
* fix CLI perpetuating obsolete KsDef.replication_factor (CASSANDRA-2846)
* improve cli treatment of multiline comments (CASSANDRA-2852)
* handle row tombstones correctly in EchoedRow (CASSANDRA-2786)
+ * add MessagingService.get[Recently]DroppedMessages and
+ StorageService.getExceptionCount (CASSANDRA-2804)
0.8.1
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Wed Jul 6 14:13:10 2011
@@ -42,33 +42,20 @@ public class MessageDeliveryTask impleme
public void run()
{
StorageService.Verb verb = message.getVerb();
- switch (verb)
+ if (MessagingService.DROPPABLE_VERBS.contains(verb)
+ && System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
{
- case BINARY:
- case MUTATION:
- case READ:
- case RANGE_SLICE:
- case READ_REPAIR:
- case REQUEST_RESPONSE:
- if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
- {
- MessagingService.instance().incrementDroppedMessages(verb);
- return;
- }
- break;
-
- // don't bother.
- case UNUSED_1:
- case UNUSED_2:
- case UNUSED_3:
- return;
-
- default:
- break;
+ MessagingService.instance().incrementDroppedMessages(verb);
+ return;
}
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
- assert verbHandler != null : "unknown verb " + verb;
+ if (verbHandler == null)
+ {
+ logger_.debug("Unknown verb {}", verb);
+ return;
+ }
+
verbHandler.doVerb(message, id);
}
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Wed Jul 6 14:13:10 2011
@@ -57,6 +57,8 @@ import org.cliffc.high_scale_lib.NonBloc
public final class MessagingService implements MessagingServiceMBean
{
+ public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
+
public static final int VERSION_07 = 1;
public static final int version_ = 2;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
@@ -81,13 +83,33 @@ public final class MessagingService impl
private SocketThread socketThread;
private final SimpleCondition listenGate;
+
+ /**
+ * Verbs it's okay to drop if the request has been queued longer than RPC_TIMEOUT. These
+ * all correspond to client requests or something triggered by them; we don't want to
+ * drop internal messages like bootstrap or repair notifications.
+ */
+ public static final EnumSet<StorageService.Verb> DROPPABLE_VERBS = EnumSet.of(StorageService.Verb.BINARY,
+ StorageService.Verb.MUTATION,
+ StorageService.Verb.READ_REPAIR,
+ StorageService.Verb.READ,
+ StorageService.Verb.RANGE_SLICE,
+ StorageService.Verb.REQUEST_RESPONSE);
+
+ // total dropped message counts for server lifetime
private final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+ // per-verb dropped count as of the previous getRecentlyDroppedMessages() call, used to compute the "recent" delta. high concurrency isn't necessary here.
+ private final Map<StorageService.Verb, Integer> lastDropped = Collections.synchronizedMap(new EnumMap<StorageService.Verb, Integer>(StorageService.Verb.class));
+
private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
private static final long DEFAULT_CALLBACK_TIMEOUT = (long) (1.1 * DatabaseDescriptor.getRpcTimeout());
{
- for (StorageService.Verb verb : StorageService.Verb.values())
+ for (StorageService.Verb verb : DROPPABLE_VERBS)
+ {
droppedMessages.put(verb, new AtomicInteger());
+ lastDropped.put(verb, 0);
+ }
}
private static class MSHandle
@@ -127,7 +149,7 @@ public final class MessagingService impl
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.registerMBean(this, new ObjectName("org.apache.cassandra.net:type=MessagingService"));
+ mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
}
catch (Exception e)
{
@@ -546,9 +568,10 @@ public final class MessagingService impl
return buffer;
}
- public int incrementDroppedMessages(StorageService.Verb verb)
+ public void incrementDroppedMessages(StorageService.Verb verb)
{
- return droppedMessages.get(verb).incrementAndGet();
+ assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
+ droppedMessages.get(verb).incrementAndGet();
}
private void logDroppedMessages()
@@ -560,10 +583,9 @@ public final class MessagingService impl
if (dropped.get() > 0)
{
logTpstats = true;
- logger_.warn("Dropped {} {} messages in the last {}ms",
- new Object[] {dropped, entry.getKey(), LOG_DROPPED_INTERVAL_IN_MS});
+ logger_.info("{} {} messages dropped in server lifetime",
+ dropped, entry.getKey());
}
- dropped.set(0);
}
if (logTpstats)
@@ -644,4 +666,26 @@ public final class MessagingService impl
{
return DEFAULT_CALLBACK_TIMEOUT;
}
+
+ public Map<String, Integer> getDroppedMessages()
+ {
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())
+ map.put(entry.getKey().toString(), entry.getValue().get());
+ return map;
+ }
+
+ public Map<String, Integer> getRecentlyDroppedMessages()
+ {
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())
+ {
+ StorageService.Verb verb = entry.getKey();
+ Integer dropped = entry.getValue().get();
+ Integer recentlyDropped = dropped - lastDropped.get(verb);
+ map.put(verb.toString(), recentlyDropped);
+ lastDropped.put(verb, dropped);
+ }
+ return map;
+ }
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Wed Jul 6 14:13:10 2011
@@ -49,4 +49,14 @@ public interface MessagingServiceMBean
* Completed tasks for Response(GOSSIP & RESPONSE) TCP Connections
*/
public Map<String, Long> getResponseCompletedTasks();
+
+ /**
+ * dropped message counts for server lifetime
+ */
+ public Map<String, Integer> getDroppedMessages();
+
+ /**
+ * dropped message counts since this method was last called
+ */
+ public Map<String, Integer> getRecentlyDroppedMessages();
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Wed Jul 6 14:13:10 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
import java.io.File;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
@@ -28,6 +29,10 @@ import java.util.concurrent.RejectedExec
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import org.apache.cassandra.gms.Gossiper;
import org.apache.log4j.PropertyConfigurator;
@@ -79,6 +84,8 @@ public abstract class AbstractCassandraD
}
private static Logger logger = LoggerFactory.getLogger(AbstractCassandraDaemon.class);
+
+ static final AtomicInteger exceptions = new AtomicInteger();
protected InetAddress listenAddr;
protected int listenPort;
@@ -99,18 +106,18 @@ public abstract class AbstractCassandraD
listenPort = DatabaseDescriptor.getRpcPort();
listenAddr = DatabaseDescriptor.getRpcAddress();
-
/*
* If ThriftAddress was left completely unconfigured, then assume
* the same default as ListenAddress
*/
if (listenAddr == null)
listenAddr = FBUtilities.getLocalAddress();
-
+
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
{
public void uncaughtException(Thread t, Throwable e)
{
+ exceptions.incrementAndGet();
logger.error("Fatal exception in thread " + t, e);
if (e instanceof OutOfMemoryError)
{
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/GCInspector.java Wed Jul 6 14:13:10 2011
@@ -104,7 +104,7 @@ public class GCInspector
continue;
Long previous = gctimes.get(gcw.getName());
- if (previous != null && previous.longValue() == gcw.getCollectionTime().longValue())
+ if (previous != null && previous.longValue() == gcw.getCollectionTime().longValue())
continue;
gctimes.put(gcw.getName(), gcw.getCollectionTime());
@@ -124,7 +124,7 @@ public class GCInspector
String st = String.format("GC for %s: %s ms, %s reclaimed leaving %s used; max is %s",
gcw.getName(), gcw.getDuration(), previousMemoryUsed - memoryUsed, memoryUsed, memoryMax);
- if (gcw.getDuration() > MIN_DURATION)
+ if (gcw.getDuration() > MIN_DURATION)
logger.info(st);
else if (logger.isDebugEnabled())
logger.debug(st);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Wed Jul 6 14:13:10 2011
@@ -2472,4 +2472,9 @@ public class StorageService implements I
throw new RuntimeException(e);
}
}
+
+ public int getExceptionCount()
+ {
+ return AbstractCassandraDaemon.exceptions.get();
+ }
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Jul 6 14:13:10 2011
@@ -291,6 +291,8 @@ public interface StorageServiceMBean
public void joinRing() throws IOException, org.apache.cassandra.config.ConfigurationException;
public boolean isJoined();
+ public int getExceptionCount();
+
public void setCompactionThroughputMbPerSec(int value);
public void bulkLoad(String directory);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeCmd.java Wed Jul 6 14:13:10 2011
@@ -31,19 +31,18 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.config.ConfigurationException;
-
import org.apache.commons.cli.*;
import org.apache.cassandra.cache.InstrumentingCacheMBean;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.Pair;
public class NodeCmd
{
@@ -238,6 +237,10 @@ public class NodeCmd
threadPoolProxy.getCurrentlyBlockedTasks(),
threadPoolProxy.getTotalBlockedTasks());
}
+
+ outs.printf("%n%-20s%10s%n", "Message type", "Dropped");
+ for (Entry<String, Integer> entry : probe.getDroppedMessages().entrySet())
+ outs.printf("%-20s%10s%n", entry.getKey(), entry.getValue());
}
/**
@@ -248,7 +251,7 @@ public class NodeCmd
public void printInfo(PrintStream outs)
{
boolean gossipInitialized = probe.isInitialized();
- outs.println(probe.getToken());
+ outs.printf("%-17s: %s%n", "Token", probe.getToken());
outs.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
outs.printf("%-17s: %s%n", "Load", probe.getLoadString());
if (gossipInitialized)
@@ -269,6 +272,9 @@ public class NodeCmd
// Data Center/Rack
outs.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
outs.printf("%-17s: %s%n", "Rack", probe.getRack());
+
+ // Exceptions
+ outs.printf("%-17s: %s%n", "Exceptions", probe.getExceptionCount());
}
public void printReleaseVersion(PrintStream outs)
@@ -329,7 +335,7 @@ public class NodeCmd
}
}
- MessagingServiceMBean ms = probe.getMsProxy();
+ MessagingServiceMBean ms = probe.msProxy;
outs.printf("%-25s", "Pool Name");
outs.printf("%10s", "Active");
outs.printf("%10s", "Pending");
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1143428&r1=1143427&r2=1143428&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Jul 6 14:13:10 2011
@@ -26,7 +26,6 @@ import java.lang.management.MemoryUsage;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
@@ -43,23 +42,19 @@ import com.google.common.collect.Iterabl
import org.apache.cassandra.cache.InstrumentingCacheMBean;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamingService;
import org.apache.cassandra.streaming.StreamingServiceMBean;
import org.apache.cassandra.thrift.UnavailableException;
-import static com.google.common.base.Charsets.UTF_8;
-
/**
* JMX client operations for Cassandra.
*/
@@ -80,7 +75,8 @@ public class NodeProbe
private MemoryMXBean memProxy;
private RuntimeMXBean runtimeProxy;
private StreamingServiceMBean streamProxy;
-
+ public MessagingServiceMBean msProxy;
+
/**
* Creates a NodeProbe using the specified JMX host, port, username, and password.
*
@@ -148,6 +144,8 @@ public class NodeProbe
{
ObjectName name = new ObjectName(ssObjName);
ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
+ name = new ObjectName(MessagingService.MBEAN_NAME);
+ msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class);
name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME);
streamProxy = JMX.newMBeanProxy(mbeanServerConn, name, StreamingServiceMBean.class);
name = new ObjectName(CompactionManager.MBEAN_OBJECT_NAME);
@@ -502,18 +500,6 @@ public class NodeProbe
}
}
- public MessagingServiceMBean getMsProxy()
- {
- try
- {
- return JMX.newMBeanProxy(mbeanServerConn, new ObjectName("org.apache.cassandra.net:type=MessagingService"), MessagingServiceMBean.class);
- }
- catch (MalformedObjectNameException e)
- {
- throw new RuntimeException(e);
- }
- }
-
public ColumnFamilyStoreMBean getCfsProxy(String ks, String cf)
{
ColumnFamilyStoreMBean cfsProxy = null;
@@ -595,6 +581,16 @@ public class NodeProbe
{
ssProxy.setCompactionThroughputMbPerSec(value);
}
+
+ public int getExceptionCount()
+ {
+ return ssProxy.getExceptionCount();
+ }
+
+ public Map<String, Integer> getDroppedMessages()
+ {
+ return msProxy.getDroppedMessages();
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>