You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/04 21:26:23 UTC

svn commit: r771412 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ gms/ net/ service/

Author: jbellis
Date: Mon May  4 19:26:23 2009
New Revision: 771412

URL: http://svn.apache.org/viewvc?rev=771412&view=rev
Log:
clean up logging.  move some of it into MBeans.  patch by jbellis; reviewed by Eric Evans for CASSANDRA-130

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May  4 19:26:23 2009
@@ -1423,6 +1423,11 @@
         return memtableSwitchCount;
     }
 
+    public int getMemtableTasks()
+    {
+        return memtable_.get().getPendingTasks();
+    }
+
     /**
      * clears out all data associated with this ColumnFamily.
      * For use in testing.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Mon May  4 19:26:23 2009
@@ -42,10 +42,15 @@
     public int getMemtableColumnsCount();
     
     /**
-     * Returns the number of times that a flush has resulted in the 
+     * Returns the number of times that a flush has resulted in the
      * memtable being switched out.
-     * 
+     *
      * @return the number of memtable switches
      */
     public int getMemtableSwitchCount();
+
+    /**
+     * @return the number of tasks waiting to run on the memtable executor
+     */
+    public int getMemtableTasks();
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon May  4 19:26:23 2009
@@ -193,10 +193,9 @@
     	return cfName_;
     }
 
-    void printExecutorStats()
+    int getPendingTasks()
     {
-    	long taskCount = (executor_.getTaskCount() - executor_.getCompletedTaskCount());
-    	logger_.debug("MEMTABLE TASKS : " + taskCount);
+    	return (int)(executor_.getTaskCount() - executor_.getCompletedTaskCount());
     }
 
     /*
@@ -232,7 +231,6 @@
         }
         else
         {
-        	printExecutorStats();
         	Runnable putter = new Putter(key, columnFamily);
         	executor_.submit(putter);
         }
@@ -347,22 +345,21 @@
 
     ColumnFamily get(String key, String cfName, IFilter filter)
     {
-    	printExecutorStats();
     	Callable<ColumnFamily> call = new Getter(key, cfName, filter);
     	ColumnFamily cf = null;
-    	try
-    	{
-    		cf = executor_.submit(call).get();
-    	}
-    	catch ( ExecutionException ex )
-    	{
-    		logger_.debug(LogUtil.throwableToString(ex));
-    	}
-    	catch ( InterruptedException ex2 )
-    	{
-    		logger_.debug(LogUtil.throwableToString(ex2));
-    	}
-    	return cf;
+        try
+        {
+            cf = executor_.submit(call).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return cf;
     }
 
     void flush(CommitLog.CommitLogContext cLogCtx) throws IOException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Mon May  4 19:26:23 2009
@@ -157,6 +157,7 @@
     
     public void report(EndPoint ep)
     {
+        logger_.debug("reporting as failed " + ep);
         long now = System.currentTimeMillis();
         ArrivalWindow hbWnd = arrivalSamples_.get(ep);
         if ( hbWnd == null )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon May  4 19:26:23 2009
@@ -563,7 +563,6 @@
                 int remoteGeneration = gDigest.generation_;
                 if ( remoteGeneration > localGeneration )
                 {
-                    logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
                     fd.report(gDigest.endPoint_);
                     continue;
                 }
@@ -575,7 +574,6 @@
                     int remoteVersion = gDigest.maxVersion_;
                     if ( remoteVersion > localVersion )
                     {
-                        logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
                         fd.report(gDigest.endPoint_);
                     }
                 }
@@ -601,7 +599,6 @@
                 int remoteGeneration = remoteEndPointState.getHeartBeatState().generation_;
                 if ( remoteGeneration > localGeneration )
                 {
-                    logger_.debug("Reporting " + endpoint + " to the FD.");
                     fd.report(endpoint);
                     continue;
                 }
@@ -613,7 +610,6 @@
                     int remoteVersion = remoteEndPointState.getHeartBeatState().getHeartBeatVersion();
                     if ( remoteVersion > localVersion )
                     {
-                        logger_.debug("Reporting " + endpoint + " to the FD.");
                         fd.report(endpoint);
                     }
                 }
@@ -952,17 +948,18 @@
         byte[] bytes = message.getMessageBody();
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
 
+        JoinMessage joinMessage = null;
         try
         {
-            JoinMessage joinMessage = JoinMessage.serializer().deserialize(dis);
-            if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
-            {
-                Gossiper.instance().join(from);
-            }
+            joinMessage = JoinMessage.serializer().deserialize(dis);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
-        catch ( IOException ex )
+        if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
         {
-            logger_.info( LogUtil.throwableToString(ex) );
+            Gossiper.instance().join(from);
         }
     }
 }
@@ -1003,7 +1000,7 @@
         }
         catch (IOException e)
         {
-            logger_.info( LogUtil.throwableToString(e) );
+            throw new RuntimeException(e);
         }
     }
 
@@ -1093,7 +1090,7 @@
         }
         catch ( IOException e )
         {
-            logger_.info( LogUtil.throwableToString(e) );
+            throw new RuntimeException(e);
         }
     }
 }
@@ -1109,18 +1106,19 @@
 
         byte[] bytes = message.getMessageBody();
         DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        GossipDigestAck2Message gDigestAck2Message = null;
         try
         {
-            GossipDigestAck2Message gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
-            Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
-            /* Notify the Failure Detector */
-            Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
-            Gossiper.instance().applyStateLocally(remoteEpStateMap);
+            gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
         }
-        catch ( IOException e )
+        catch (IOException e)
         {
-            logger_.info( LogUtil.throwableToString(e) );
+            throw new RuntimeException(e);
         }
+        Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+        /* Notify the Failure Detector */
+        Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
+        Gossiper.instance().applyStateLocally(remoteEpStateMap);
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon May  4 19:26:23 2009
@@ -586,7 +586,6 @@
         
         if ( stage != null )
         {
-            logger_.info("Running on stage " + stage.getName());
             stage.execute(runnable);
         } 
         else

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Mon May  4 19:26:23 2009
@@ -28,7 +28,6 @@
 import java.util.Map;
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
@@ -49,13 +48,7 @@
 import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
 import org.apache.cassandra.db.TableNotDefinedException;
 import org.apache.cassandra.db.RangeCommand;
-import org.apache.cassandra.db.RangeReply;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.thrift.TException;
 
@@ -65,7 +58,7 @@
 
 public class CassandraServer implements Cassandra.Iface
 {
-	private static Logger logger_ = Logger.getLogger(CassandraServer.class);
+	private static Logger logger = Logger.getLogger(CassandraServer.class);
 
     private final static List<column_t> EMPTY_COLUMNS = Arrays.asList();
     private final static List<superColumn_t> EMPTY_SUPERCOLUMNS = Arrays.asList();
@@ -76,7 +69,7 @@
       */
 	protected StorageService storageService;
 
-	public CassandraServer()
+    public CassandraServer()
 	{
 		storageService = StorageService.instance();
 	}
@@ -162,89 +155,67 @@
 
     public List<column_t> get_columns_since(String tablename, String key, String columnFamily_column, long timeStamp) throws InvalidRequestException
     {
-        long startTime = System.currentTimeMillis();
-        try
+        logger.debug("get_columns_since");
+        ColumnFamily cfamily = readColumnFamily(new ColumnsSinceReadCommand(tablename, key, columnFamily_column, timeStamp));
+        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+        if (cfamily == null)
         {
-            ColumnFamily cfamily = readColumnFamily(new ColumnsSinceReadCommand(tablename, key, columnFamily_column, timeStamp));
-            String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-            if (cfamily == null)
-            {
-                return EMPTY_COLUMNS;
-            }
-            Collection<IColumn> columns = null;
-            if( values.length > 1 )
-            {
-                // this is the super column case
-                IColumn column = cfamily.getColumn(values[1]);
-                if(column != null)
-                    columns = column.getSubColumns();
-            }
-            else
-            {
-                columns = cfamily.getAllColumns();
-            }
-            return thriftifyColumns(columns);
+            return EMPTY_COLUMNS;
         }
-        finally
+        Collection<IColumn> columns = null;
+        if( values.length > 1 )
         {
-            logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+            // this is the super column case
+            IColumn column = cfamily.getColumn(values[1]);
+            if(column != null)
+                columns = column.getSubColumns();
         }
+        else
+        {
+            columns = cfamily.getAllColumns();
+        }
+        return thriftifyColumns(columns);
 	}
 	
 
     public List<column_t> get_slice_by_names(String tablename, String key, String columnFamily, List<String> columnNames) throws InvalidRequestException
     {
-        long startTime = System.currentTimeMillis();
-        try
-        {
-            ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, columnNames));
-            if (cfamily == null)
-            {
-                return EMPTY_COLUMNS;
-            }
-            Collection<IColumn> columns = null;
-            columns = cfamily.getAllColumns();
-            return thriftifyColumns(columns);
-        }
-        finally
+        logger.debug("get_slice_by_names");
+        ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, columnNames));
+        if (cfamily == null)
         {
-            logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+            return EMPTY_COLUMNS;
         }
+        return thriftifyColumns(cfamily.getAllColumns());
     }
     
     public List<column_t> get_slice(String tablename, String key, String columnFamily_column, int start, int count) throws InvalidRequestException
     {
-        long startTime = System.currentTimeMillis();
-		try
-		{
-	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-            ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, start, count));
-            if (cfamily == null)
-			{
-                return EMPTY_COLUMNS;
-			}
-			Collection<IColumn> columns = null;
-			if( values.length > 1 )
-			{
-				// this is the super column case 
-				IColumn column = cfamily.getColumn(values[1]);
-				if(column != null)
-					columns = column.getSubColumns();
-			}
-			else
-			{
-				columns = cfamily.getAllColumns();
-			}
-            return thriftifyColumns(columns);
-		}
-        finally
+        logger.debug("get_slice");
+        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
+        ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, start, count));
+        if (cfamily == null)
         {
-            logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+            return EMPTY_COLUMNS;
         }
+        Collection<IColumn> columns = null;
+        if( values.length > 1 )
+        {
+            // this is the super column case
+            IColumn column = cfamily.getColumn(values[1]);
+            if(column != null)
+                columns = column.getSubColumns();
+        }
+        else
+        {
+            columns = cfamily.getAllColumns();
+        }
+        return thriftifyColumns(columns);
 	}
     
     public column_t get_column(String tablename, String key, String columnFamily_column) throws NotFoundException, InvalidRequestException
     {
+        logger.debug("get_column");
         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
         if (values.length < 2)
         {
@@ -285,6 +256,7 @@
 
     public int get_column_count(String tablename, String key, String columnFamily_column) throws InvalidRequestException
     {
+        logger.debug("get_column_count");
         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
         ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
         if (cfamily == null)
@@ -312,6 +284,7 @@
 
     public void insert(String tablename, String key, String columnFamily_column, byte[] cellData, long timestamp)
 	{
+        logger.debug("insert");
         RowMutation rm = new RowMutation(tablename, key.trim());
         rm.add(columnFamily_column, cellData, timestamp);
         try
@@ -327,6 +300,7 @@
     
     public boolean insert_blocking(String tablename, String key, String columnFamily_column, byte[] cellData, long timestamp) throws InvalidRequestException
     {
+        logger.debug("insert_blocking");
         RowMutation rm = new RowMutation(tablename, key.trim());
         rm.add(columnFamily_column, cellData, timestamp);
         validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
@@ -335,7 +309,7 @@
 
     public boolean batch_insert_blocking(batch_mutation_t batchMutation) throws InvalidRequestException
     {
-        logger_.debug("batch_insert_blocking");
+        logger.debug("batch_insert_blocking");
         RowMutation rm = RowMutation.getRowMutation(batchMutation);
         validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
         return StorageProxy.insertBlocking(rm);
@@ -343,7 +317,7 @@
 
 	public void batch_insert(batch_mutation_t batchMutation)
     {
-        logger_.debug("batch_insert");
+        logger.debug("batch_insert");
         RowMutation rm = RowMutation.getRowMutation(batchMutation);
         try
         {
@@ -359,7 +333,7 @@
 
     public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, boolean block) throws InvalidRequestException
     {
-        logger_.debug("remove");
+        logger.debug("remove");
         RowMutation rm = new RowMutation(tablename, key.trim());
         rm.delete(columnFamily_column, timestamp);
         validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
@@ -373,22 +347,13 @@
 
     public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws InvalidRequestException
     {
-        long startTime = System.currentTimeMillis();
-		try
-		{
-			ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, superColumnNames));
-			if (cfamily == null)
-			{
-                return EMPTY_SUPERCOLUMNS;
-			}
-			Collection<IColumn> columns = null;
-			columns = cfamily.getAllColumns();
-            return thriftifySuperColumns(columns);
-		}
-        finally
+        logger.debug("get_slice_super_by_names");
+        ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, superColumnNames));
+        if (cfamily == null)
         {
-            logger_.debug("get_slice2: " + (System.currentTimeMillis() - startTime) + " ms.");
+            return EMPTY_SUPERCOLUMNS;
         }
+        return thriftifySuperColumns(cfamily.getAllColumns());
     }
 
     private List<superColumn_t> thriftifySuperColumns(Collection<IColumn> columns)
@@ -414,6 +379,7 @@
 
     public List<superColumn_t> get_slice_super(String tablename, String key, String columnFamily_superColumnName, int start, int count) throws InvalidRequestException
     {
+        logger.debug("get_slice_super");
         ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_superColumnName, start, count));
         if (cfamily == null)
         {
@@ -425,6 +391,7 @@
     
     public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws InvalidRequestException, NotFoundException
     {
+        logger.debug("get_superColumn");
         ColumnFamily cfamily = readColumnFamily(new ColumnReadCommand(tablename, key, columnFamily_column));
         if (cfamily == null)
         {
@@ -448,7 +415,7 @@
     
     public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper) throws InvalidRequestException
     {
-        logger_.debug("batch_insert_SuperColumn_blocking");
+        logger.debug("batch_insert_SuperColumn_blocking");
         RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
         validateCommand(rm.key(), rm.table(), rm.columnFamilyNames().toArray(new String[0]));
         return StorageProxy.insertBlocking(rm);
@@ -456,7 +423,7 @@
 
     public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
     {
-        logger_.debug("batch_insert_SuperColumn");
+        logger.debug("batch_insert_SuperColumn");
         RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
         try
         {
@@ -557,29 +524,13 @@
 
     public List<String> get_key_range(String tablename, String startWith, String stopAt, int maxResults) throws InvalidRequestException
     {
-        logger_.debug("get_range");
-
+        logger.debug("get_key_range");
         if (!(StorageService.getPartitioner() instanceof OrderPreservingPartitioner))
         {
             throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
         }
 
-        try
-        {
-            Message message = new RangeCommand(tablename, startWith, stopAt, maxResults).getMessage();
-            EndPoint endPoint = StorageService.instance().findSuitableEndPoint(startWith);
-            IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
-
-            // read response
-            // TODO send more requests if we need to span multiple nodes
-            // double the usual timeout since range requests are expensive
-            byte[] responseBody = iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-            return RangeReply.read(responseBody).keys;
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        return StorageProxy.getRange(new RangeCommand(tablename, startWith, stopAt, maxResults));
     }
 
     /*
@@ -588,17 +539,11 @@
      * the SSTable index bucket it falls in, are in
      * buffer cache.  
     */
-    public void touch (String key , boolean fData) 
+    public void touch (String key, boolean fData)
     {
-    	try
-    	{
-    		StorageProxy.touchProtocol(DatabaseDescriptor.getTables().get(0), key, fData, StorageService.ConsistencyLevel.WEAK);
-    	}
-    	catch ( Exception e)
-    	{
-			logger_.info( LogUtil.throwableToString(e) );
-    	}
+        logger.debug("touch");
+  		StorageProxy.touchProtocol(DatabaseDescriptor.getTables().get(0), key, fData, StorageService.ConsistencyLevel.WEAK);
 	}
-    
+
     // main method moved to CassandraDaemon
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=771412&r1=771411&r2=771412&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon May  4 19:26:23 2009
@@ -26,29 +26,48 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.lang.management.ManagementFactory;
 
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.TouchMessage;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
-public class StorageProxy
+
+public class StorageProxy implements StorageProxyMBean
 {
-    private static Logger logger_ = Logger.getLogger(StorageProxy.class);    
-    
+    private static Logger logger = Logger.getLogger(StorageProxy.class);
+
+    // mbean stuff
+    private static volatile long readLatency;
+    private static volatile int readOperations;
+    private static volatile long rangeLatency;
+    private static volatile int rangeOperations;
+    private static volatile long writeLatency;
+    private static volatile int writeOperations;
+    private StorageProxy() {}
+    static
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(new StorageProxy(), new ObjectName("org.apache.cassandra.service:type=StorageProxy"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * This method is responsible for creating Message to be
      * sent over the wire to N replicas where some of the replicas
@@ -67,7 +86,7 @@
 			{
 				Message hintedMessage = rm.makeRowMutationMessage();
 				hintedMessage.addHeader(RowMutation.HINT, EndPoint.toBytes(hint) );
-				logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+				logger.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
 				messageMap.put(target, hintedMessage);
 			}
 			else
@@ -87,8 +106,6 @@
     */
     public static void insert(RowMutation rm)
 	{
-        // TODO check for valid Table, ColumnFamily
-
         /*
          * Get the N nodes from storage service where the data needs to be
          * replicated
@@ -96,26 +113,36 @@
          * Send them asynchronously to the replicas.
         */
 
+        long startTime = System.currentTimeMillis();
 		try
 		{
 			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
 			// TODO: throw a thrift exception if we do not have N nodes
 			Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
-            logger_.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
+            logger.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
 			for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
 			{
 				MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
 			}
 		}
-        catch (Exception e)
+        catch (IOException e)
         {
-            logger_.error( LogUtil.throwableToString(e) );
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            if (writeOperations++ == Integer.MAX_VALUE)
+            {
+                writeOperations = 1;
+                writeLatency = 0;
+            }
+            writeLatency += System.currentTimeMillis() - startTime;
         }
-        return;
     }
 
     public static boolean insertBlocking(RowMutation rm)
     {
+        long startTime = System.currentTimeMillis();
         try
         {
             Message message = rm.makeRowMutationMessage();
@@ -125,7 +152,7 @@
                     DatabaseDescriptor.getReplicationFactor(),
                     writeResponseResolver);
             EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
-            logger_.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
+            logger.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
             // TODO: throw a thrift exception if we do not have N nodes
 
             MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
@@ -137,8 +164,16 @@
         }
         catch (Exception e)
         {
-            logger_.error( LogUtil.throwableToString(e) );
-            return false;
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            if (writeOperations++ == Integer.MAX_VALUE)
+            {
+                writeOperations = 1;
+                writeLatency = 0;
+            }
+            writeLatency += System.currentTimeMillis() - startTime;
         }
     }
     
@@ -213,7 +248,7 @@
     {
         EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
         assert endPoint != null;
-        logger_.debug("weakreadremote reading " + command + " from " + endPoint);
+        logger.debug("weakreadremote reading " + command + " from " + endPoint);
         Message message = command.makeReadMessage();
         message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
         IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
@@ -239,7 +274,7 @@
 		table.touch(key, fData);
     }
 
-    static void weakTouchProtocol(String tablename, String key, boolean fData) throws Exception
+    static void weakTouchProtocol(String tablename, String key, boolean fData) throws IOException
     {
     	EndPoint endPoint = null;
     	try
@@ -262,10 +297,9 @@
             Message message = TouchMessage.makeTouchMessage(touchMessage);
             MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
     	}
-    	return ;
     }
     
-    static void strongTouchProtocol(String tablename, String key, boolean fData) throws Exception
+    static void strongTouchProtocol(String tablename, String key, boolean fData) throws IOException
     {
         Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(key);
         Set<EndPoint> endpoints = endpointMap.keySet();
@@ -281,23 +315,40 @@
     /*
      * Only touch data on the most suitable end point.
      */
-    public static void touchProtocol(String tablename, String key, boolean fData, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    public static void touchProtocol(String tablename, String key, boolean fData, StorageService.ConsistencyLevel consistencyLevel)
     {
-        switch ( consistencyLevel )
+        long startTime = System.currentTimeMillis();
+        try
+        {
+            switch ( consistencyLevel )
+            {
+                case WEAK:
+                    weakTouchProtocol(tablename, key, fData);
+                    break;
+
+                case STRONG:
+                    strongTouchProtocol(tablename, key, fData);
+                    break;
+
+                default:
+                    weakTouchProtocol(tablename, key, fData);
+                    break;
+            }
+        }
+        catch (IOException ex)
+        {
+            throw new RuntimeException(ex);
+        }
+        finally
         {
-	        case WEAK:
-	            weakTouchProtocol(tablename, key, fData);
-	            break;
-	            
-	        case STRONG:
-	            strongTouchProtocol(tablename, key, fData);
-	            break;
-	            
-	        default:
-	            weakTouchProtocol(tablename, key, fData);
-	            break;
+            if (readOperations++ == Integer.MAX_VALUE)
+            {
+                readOperations = 1;
+                readLatency = 0;
+            }
+            readLatency += System.currentTimeMillis() - startTime;
         }
-    }  
+    }
 
     /**
      * Performs the actual reading of a row out of the StorageService, fetching
@@ -307,7 +358,8 @@
     throws IOException, TimeoutException
     {
         long startTime = System.currentTimeMillis();
-        Row row = null;
+
+        Row row;
         EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
 
         if (consistencyLevel == StorageService.ConsistencyLevel.WEAK)
@@ -328,7 +380,13 @@
             row = strongRead(command);
         }
 
-        logger_.debug("Finished reading " + row + " in " + (System.currentTimeMillis() - startTime) + " ms.");
+        if (readOperations++ == Integer.MAX_VALUE)
+        {
+            readOperations = 1;
+            readLatency = 0;
+        }
+        readLatency += System.currentTimeMillis() - startTime;
+
         return row;
     }
 
@@ -365,8 +423,7 @@
      */
     public static Map<String, Row> strongReadProtocol(String[] keys, ReadCommand readCommand) throws IOException, TimeoutException
     {       
-        Map<String, Row> rows = new HashMap<String, Row>();
-        long startTime = System.currentTimeMillis();        
+        Map<String, Row> rows;
         // TODO: throw a thrift exception if we do not have N nodes
         Map<String, ReadCommand[]> readMessages = new HashMap<String, ReadCommand[]>();
         for (String key : keys )
@@ -378,7 +435,6 @@
             readMessages.put(key, readParameters);
         }        
         rows = doStrongReadProtocol(readMessages);         
-        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
         return rows;
     }
 
@@ -428,7 +484,7 @@
             endPoints[i] = endpointList.get(i - 1);
             messages[i] = messageDigestOnly;
         }
-        logger_.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
+        logger.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
 
         try
         {
@@ -436,7 +492,7 @@
 
             long startTime2 = System.currentTimeMillis();
             row = quorumResponseHandler.get();
-            logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
+            logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
         }
         catch (DigestMismatchException ex)
         {
@@ -446,7 +502,7 @@
                 QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
                         DatabaseDescriptor.getReplicationFactor(),
                         readResponseResolverRepair);
-                logger_.info("DigestMismatchException: " + command.key);
+                logger.info("DigestMismatchException: " + command.key);
                 Message messageRepair = command.makeReadMessage();
                 MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
                                                                quorumResponseHandlerRepair);
@@ -552,10 +608,9 @@
                 rows.put(row.key(), row);
             }
         }
-        catch ( TimeoutException ex )
+        catch (TimeoutException e)
         {
-            logger_.info("Operation timed out waiting for responses ...");
-            logger_.info(LogUtil.throwableToString(ex));
+            throw new RuntimeException(e);
         }
         return rows;
     }
@@ -574,7 +629,6 @@
     public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand readCommand) throws Exception
     {
         Row row = null;
-        long startTime = System.currentTimeMillis();
         Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
         for ( String key : keys )
         {
@@ -606,7 +660,7 @@
     */
     private static Row weakReadLocal(ReadCommand command) throws IOException
     {
-        logger_.debug("weakreadlocal for " + command);
+        logger.debug("weakreadlocal reading " + command);
         List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
         /* Remove the local storage endpoint from the list. */
         endpoints.remove(StorageService.getLocalStorageEndPoint());
@@ -623,4 +677,62 @@
             StorageService.instance().doConsistencyCheck(row, endpoints, command);
         return row;
     }
+
+    static List<String> getRange(RangeCommand command)
+    {
+        long startTime = System.currentTimeMillis();
+        try
+        {
+            EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
+            IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(command.getMessage(), endPoint);
+
+            // read response
+            // TODO send more requests if we need to span multiple nodes
+            byte[] responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            return RangeReply.read(responseBody).keys;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            if (rangeOperations++ == Integer.MAX_VALUE)
+            {
+                rangeOperations = 1;
+                rangeLatency = 0;
+            }
+            rangeLatency += System.currentTimeMillis() - startTime;
+        }
+    }
+
+    public double getReadLatency()
+    {
+        return ((double)readLatency) / readOperations;
+    }
+
+    public double getRangeLatency()
+    {
+        return ((double)rangeLatency) / rangeOperations;
+    }
+
+    public double getWriteLatency()
+    {
+        return ((double)writeLatency) / writeOperations;
+    }
+
+    public int getReadOperations()
+    {
+        return readOperations;
+    }
+
+    public int getRangeOperations()
+    {
+        return rangeOperations;
+    }
+
+    public int getWriteOperations()
+    {
+        return writeOperations;
+    }
 }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=771412&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java Mon May  4 19:26:23 2009
@@ -0,0 +1,11 @@
+package org.apache.cassandra.service;
+
+public interface StorageProxyMBean
+{
+    public double getReadLatency();
+    public int getReadOperations();
+    public double getRangeLatency();
+    public int getRangeOperations();
+    public double getWriteLatency();
+    public int getWriteOperations();
+}