You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/07 00:27:03 UTC
svn commit: r772448 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ gms/ net/
service/ utils/
Author: jbellis
Date: Wed May 6 22:27:02 2009
New Revision: 772448
URL: http://svn.apache.org/viewvc?rev=772448&view=rev
Log:
Clean up more ad-hoc timing messages and move them to MBeans. Add TimedStatsDeque to
simplify tracking load stats over a one-minute window.
patch by jbellis; reviewed by Eric Evans for CASSANDRA-144
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May 6 22:27:02 2009
@@ -54,6 +54,7 @@
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.TimedStatsDeque;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -88,6 +89,9 @@
/* Flag indicates if a compaction is in process */
private AtomicBoolean isCompacting_ = new AtomicBoolean(false);
+ private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
+ private TimedStatsDeque diskReadStats_ = new TimedStatsDeque(60000);
+
ColumnFamilyStore(String table, String columnFamily, boolean isSuper, int indexValue) throws IOException
{
table_ = table;
@@ -482,15 +486,20 @@
public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter) throws IOException
{
+ long start = System.currentTimeMillis();
List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
- return resolveAndRemoveDeleted(columnFamilies);
+ ColumnFamily cf = resolveAndRemoveDeleted(columnFamilies);
+ readStats_.add(System.currentTimeMillis() - start);
+ return cf;
}
public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter, int gcBefore) throws IOException
{
+ long start = System.currentTimeMillis();
List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
- ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
- return removeDeleted(cf, gcBefore);
+ ColumnFamily cf = removeDeleted(ColumnFamily.resolve(columnFamilies), gcBefore);
+ readStats_.add(System.currentTimeMillis() - start);
+ return cf;
}
/**
@@ -513,7 +522,7 @@
{
long start = System.currentTimeMillis();
getColumnFamilyFromDisk(key, columnFamilyColumn, columnFamilies, filter);
- logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ diskReadStats_.add(System.currentTimeMillis() - start);
}
return columnFamilies;
}
@@ -1457,4 +1466,19 @@
{
return Collections.unmodifiableSet(ssTables_);
}
+
+ public int getReadCount()
+ {
+ return readStats_.size();
+ }
+
+ public int getReadDiskHits()
+ {
+ return diskReadStats_.size();
+ }
+
+ public double getReadLatency()
+ {
+ return readStats_.mean();
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed May 6 22:27:02 2009
@@ -53,4 +53,19 @@
* Triggers an immediate memtable flush.
*/
public void forceFlush();
+
+ /**
+ * @return the number of read operations on this column family in the last minute
+ */
+ public int getReadCount();
+
+ /**
+ * @return the number of read operations on this column family that hit the disk in the last minute
+ */
+ public int getReadDiskHits();
+
+ /**
+ * @return average latency per read operation in the last minute
+ */
+ public double getReadLatency();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed May 6 22:27:02 2009
@@ -75,10 +75,7 @@
ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
Table table = Table.open(readCommand.table);
Row row = null;
- long start = System.currentTimeMillis();
row = readCommand.getRow(table);
- logger_.info("getRow() TIME: " + (System.currentTimeMillis() - start) + " ms.");
- start = System.currentTimeMillis();
ReadResponse readResponse = null;
if(readCommand.isDigestQuery())
{
@@ -92,28 +89,24 @@
/* serialize the ReadResponseMessage. */
readCtx.bufOut_.reset();
- start = System.currentTimeMillis();
ReadResponse.serializer().serialize(readResponse, readCtx.bufOut_);
- logger_.info("serialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
byte[] bytes = new byte[readCtx.bufOut_.getLength()];
- start = System.currentTimeMillis();
System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
- logger_.info("copy TIME: " + (System.currentTimeMillis() - start) + " ms.");
- Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bytes );
+ Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
+ logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getFrom());
MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
- logger_.info("ReadVerbHandler TIME 2: " + (System.currentTimeMillis() - start) + " ms.");
-
+
/* Do read repair if header of the message says so */
if (message.getHeader(ReadCommand.DO_REPAIR) != null)
{
doReadRepair(row, readCommand);
}
}
- catch ( IOException ex)
+ catch (IOException ex)
{
- logger_.info( LogUtil.throwableToString(ex) );
+ throw new RuntimeException(ex);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed May 6 22:27:02 2009
@@ -33,6 +33,7 @@
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.BoundedStatsDeque;
import org.apache.log4j.Logger;
/**
@@ -236,22 +237,17 @@
{
private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
private double tLast_ = 0L;
- private Deque<Double> arrivalIntervals_;
+ private BoundedStatsDeque arrivalIntervals_;
private int size_;
ArrivalWindow(int size)
{
size_ = size;
- arrivalIntervals_ = new ArrayDeque<Double>(size);
+ arrivalIntervals_ = new BoundedStatsDeque(size);
}
synchronized void add(double value)
{
- if ( arrivalIntervals_.size() == size_ )
- {
- arrivalIntervals_.remove();
- }
-
double interArrivalTime;
if ( tLast_ > 0L )
{
@@ -267,41 +263,27 @@
synchronized double sum()
{
- double sum = 0d;
- for (Double interval : arrivalIntervals_)
- {
- sum += interval;
- }
- return sum;
+ return arrivalIntervals_.sum();
}
synchronized double sumOfDeviations()
{
- double sumOfDeviations = 0d;
- double mean = mean();
-
- for (Double interval : arrivalIntervals_)
- {
- double v = interval - mean;
- sumOfDeviations += v * v;
- }
-
- return sumOfDeviations;
+ return arrivalIntervals_.sumOfDeviations();
}
synchronized double mean()
{
- return sum()/arrivalIntervals_.size();
+ return arrivalIntervals_.mean();
}
synchronized double variance()
- {
- return sumOfDeviations() / (arrivalIntervals_.size());
+ {
+ return arrivalIntervals_.variance();
}
- double deviation()
- {
- return Math.sqrt(variance());
+ double stdev()
+ {
+ return arrivalIntervals_.stdev();
}
void clear()
@@ -311,13 +293,9 @@
double p(double t)
{
- // Stat stat = new Stat();
- double mean = mean();
- double deviation = deviation();
- /* Exponential CDF = 1 -e^-lambda*x */
+ double mean = mean();
double exponent = (-1)*(t)/mean;
return 1 - ( 1 - Math.pow(Math.E, exponent) );
- // return stat.gaussianCDF(mean, deviation, t, Double.POSITIVE_INFINITY);
}
double phi(long tnow)
@@ -335,7 +313,7 @@
public String toString()
{
- return StringUtils.join(arrivalIntervals_, " ");
+ return StringUtils.join(arrivalIntervals_.iterator(), " ");
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Wed May 6 22:27:02 2009
@@ -39,18 +39,11 @@
public void run()
{
- try
- {
- String verb = message_.getVerb();
- IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);
- if ( verbHandler != null )
- {
- verbHandler.doVerb(message_);
- }
- }
- catch (Throwable th)
+ String verb = message_.getVerb();
+ IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);
+ if ( verbHandler != null )
{
- logger_.warn( LogUtil.throwableToString(th) );
+ verbHandler.doVerb(message_);
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=772448&r1=772447&r2=772448&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed May 6 22:27:02 2009
@@ -37,6 +37,7 @@
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.TimedStatsDeque;
import org.apache.log4j.Logger;
import javax.management.MBeanServer;
@@ -48,12 +49,9 @@
private static Logger logger = Logger.getLogger(StorageProxy.class);
// mbean stuff
- private static volatile long readLatency;
- private static volatile int readOperations;
- private static volatile long rangeLatency;
- private static volatile int rangeOperations;
- private static volatile long writeLatency;
- private static volatile int writeOperations;
+ private static TimedStatsDeque readStats = new TimedStatsDeque(60000);
+ private static TimedStatsDeque rangeStats = new TimedStatsDeque(60000);
+ private static TimedStatsDeque writeStats = new TimedStatsDeque(60000);
private StorageProxy() {}
static
{
@@ -119,7 +117,7 @@
Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
// TODO: throw a thrift exception if we do not have N nodes
Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
- logger.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
+ logger.debug("insert writing key " + rm.key() + " to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
{
MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
@@ -131,12 +129,7 @@
}
finally
{
- if (writeOperations++ == Integer.MAX_VALUE)
- {
- writeOperations = 1;
- writeLatency = 0;
- }
- writeLatency += System.currentTimeMillis() - startTime;
+ writeStats.add(System.currentTimeMillis() - startTime);
}
}
@@ -154,12 +147,11 @@
}
try
{
- IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
DatabaseDescriptor.getReplicationFactor(),
- writeResponseResolver);
+ new WriteResponseResolver());
EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
- logger.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
+ logger.debug("insertBlocking writing key " + rm.key() + " to [" + StringUtils.join(endpoints, ", ") + "]");
// TODO: throw a thrift exception if we do not have N nodes
MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
@@ -173,12 +165,7 @@
}
finally
{
- if (writeOperations++ == Integer.MAX_VALUE)
- {
- writeOperations = 1;
- writeLatency = 0;
- }
- writeLatency += System.currentTimeMillis() - startTime;
+ writeStats.add(System.currentTimeMillis() - startTime);
}
}
@@ -346,12 +333,7 @@
}
finally
{
- if (readOperations++ == Integer.MAX_VALUE)
- {
- readOperations = 1;
- readLatency = 0;
- }
- readLatency += System.currentTimeMillis() - startTime;
+ readStats.add(System.currentTimeMillis() - startTime);
}
}
@@ -385,12 +367,7 @@
row = strongRead(command);
}
- if (readOperations++ == Integer.MAX_VALUE)
- {
- readOperations = 1;
- readLatency = 0;
- }
- readLatency += System.currentTimeMillis() - startTime;
+ readStats.add(System.currentTimeMillis() - startTime);
return row;
}
@@ -702,42 +679,37 @@
}
finally
{
- if (rangeOperations++ == Integer.MAX_VALUE)
- {
- rangeOperations = 1;
- rangeLatency = 0;
- }
- rangeLatency += System.currentTimeMillis() - startTime;
+ rangeStats.add(System.currentTimeMillis() - startTime);
}
}
public double getReadLatency()
{
- return ((double)readLatency) / readOperations;
+ return readStats.mean();
}
public double getRangeLatency()
{
- return ((double)rangeLatency) / rangeOperations;
+ return rangeStats.mean();
}
public double getWriteLatency()
{
- return ((double)writeLatency) / writeOperations;
+ return writeStats.mean();
}
public int getReadOperations()
{
- return readOperations;
+ return readStats.size();
}
public int getRangeOperations()
{
- return rangeOperations;
+ return rangeStats.size();
}
public int getWriteOperations()
{
- return writeOperations;
+ return writeStats.size();
}
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java?rev=772448&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/AbstractStatsDeque.java Wed May 6 22:27:02 2009
@@ -0,0 +1,66 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.ArrayDeque;
+
+/**
+ * Base class for a collection of double samples that can report simple
+ * descriptive statistics over whatever samples it currently holds.
+ * Subclasses decide how samples are stored and when they are discarded.
+ * Nothing is cached: each statistic is recomputed on every call.
+ */
+public abstract class AbstractStatsDeque implements Iterable<Double>
+{
+    /** @return an iterator over the samples currently held */
+    public abstract Iterator<Double> iterator();
+    /** @return the number of samples currently held */
+    public abstract int size();
+    /** Records a new sample. */
+    public abstract void add(double o);
+    /** Discards all samples. */
+    public abstract void clear();
+
+    /** @return the total of all samples, or 0 when there are none */
+    public double sum()
+    {
+        double total = 0d;
+        for (double sample : this)
+        {
+            total += sample;
+        }
+        return total;
+    }
+
+    /** @return the sum of squared differences between each sample and the mean */
+    public double sumOfDeviations()
+    {
+        final double average = mean();
+        double squares = 0d;
+        for (double sample : this)
+        {
+            final double delta = sample - average;
+            squares += delta * delta;
+        }
+        return squares;
+    }
+
+    /** @return sum() divided by size(); NaN when there are no samples */
+    public double mean()
+    {
+        final double total = sum();
+        return total / size();
+    }
+
+    /** @return sumOfDeviations() divided by size() (population variance); NaN when empty */
+    public double variance()
+    {
+        final double squares = sumOfDeviations();
+        return squares / size();
+    }
+
+    /** @return the square root of variance() */
+    public double stdev()
+    {
+        return Math.sqrt(variance());
+    }
+}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java?rev=772448&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BoundedStatsDeque.java Wed May 6 22:27:02 2009
@@ -0,0 +1,61 @@
+package org.apache.cassandra.utils;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+
+/**
+ * A stats deque that keeps at most a fixed number of samples; once full,
+ * adding a sample drops the oldest one first.
+ * This class does no locking of its own.
+ */
+public class BoundedStatsDeque extends AbstractStatsDeque
+{
+    private final int size;
+    protected final ArrayDeque<Double> deque;
+
+    /**
+     * @param size maximum number of samples to retain; must be positive
+     * @throws IllegalArgumentException if size is not positive
+     */
+    public BoundedStatsDeque(int size)
+    {
+        // A zero bound would make the first add() call remove() on an empty deque;
+        // a negative bound would never be reached, so nothing would ever be evicted.
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("size must be positive, got " + size);
+        }
+        this.size = size;
+        deque = new ArrayDeque<Double>(size);
+    }
+
+    /** @return an iterator over the retained samples, oldest first */
+    public Iterator<Double> iterator()
+    {
+        return deque.iterator();
+    }
+
+    /** @return the number of samples currently retained */
+    public int size()
+    {
+        return deque.size();
+    }
+
+    /** Discards all retained samples. */
+    public void clear()
+    {
+        deque.clear();
+    }
+
+    /**
+     * Appends a sample, evicting the oldest one first when the deque is full.
+     */
+    public void add(double o)
+    {
+        if (deque.size() >= size)
+        {
+            deque.remove();
+        }
+        deque.add(o);
+    }
+}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java?rev=772448&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/TimedStatsDeque.java Wed May 6 22:27:02 2009
@@ -0,0 +1,95 @@
+package org.apache.cassandra.utils;
+
+import java.util.Iterator;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A stats deque that only reports samples recorded within the last
+ * <code>period</code> milliseconds. Expired samples are dropped lazily,
+ * whenever the deque is read or written.
+ *
+ * An instance may be used from more than one thread (one recording samples,
+ * another reading statistics) and ArrayDeque is not safe for concurrent use,
+ * so every public method that touches the deque holds this object's monitor.
+ */
+public class TimedStatsDeque extends AbstractStatsDeque
+{
+    private final ArrayDeque<Tuple> deque;
+    private final long period;
+
+    /**
+     * @param period how long a sample stays visible, in milliseconds
+     */
+    public TimedStatsDeque(long period)
+    {
+        this.period = period;
+        deque = new ArrayDeque<Tuple>();
+    }
+
+    /**
+     * Drops samples older than the period, relative to the given time.
+     * Callers must hold this object's monitor.
+     */
+    private void purge(long now)
+    {
+        long cutoff = now - period;
+        while (!deque.isEmpty() && deque.peek().timestamp < cutoff)
+        {
+            deque.remove();
+        }
+    }
+
+    /**
+     * @return an iterator over a snapshot of the unexpired sample values,
+     *         oldest first; later changes to the deque do not affect it
+     */
+    public synchronized Iterator<Double> iterator()
+    {
+        purge(System.currentTimeMillis());
+        // Copying is O(n), but statistics are expected to be read far less
+        // often than samples are added, and the copy can be iterated unlocked.
+        List<Double> values = new ArrayList<Double>(deque.size());
+        for (Tuple t : deque)
+        {
+            values.add(t.value);
+        }
+        return values.iterator();
+    }
+
+    /** @return the number of unexpired samples */
+    public synchronized int size()
+    {
+        purge(System.currentTimeMillis());
+        return deque.size();
+    }
+
+    /** Records a sample stamped with the current time. */
+    public synchronized void add(double o)
+    {
+        // Read the clock once so purging and stamping agree on "now".
+        long now = System.currentTimeMillis();
+        purge(now);
+        deque.add(new Tuple(o, now));
+    }
+
+    /** Discards all samples. */
+    public synchronized void clear()
+    {
+        deque.clear();
+    }
+}
+
+/** A sample value paired with the time (epoch milliseconds) it was recorded. */
+class Tuple
+{
+    public final double value;
+    public final long timestamp;
+
+    public Tuple(double value, long timestamp)
+    {
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+}
\ No newline at end of file