You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/07 00:27:03 UTC

svn commit: r772448 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ gms/ net/ service/ utils/

Author: jbellis
Date: Wed May  6 22:27:02 2009
New Revision: 772448

URL: http://svn.apache.org/viewvc?rev=772448&view=rev
Log:
clean up more ad-hoc timing messages and move them to mbeans.  add TimedStatsDeque to
simplify tracking load stats over a one-minute window.
patch by jbellis; reviewed by Eric Evans for CASSANDRA-144

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May  6 22:27:02 2009
@@ -54,6 +54,7 @@
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.TimedStatsDeque;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -88,6 +89,9 @@
     /* Flag indicates if a compaction is in process */
     private AtomicBoolean isCompacting_ = new AtomicBoolean(false);
 
+    private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
+    private TimedStatsDeque diskReadStats_ = new TimedStatsDeque(60000);
+
     ColumnFamilyStore(String table, String columnFamily, boolean isSuper, int indexValue) throws IOException
     {
         table_ = table;
@@ -482,15 +486,20 @@
 
     public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter) throws IOException
     {
+        long start = System.currentTimeMillis();
         List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
-        return resolveAndRemoveDeleted(columnFamilies);
+        ColumnFamily cf = resolveAndRemoveDeleted(columnFamilies);
+        readStats_.add(System.currentTimeMillis() - start);
+        return cf;
     }
 
     public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter, int gcBefore) throws IOException
     {
+        long start = System.currentTimeMillis();
         List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
-        ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
-        return removeDeleted(cf, gcBefore);
+        ColumnFamily cf = removeDeleted(ColumnFamily.resolve(columnFamilies), gcBefore);
+        readStats_.add(System.currentTimeMillis() - start);
+        return cf;
     }
 
     /**
@@ -513,7 +522,7 @@
         {
             long start = System.currentTimeMillis();
             getColumnFamilyFromDisk(key, columnFamilyColumn, columnFamilies, filter);
-            logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start) + " ms.");
+            diskReadStats_.add(System.currentTimeMillis() - start);
         }
         return columnFamilies;
     }
@@ -1457,4 +1466,19 @@
     {
         return Collections.unmodifiableSet(ssTables_);
     }
+
+    public int getReadCount()
+    {
+        return readStats_.size();
+    }
+
+    public int getReadDiskHits()
+    {
+        return diskReadStats_.size();
+    }
+
+    public double getReadLatency()
+    {
+        return readStats_.mean();
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed May  6 22:27:02 2009
@@ -53,4 +53,19 @@
      * Triggers an immediate memtable flush.
      */
     public void forceFlush();
+
+    /**
+     * @return the number of read operations on this column family in the last minute
+     */
+    public int getReadCount();
+
+    /**
+     * @return the number of read operations on this column family that hit the disk in the last minute
+     */
+    public int getReadDiskHits();
+
+    /**
+     * @return average latency per read operation in the last minute
+     */
+    public double getReadLatency();
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed May  6 22:27:02 2009
@@ -75,10 +75,7 @@
             ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
             Table table = Table.open(readCommand.table);
             Row row = null;
-            long start = System.currentTimeMillis();
             row = readCommand.getRow(table);
-            logger_.info("getRow()  TIME: " + (System.currentTimeMillis() - start) + " ms.");
-            start = System.currentTimeMillis();
             ReadResponse readResponse = null;
             if(readCommand.isDigestQuery())
             {
@@ -92,28 +89,24 @@
             /* serialize the ReadResponseMessage. */
             readCtx.bufOut_.reset();
 
-            start = System.currentTimeMillis();
             ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
-            logger_.info("serialize  TIME: " + (System.currentTimeMillis() - start) + " ms.");
 
             byte[] bytes = new byte[readCtx.bufOut_.getLength()];
-            start = System.currentTimeMillis();
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
-            logger_.info("copy  TIME: " + (System.currentTimeMillis() - start) + " ms.");
 
-            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bytes );
+            Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
+            logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getFrom());
             MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
-            logger_.info("ReadVerbHandler  TIME 2: " + (System.currentTimeMillis() - start) + " ms.");
-            
+
             /* Do read repair if header of the message says so */
             if (message.getHeader(ReadCommand.DO_REPAIR) != null)
             {
                 doReadRepair(row, readCommand);
             }
         }
-        catch ( IOException ex)
+        catch (IOException ex)
         {
-            logger_.info( LogUtil.throwableToString(ex) );
+            throw new RuntimeException(ex);
         }
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed May  6 22:27:02 2009
@@ -33,6 +33,7 @@
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.BoundedStatsDeque;
 import org.apache.log4j.Logger;
 
 /**
@@ -236,22 +237,17 @@
 {
     private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
     private double tLast_ = 0L;
-    private Deque<Double> arrivalIntervals_;
+    private BoundedStatsDeque arrivalIntervals_;
     private int size_;
     
     ArrivalWindow(int size)
     {
         size_ = size;
-        arrivalIntervals_ = new ArrayDeque<Double>(size);
+        arrivalIntervals_ = new BoundedStatsDeque(size);
     }
     
     synchronized void add(double value)
     {
-        if ( arrivalIntervals_.size() == size_ )
-        {                          
-            arrivalIntervals_.remove();            
-        }
-        
         double interArrivalTime;
         if ( tLast_ > 0L )
         {                        
@@ -267,41 +263,27 @@
     
     synchronized double sum()
     {
-        double sum = 0d;
-        for (Double interval : arrivalIntervals_)
-        {
-            sum += interval;
-        }
-        return sum;
+        return arrivalIntervals_.sum();
     }
     
     synchronized double sumOfDeviations()
     {
-        double sumOfDeviations = 0d;
-        double mean = mean();
-
-        for (Double interval : arrivalIntervals_)
-        {
-            double v = interval - mean;
-            sumOfDeviations += v * v;
-        }
-
-        return sumOfDeviations;
+        return arrivalIntervals_.sumOfDeviations();
     }
     
     synchronized double mean()
     {
-        return sum()/arrivalIntervals_.size();
+        return arrivalIntervals_.mean();
     }
     
     synchronized double variance()
-    {                
-        return sumOfDeviations() / (arrivalIntervals_.size());        
+    {
+        return arrivalIntervals_.variance();
     }
     
-    double deviation()
-    {        
-        return Math.sqrt(variance());
+    double stdev()
+    {
+        return arrivalIntervals_.stdev();
     }
     
     void clear()
@@ -311,13 +293,9 @@
     
     double p(double t)
     {
-        // Stat stat = new Stat();
-        double mean = mean();        
-        double deviation = deviation();   
-        /* Exponential CDF = 1 -e^-lambda*x */
+        double mean = mean();
         double exponent = (-1)*(t)/mean;
         return 1 - ( 1 - Math.pow(Math.E, exponent) );
-        // return stat.gaussianCDF(mean, deviation, t, Double.POSITIVE_INFINITY);             
     }
     
     double phi(long tnow)
@@ -335,7 +313,7 @@
     
     public String toString()
     {
-        return StringUtils.join(arrivalIntervals_, " ");
+        return StringUtils.join(arrivalIntervals_.iterator(), " ");
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Wed May  6 22:27:02 2009
@@ -39,18 +39,11 @@
     
     public void run()
     { 
-        try
-        {            
-            String verb = message_.getVerb();                               
-            IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);           
-            if ( verbHandler != null )
-            {
-                verbHandler.doVerb(message_);        
-            }
-        }
-        catch (Throwable th)
+        String verb = message_.getVerb();
+        IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);
+        if ( verbHandler != null )
         {
-            logger_.warn( LogUtil.throwableToString(th) );
+            verbHandler.doVerb(message_);
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed May  6 22:27:02 2009
@@ -37,6 +37,7 @@
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.TimedStatsDeque;
 import org.apache.log4j.Logger;
 
 import javax.management.MBeanServer;
@@ -48,12 +49,9 @@
     private static Logger logger = Logger.getLogger(StorageProxy.class);
 
     // mbean stuff
-    private static volatile long readLatency;
-    private static volatile int readOperations;
-    private static volatile long rangeLatency;
-    private static volatile int rangeOperations;
-    private static volatile long writeLatency;
-    private static volatile int writeOperations;
+    private static TimedStatsDeque readStats = new TimedStatsDeque(60000);
+    private static TimedStatsDeque rangeStats = new TimedStatsDeque(60000);
+    private static TimedStatsDeque writeStats = new TimedStatsDeque(60000);
     private StorageProxy() {}
     static
     {
@@ -119,7 +117,7 @@
 			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
 			// TODO: throw a thrift exception if we do not have N nodes
 			Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
-            logger.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
+            logger.debug("insert writing key " + rm.key() + " to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
 			for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
 			{
 				MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
@@ -131,12 +129,7 @@
         }
         finally
         {
-            if (writeOperations++ == Integer.MAX_VALUE)
-            {
-                writeOperations = 1;
-                writeLatency = 0;
-            }
-            writeLatency += System.currentTimeMillis() - startTime;
+            writeStats.add(System.currentTimeMillis() - startTime);
         }
     }
 
@@ -154,12 +147,11 @@
         }
         try
         {
-            IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
             QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
                     DatabaseDescriptor.getReplicationFactor(),
-                    writeResponseResolver);
+                    new WriteResponseResolver());
             EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
-            logger.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
+            logger.debug("insertBlocking writing key " + rm.key() + " to [" + StringUtils.join(endpoints, ", ") + "]");
             // TODO: throw a thrift exception if we do not have N nodes
 
             MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
@@ -173,12 +165,7 @@
         }
         finally
         {
-            if (writeOperations++ == Integer.MAX_VALUE)
-            {
-                writeOperations = 1;
-                writeLatency = 0;
-            }
-            writeLatency += System.currentTimeMillis() - startTime;
+            writeStats.add(System.currentTimeMillis() - startTime);
         }
     }
     
@@ -346,12 +333,7 @@
         }
         finally
         {
-            if (readOperations++ == Integer.MAX_VALUE)
-            {
-                readOperations = 1;
-                readLatency = 0;
-            }
-            readLatency += System.currentTimeMillis() - startTime;
+            readStats.add(System.currentTimeMillis() - startTime);
         }
     }
 
@@ -385,12 +367,7 @@
             row = strongRead(command);
         }
 
-        if (readOperations++ == Integer.MAX_VALUE)
-        {
-            readOperations = 1;
-            readLatency = 0;
-        }
-        readLatency += System.currentTimeMillis() - startTime;
+        readStats.add(System.currentTimeMillis() - startTime);
 
         return row;
     }
@@ -702,42 +679,37 @@
         }
         finally
         {
-            if (rangeOperations++ == Integer.MAX_VALUE)
-            {
-                rangeOperations = 1;
-                rangeLatency = 0;
-            }
-            rangeLatency += System.currentTimeMillis() - startTime;
+            rangeStats.add(System.currentTimeMillis() - startTime);
         }
     }
 
     public double getReadLatency()
     {
-        return ((double)readLatency) / readOperations;
+        return readStats.mean();
     }
 
     public double getRangeLatency()
     {
-        return ((double)rangeLatency) / rangeOperations;
+        return rangeStats.mean();
     }
 
     public double getWriteLatency()
     {
-        return ((double)writeLatency) / writeOperations;
+        return writeStats.mean();
     }
 
     public int getReadOperations()
     {
-        return readOperations;
+        return readStats.size();
     }
 
     public int getRangeOperations()
     {
-        return rangeOperations;
+        return rangeStats.size();
     }
 
     public int getWriteOperations()
     {
-        return writeOperations;
+        return writeStats.size();
     }
 }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java?rev=772448&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java Wed May  6 22:27:02 2009
@@ -0,0 +1,55 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.ArrayDeque;
+
+public abstract class AbstractStatsDeque implements Iterable<Double>
+{
+    public abstract Iterator<Double> iterator();
+    public abstract int size();
+    public abstract void add(double o);
+    public abstract void clear();
+
+    //
+    // statistical methods
+    //
+
+    public double sum()
+    {
+        double sum = 0d;
+        for (Double interval : this)
+        {
+            sum += interval;
+        }
+        return sum;
+    }
+
+    public double sumOfDeviations()
+    {
+        double sumOfDeviations = 0d;
+        double mean = mean();
+
+        for (Double interval : this)
+        {
+            double v = interval - mean;
+            sumOfDeviations += v * v;
+        }
+
+        return sumOfDeviations;
+    }
+
+    public double mean()
+    {
+        return sum() / size();
+    }
+
+    public double variance()
+    {
+        return sumOfDeviations() / size();
+    }
+
+    public double stdev()
+    {
+        return Math.sqrt(variance());
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java?rev=772448&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java Wed May  6 22:27:02 2009
@@ -0,0 +1,40 @@
+package org.apache.cassandra.utils;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+
+public class BoundedStatsDeque extends AbstractStatsDeque
+{
+    private final int size;
+    protected final ArrayDeque<Double> deque;
+
+    public BoundedStatsDeque(int size)
+    {
+        this.size = size;
+        deque = new ArrayDeque<Double>(size);
+    }
+
+    public Iterator<Double> iterator()
+    {
+        return deque.iterator();
+    }
+
+    public int size()
+    {
+        return deque.size();
+    }
+
+    public void clear()
+    {
+        deque.clear();
+    }
+
+    public void add(double o)
+    {
+        if (size == deque.size())
+        {
+            deque.remove();
+        }
+        deque.add(o);
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java?rev=772448&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java Wed May  6 22:27:02 2009
@@ -0,0 +1,68 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TimedStatsDeque extends AbstractStatsDeque
+{
+    private final ArrayDeque<Tuple> deque;
+    private final long period;
+
+    public TimedStatsDeque(long period)
+    {
+        this.period = period;
+        deque = new ArrayDeque<Tuple>();
+    }
+
+    private void purge()
+    {
+        long now = System.currentTimeMillis();
+        while (!deque.isEmpty() && deque.peek().timestamp < now - period)
+        {
+            deque.remove();
+        }
+    }
+
+    public Iterator<Double> iterator()
+    {
+        purge();
+        // I expect this method to be called relatively infrequently so inefficiency is ok.
+        List<Double> L = new ArrayList<Double>(deque.size());
+        for (Tuple t : deque)
+        {
+            L.add(t.value);
+        }
+        return L.iterator();
+    }
+
+    public int size()
+    {
+        purge();
+        return deque.size();
+    }
+
+    public void add(double o)
+    {
+        purge();
+        deque.add(new Tuple(o, System.currentTimeMillis()));
+    }
+
+    public void clear()
+    {
+        deque.clear();
+    }
+}
+
+class Tuple
+{
+    public final double value;
+    public final long timestamp;
+
+    public Tuple(double value, long timestamp)
+    {
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+}
\ No newline at end of file