You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/04 21:26:23 UTC
svn commit: r771412 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ gms/ net/
service/
Author: jbellis
Date: Mon May 4 19:26:23 2009
New Revision: 771412
URL: http://svn.apache.org/viewvc?rev=771412&view=rev
Log:
clean up logging. move some of it into MBeans. patch by jbellis; reviewed by Eric Evans for CASSANDRA-130
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 4 19:26:23 2009
@@ -1423,6 +1423,11 @@
return memtableSwitchCount;
}
+ public int getMemtableTasks()
+ {
+ return memtable_.get().getPendingTasks();
+ }
+
/**
* clears out all data associated with this ColumnFamily.
* For use in testing.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Mon May 4 19:26:23 2009
@@ -42,10 +42,15 @@
public int getMemtableColumnsCount();
/**
- * Returns the number of times that a flush has resulted in the
+ * Returns the number of times that a flush has resulted in the
* memtable being switched out.
- *
+ *
* @return the number of memtable switches
*/
public int getMemtableSwitchCount();
+
+ /**
+ * @return the number of tasks waiting to run on the memtable executor
+ */
+ public int getMemtableTasks();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon May 4 19:26:23 2009
@@ -193,10 +193,9 @@
return cfName_;
}
- void printExecutorStats()
+ int getPendingTasks()
{
- long taskCount = (executor_.getTaskCount() - executor_.getCompletedTaskCount());
- logger_.debug("MEMTABLE TASKS : " + taskCount);
+ return (int)(executor_.getTaskCount() - executor_.getCompletedTaskCount());
}
/*
@@ -232,7 +231,6 @@
}
else
{
- printExecutorStats();
Runnable putter = new Putter(key, columnFamily);
executor_.submit(putter);
}
@@ -347,22 +345,21 @@
ColumnFamily get(String key, String cfName, IFilter filter)
{
- printExecutorStats();
Callable<ColumnFamily> call = new Getter(key, cfName, filter);
ColumnFamily cf = null;
- try
- {
- cf = executor_.submit(call).get();
- }
- catch ( ExecutionException ex )
- {
- logger_.debug(LogUtil.throwableToString(ex));
- }
- catch ( InterruptedException ex2 )
- {
- logger_.debug(LogUtil.throwableToString(ex2));
- }
- return cf;
+ try
+ {
+ cf = executor_.submit(call).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return cf;
}
void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Mon May 4 19:26:23 2009
@@ -157,6 +157,7 @@
public void report(EndPoint ep)
{
+ logger_.debug("reporting as failed " + ep);
long now = System.currentTimeMillis();
ArrivalWindow hbWnd = arrivalSamples_.get(ep);
if ( hbWnd == null )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon May 4 19:26:23 2009
@@ -563,7 +563,6 @@
int remoteGeneration = gDigest.generation_;
if ( remoteGeneration > localGeneration )
{
- logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
fd.report(gDigest.endPoint_);
continue;
}
@@ -575,7 +574,6 @@
int remoteVersion = gDigest.maxVersion_;
if ( remoteVersion > localVersion )
{
- logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
fd.report(gDigest.endPoint_);
}
}
@@ -601,7 +599,6 @@
int remoteGeneration = remoteEndPointState.getHeartBeatState().generation_;
if ( remoteGeneration > localGeneration )
{
- logger_.debug("Reporting " + endpoint + " to the FD.");
fd.report(endpoint);
continue;
}
@@ -613,7 +610,6 @@
int remoteVersion = remoteEndPointState.getHeartBeatState().getHeartBeatVersion();
if ( remoteVersion > localVersion )
{
- logger_.debug("Reporting " + endpoint + " to the FD.");
fd.report(endpoint);
}
}
@@ -952,17 +948,18 @@
byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ JoinMessage joinMessage = null;
try
{
- JoinMessage joinMessage = JoinMessage.serializer().deserialize(dis);
- if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
- {
- Gossiper.instance().join(from);
- }
+ joinMessage = JoinMessage.serializer().deserialize(dis);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
- catch ( IOException ex )
+ if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
{
- logger_.info( LogUtil.throwableToString(ex) );
+ Gossiper.instance().join(from);
}
}
}
@@ -1003,7 +1000,7 @@
}
catch (IOException e)
{
- logger_.info( LogUtil.throwableToString(e) );
+ throw new RuntimeException(e);
}
}
@@ -1093,7 +1090,7 @@
}
catch ( IOException e )
{
- logger_.info( LogUtil.throwableToString(e) );
+ throw new RuntimeException(e);
}
}
}
@@ -1109,18 +1106,19 @@
byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ GossipDigestAck2Message gDigestAck2Message = null;
try
{
- GossipDigestAck2Message gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
- Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
- /* Notify the Failure Detector */
- Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
- Gossiper.instance().applyStateLocally(remoteEpStateMap);
+ gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
}
- catch ( IOException e )
+ catch (IOException e)
{
- logger_.info( LogUtil.throwableToString(e) );
+ throw new RuntimeException(e);
}
+ Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
+ Gossiper.instance().applyStateLocally(remoteEpStateMap);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon May 4 19:26:23 2009
@@ -586,7 +586,6 @@
if ( stage != null )
{
- logger_.info("Running on stage " + stage.getName());
stage.execute(runnable);
}
else
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Mon May 4 19:26:23 2009
@@ -28,7 +28,6 @@
import java.util.Map;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
@@ -49,13 +48,7 @@
import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
import org.apache.cassandra.db.TableNotDefinedException;
import org.apache.cassandra.db.RangeCommand;
-import org.apache.cassandra.db.RangeReply;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.thrift.TException;
@@ -65,7 +58,7 @@
public class CassandraServer implements Cassandra.Iface
{
- private static Logger logger_ = Logger.getLogger(CassandraServer.class);
+ private static Logger logger = Logger.getLogger(CassandraServer.class);
private final static List<column_t> EMPTY_COLUMNS = Arrays.asList();
private final static List<superColumn_t> EMPTY_SUPERCOLUMNS = Arrays.asList();
@@ -76,7 +69,7 @@
*/
protected StorageService storageService;
- public CassandraServer()
+ public CassandraServer()
{
storageService = StorageService.instance();
}
@@ -162,89 +155,67 @@
public List<column_t> get_columns_since(String tablename, String key, String columnFamily_column, long timeStamp) throws InvalidRequestException
{
- long startTime = System.currentTimeMillis();
- try
+ logger.debug("get_columns_since");
+ ColumnFamily cfamily = readColumnFamily(new ColumnsSinceReadCommand(tablename, key, columnFamily_column, timeStamp));
+ String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+ if (cfamily == null)
{
- ColumnFamily cfamily = readColumnFamily(new ColumnsSinceReadCommand(tablename, key, columnFamily_column, timeStamp));
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- if (cfamily == null)
- {
- return EMPTY_COLUMNS;
- }
- Collection<IColumn> columns = null;
- if( values.length > 1 )
- {
- // this is the super column case
- IColumn column = cfamily.getColumn(values[1]);
- if(column != null)
- columns = column.getSubColumns();
- }
- else
- {
- columns = cfamily.getAllColumns();
- }
- return thriftifyColumns(columns);
+ return EMPTY_COLUMNS;
}
- finally
+ Collection<IColumn> columns = null;
+ if( values.length > 1 )
{
- logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+ // this is the super column case
+ IColumn column = cfamily.getColumn(values[1]);
+ if(column != null)
+ columns = column.getSubColumns();
}
+ else
+ {
+ columns = cfamily.getAllColumns();
+ }
+ return thriftifyColumns(columns);
}
public List<column_t> get_slice_by_names(String tablename, String key, String columnFamily, List<String> columnNames) throws InvalidRequestException
{
- long startTime = System.currentTimeMillis();
- try
- {
- ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, columnNames));
- if (cfamily == null)
- {
- return EMPTY_COLUMNS;
- }
- Collection<IColumn> columns = null;
- columns = cfamily.getAllColumns();
- return thriftifyColumns(columns);
- }
- finally
+ logger.debug("get_slice_by_names");
+ ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, columnNames));
+ if (cfamily == null)
{
- logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return EMPTY_COLUMNS;
}
+ return thriftifyColumns(cfamily.getAllColumns());
}
public List<column_t> get_slice(String tablename, String key, String columnFamily_column, int start, int count) throws InvalidRequestException
{
- long startTime = System.currentTimeMillis();
- try
- {
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, start, count));
- if (cfamily == null)
- {
- return EMPTY_COLUMNS;
- }
- Collection<IColumn> columns = null;
- if( values.length > 1 )
- {
- // this is the super column case
- IColumn column = cfamily.getColumn(values[1]);
- if(column != null)
- columns = column.getSubColumns();
- }
- else
- {
- columns = cfamily.getAllColumns();
- }
- return thriftifyColumns(columns);
- }
- finally
+ logger.debug("get_slice");
+ String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+ ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, start, count));
+ if (cfamily == null)
{
- logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return EMPTY_COLUMNS;
}
+ Collection<IColumn> columns = null;
+ if( values.length > 1 )
+ {
+ // this is the super column case
+ IColumn column = cfamily.getColumn(values[1]);
+ if(column != null)
+ columns = column.getSubColumns();
+ }
+ else
+ {
+ columns = cfamily.getAllColumns();
+ }
+ return thriftifyColumns(columns);
}
public column_t get_column(String tablename, String key, String columnFamily_column) throws NotFoundException, InvalidRequestException
{
+ logger.debug("get_column");
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
if (values.length < 2)
{
@@ -285,6 +256,7 @@
public int get_column_count(String tablename, String key, String columnFamily_column) throws InvalidRequestException
{
+ logger.debug("get_column_count");
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
if (cfamily == null)
@@ -312,6 +284,7 @@
public void insert(String tablename, String key, String columnFamily_column, byte[] cellData, long timestamp)
{
+ logger.debug("insert");
RowMutation rm = new RowMutation(tablename, key.trim());
rm.add(columnFamily_column, cellData, timestamp);
try
@@ -327,6 +300,7 @@
public boolean insert_blocking(String tablename, String key, String columnFamily_column, byte[] cellData, long timestamp) throws InvalidRequestException
{
+ logger.debug("insert_blocking");
RowMutation rm = new RowMutation(tablename, key.trim());
rm.add(columnFamily_column, cellData, timestamp);
validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
@@ -335,7 +309,7 @@
public boolean batch_insert_blocking(batch_mutation_t batchMutation) throws InvalidRequestException
{
- logger_.debug("batch_insert_blocking");
+ logger.debug("batch_insert_blocking");
RowMutation rm = RowMutation.getRowMutation(batchMutation);
validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
return StorageProxy.insertBlocking(rm);
@@ -343,7 +317,7 @@
public void batch_insert(batch_mutation_t batchMutation)
{
- logger_.debug("batch_insert");
+ logger.debug("batch_insert");
RowMutation rm = RowMutation.getRowMutation(batchMutation);
try
{
@@ -359,7 +333,7 @@
public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block) throws InvalidRequestException
{
- logger_.debug("remove");
+ logger.debug("remove");
RowMutation rm = new RowMutation(tablename, key.trim());
rm.delete(columnFamily_column, timestamp);
validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
@@ -373,22 +347,13 @@
public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws InvalidRequestException
{
- long startTime = System.currentTimeMillis();
- try
- {
- ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, superColumnNames));
- if (cfamily == null)
- {
- return EMPTY_SUPERCOLUMNS;
- }
- Collection<IColumn> columns = null;
- columns = cfamily.getAllColumns();
- return thriftifySuperColumns(columns);
- }
- finally
+ logger.debug("get_slice_super_by_names");
+ ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, superColumnNames));
+ if (cfamily == null)
{
- logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return EMPTY_SUPERCOLUMNS;
}
+ return thriftifySuperColumns(cfamily.getAllColumns());
}
private List<superColumn_t> thriftifySuperColumns(Collection<IColumn> columns)
@@ -414,6 +379,7 @@
public List<superColumn_t> get_slice_super(String tablename, String key, String columnFamily_superColumnName, int start, int count) throws InvalidRequestException
{
+ logger.debug("get_slice_super");
ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_superColumnName, start, count));
if (cfamily == null)
{
@@ -425,6 +391,7 @@
public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws InvalidRequestException, NotFoundException
{
+ logger.debug("get_superColumn");
ColumnFamily cfamily = readColumnFamily(new ColumnReadCommand(tablename, key, columnFamily_column));
if (cfamily == null)
{
@@ -448,7 +415,7 @@
public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper) throws InvalidRequestException
{
- logger_.debug("batch_insert_SuperColumn_blocking");
+ logger.debug("batch_insert_SuperColumn_blocking");
RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
return StorageProxy.insertBlocking(rm);
@@ -456,7 +423,7 @@
public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
{
- logger_.debug("batch_insert_SuperColumn");
+ logger.debug("batch_insert_SuperColumn");
RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
try
{
@@ -557,29 +524,13 @@
public List<String> get_key_range(String tablename, String startWith, String stopAt, int maxResults) throws InvalidRequestException
{
- logger_.debug("get_range");
-
+ logger.debug("get_key_range");
if (!(StorageService.getPartitioner() instanceof OrderPreservingPartitioner))
{
throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
}
- try
- {
- Message message = new RangeCommand(tablename, startWith, stopAt, maxResults).getMessage();
- EndPoint endPoint = StorageService.instance().findSuitableEndPoint(startWith);
- IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
-
- // read response
- // TODO send more requests if we need to span multiple nodes
- // double the usual timeout since range requests are expensive
- byte[] responseBody = iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- return RangeReply.read(responseBody).keys;
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ return StorageProxy.getRange(new RangeCommand(tablename, startWith, stopAt, maxResults));
}
/*
@@ -588,17 +539,11 @@
* the SSTable index bucket it falls in, are in
* buffer cache.
*/
- public void touch (String key , boolean fData)
+ public void touch (String key, boolean fData)
{
- try
- {
- StorageProxy.touchProtocol(DatabaseDescriptor.getTables().get(0), key, fData, StorageService.ConsistencyLevel.WEAK);
- }
- catch ( Exception e)
- {
- logger_.info( LogUtil.throwableToString(e) );
- }
+ logger.debug("touch");
+ StorageProxy.touchProtocol(DatabaseDescriptor.getTables().get(0), key, fData, StorageService.ConsistencyLevel.WEAK);
}
-
+
// main method moved to CassandraDaemon
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon May 4 19:26:23 2009
@@ -26,29 +26,48 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.lang.management.ManagementFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.TouchMessage;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
-public class StorageProxy
+
+public class StorageProxy implements StorageProxyMBean
{
- private static Logger logger_ = Logger.getLogger(StorageProxy.class);
-
+ private static Logger logger = Logger.getLogger(StorageProxy.class);
+
+ // mbean stuff
+ private static volatile long readLatency;
+ private static volatile int readOperations;
+ private static volatile long rangeLatency;
+ private static volatile int rangeOperations;
+ private static volatile long writeLatency;
+ private static volatile int writeOperations;
+ private StorageProxy() {}
+ static
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(new StorageProxy(), new ObjectName("org.apache.cassandra.service:type=StorageProxy"));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* This method is responsible for creating Message to be
* sent over the wire to N replicas where some of the replicas
@@ -67,7 +86,7 @@
{
Message hintedMessage = rm.makeRowMutationMessage();
hintedMessage.addHeader(RowMutation.HINT, EndPoint.toBytes(hint) );
- logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+ logger.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
messageMap.put(target, hintedMessage);
}
else
@@ -87,8 +106,6 @@
*/
public static void insert(RowMutation rm)
{
- // TODO check for valid Table, ColumnFamily
-
/*
* Get the N nodes from storage service where the data needs to be
* replicated
@@ -96,26 +113,36 @@
* Send them asynchronously to the replicas.
*/
+ long startTime = System.currentTimeMillis();
try
{
Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
// TODO: throw a thrift exception if we do not have N nodes
Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
- logger_.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
+ logger.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
{
MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
}
}
- catch (Exception e)
+ catch (IOException e)
{
- logger_.error( LogUtil.throwableToString(e) );
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ if (writeOperations++ == Integer.MAX_VALUE)
+ {
+ writeOperations = 1;
+ writeLatency = 0;
+ }
+ writeLatency += System.currentTimeMillis() - startTime;
}
- return;
}
public static boolean insertBlocking(RowMutation rm)
{
+ long startTime = System.currentTimeMillis();
try
{
Message message = rm.makeRowMutationMessage();
@@ -125,7 +152,7 @@
DatabaseDescriptor.getReplicationFactor(),
writeResponseResolver);
EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
- logger_.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
+ logger.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
// TODO: throw a thrift exception if we do not have N nodes
MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
@@ -137,8 +164,16 @@
}
catch (Exception e)
{
- logger_.error( LogUtil.throwableToString(e) );
- return false;
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ if (writeOperations++ == Integer.MAX_VALUE)
+ {
+ writeOperations = 1;
+ writeLatency = 0;
+ }
+ writeLatency += System.currentTimeMillis() - startTime;
}
}
@@ -213,7 +248,7 @@
{
EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
assert endPoint != null;
- logger_.debug("weakreadremote reading " + command + " from " + endPoint);
+ logger.debug("weakreadremote reading " + command + " from " + endPoint);
Message message = command.makeReadMessage();
message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
@@ -239,7 +274,7 @@
table.touch(key, fData);
}
- static void weakTouchProtocol(String tablename, String key, boolean fData) throws Exception
+ static void weakTouchProtocol(String tablename, String key, boolean fData) throws IOException
{
EndPoint endPoint = null;
try
@@ -262,10 +297,9 @@
Message message = TouchMessage.makeTouchMessage(touchMessage);
MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
}
- return ;
}
- static void strongTouchProtocol(String tablename, String key, boolean fData) throws Exception
+ static void strongTouchProtocol(String tablename, String key, boolean fData) throws IOException
{
Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(key);
Set<EndPoint> endpoints = endpointMap.keySet();
@@ -281,23 +315,40 @@
/*
* Only touch data on the most suitable end point.
*/
- public static void touchProtocol(String tablename, String key, boolean fData, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+ public static void touchProtocol(String tablename, String key, boolean fData, StorageService.ConsistencyLevel consistencyLevel)
{
- switch ( consistencyLevel )
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ switch ( consistencyLevel )
+ {
+ case WEAK:
+ weakTouchProtocol(tablename, key, fData);
+ break;
+
+ case STRONG:
+ strongTouchProtocol(tablename, key, fData);
+ break;
+
+ default:
+ weakTouchProtocol(tablename, key, fData);
+ break;
+ }
+ }
+ catch (IOException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ finally
{
- case WEAK:
- weakTouchProtocol(tablename, key, fData);
- break;
-
- case STRONG:
- strongTouchProtocol(tablename, key, fData);
- break;
-
- default:
- weakTouchProtocol(tablename, key, fData);
- break;
+ if (readOperations++ == Integer.MAX_VALUE)
+ {
+ readOperations = 1;
+ readLatency = 0;
+ }
+ readLatency += System.currentTimeMillis() - startTime;
}
- }
+ }
/**
* Performs the actual reading of a row out of the StorageService, fetching
@@ -307,7 +358,8 @@
throws IOException, TimeoutException
{
long startTime = System.currentTimeMillis();
- Row row = null;
+
+ Row row;
EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
if (consistencyLevel == StorageService.ConsistencyLevel.WEAK)
@@ -328,7 +380,13 @@
row = strongRead(command);
}
- logger_.debug("Finished reading " + row + " in " + (System.currentTimeMillis() - startTime) + " ms.");
+ if (readOperations++ == Integer.MAX_VALUE)
+ {
+ readOperations = 1;
+ readLatency = 0;
+ }
+ readLatency += System.currentTimeMillis() - startTime;
+
return row;
}
@@ -365,8 +423,7 @@
*/
public static Map<String, Row> strongReadProtocol(String[] keys, ReadCommand readCommand) throws IOException, TimeoutException
{
- Map<String, Row> rows = new HashMap<String, Row>();
- long startTime = System.currentTimeMillis();
+ Map<String, Row> rows;
// TODO: throw a thrift exception if we do not have N nodes
Map<String, ReadCommand[]> readMessages = new HashMap<String, ReadCommand[]>();
for (String key : keys )
@@ -378,7 +435,6 @@
readMessages.put(key, readParameters);
}
rows = doStrongReadProtocol(readMessages);
- logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
return rows;
}
@@ -428,7 +484,7 @@
endPoints[i] = endpointList.get(i - 1);
messages[i] = messageDigestOnly;
}
- logger_.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
+ logger.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
try
{
@@ -436,7 +492,7 @@
long startTime2 = System.currentTimeMillis();
row = quorumResponseHandler.get();
- logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
+ logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
}
catch (DigestMismatchException ex)
{
@@ -446,7 +502,7 @@
QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
DatabaseDescriptor.getReplicationFactor(),
readResponseResolverRepair);
- logger_.info("DigestMismatchException: " + command.key);
+ logger.info("DigestMismatchException: " + command.key);
Message messageRepair = command.makeReadMessage();
MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
quorumResponseHandlerRepair);
@@ -552,10 +608,9 @@
rows.put(row.key(), row);
}
}
- catch ( TimeoutException ex )
+ catch (TimeoutException e)
{
- logger_.info("Operation timed out waiting for responses ...");
- logger_.info(LogUtil.throwableToString(ex));
+ throw new RuntimeException(e);
}
return rows;
}
@@ -574,7 +629,6 @@
public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand readCommand) throws Exception
{
Row row = null;
- long startTime = System.currentTimeMillis();
Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
for ( String key : keys )
{
@@ -606,7 +660,7 @@
*/
private static Row weakReadLocal(ReadCommand command) throws IOException
{
- logger_.debug("weakreadlocal for " + command);
+ logger.debug("weakreadlocal reading " + command);
List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(StorageService.getLocalStorageEndPoint());
@@ -623,4 +677,62 @@
StorageService.instance().doConsistencyCheck(row, endpoints, command);
return row;
}
+
+ static List<String> getRange(RangeCommand command)
+ {
+ long startTime = System.currentTimeMillis();
+ try
+ {
+ EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
+ IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(command.getMessage(), endPoint);
+
+ // read response
+ // TODO send more requests if we need to span multiple nodes
+ byte[] responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ return RangeReply.read(responseBody).keys;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ if (rangeOperations++ == Integer.MAX_VALUE)
+ {
+ rangeOperations = 1;
+ rangeLatency = 0;
+ }
+ rangeLatency += System.currentTimeMillis() - startTime;
+ }
+ }
+
+ public double getReadLatency()
+ {
+ return ((double)readLatency) / readOperations;
+ }
+
+ public double getRangeLatency()
+ {
+ return ((double)rangeLatency) / rangeOperations;
+ }
+
+ public double getWriteLatency()
+ {
+ return ((double)writeLatency) / writeOperations;
+ }
+
+ public int getReadOperations()
+ {
+ return readOperations;
+ }
+
+ public int getRangeOperations()
+ {
+ return rangeOperations;
+ }
+
+ public int getWriteOperations()
+ {
+ return writeOperations;
+ }
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=771412&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java Mon May 4 19:26:23 2009
@@ -0,0 +1,11 @@
+package org.apache.cassandra.service;
+
+public interface StorageProxyMBean
+{
+ public double getReadLatency();
+ public int getReadOperations();
+ public double getRangeLatency();
+ public int getRangeOperations();
+ public double getWriteLatency();
+ public int getWriteOperations();
+}