You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/26 00:16:35 UTC

svn commit: r884331 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Wed Nov 25 23:16:34 2009
New Revision: 884331

URL: http://svn.apache.org/viewvc?rev=884331&view=rev
Log:
Remove unused Row code, and move the table variable into callers rather than serializing it redundantly
patch by jbellis; tested by Dan Di Spaltro for CASSANDRA-578

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Nov 25 23:16:34 2009
@@ -1464,7 +1464,7 @@
             columnNameSet.addAll(columnNames);
         for (String key : rr.keys)
         {
-            Row row = new Row(table_, key);
+            Row row = new Row(key);
             QueryFilter filter = sliceRange == null ? new NamesQueryFilter(key, queryPath, columnNameSet) : new SliceQueryFilter(key, queryPath, sliceRange.start, sliceRange.finish, sliceRange.reversed, sliceRange.count);
             row.addColumnFamily(getColumnFamily(filter));
             rows.add(row);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Nov 25 23:16:34 2009
@@ -74,11 +74,11 @@
                 /* Don't service reads! */
                 throw new RuntimeException("Cannot service reads while bootstrapping!");
             }
-            ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
-            Table table = Table.open(readCommand.table);
-            Row row = readCommand.getRow(table);
+            ReadCommand command = ReadCommand.serializer().deserialize(readCtx.bufIn_);
+            Table table = Table.open(command.table);
+            Row row = command.getRow(table);
             ReadResponse readResponse;
-            if (readCommand.isDigestQuery())
+            if (command.isDigestQuery())
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug("digest is " + FBUtilities.bytesToHex(row.digest()));
@@ -88,7 +88,7 @@
             {
                 readResponse = new ReadResponse(row);
             }
-            readResponse.setIsDigestQuery(readCommand.isDigestQuery());
+            readResponse.setIsDigestQuery(command.isDigestQuery());
             /* serialize the ReadResponseMessage. */
             readCtx.bufOut_.reset();
 
@@ -99,13 +99,17 @@
 
             Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
             if (logger_.isDebugEnabled())
-              logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
+              logger_.debug("Read key " + command.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
             MessagingService.instance().sendOneWay(response, message.getFrom());
 
             /* Do read repair if header of the message says so */
             if (message.getHeader(ReadCommand.DO_REPAIR) != null)
             {
-                doReadRepair(row, readCommand);
+                List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
+                /* Remove the local storage endpoint from the list. */
+                endpoints.remove(FBUtilities.getLocalAddress());
+                if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+                    StorageService.instance().doConsistencyCheck(row, endpoints, command);
             }
         }
         catch (IOException ex)
@@ -113,14 +117,4 @@
             throw new RuntimeException(ex);
         }
     }
-    
-    private void doReadRepair(Row row, ReadCommand readCommand)
-    {
-        List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(readCommand.key);
-        /* Remove the local storage endpoint from the list. */
-        endpoints.remove(FBUtilities.getLocalAddress());
-            
-        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-            StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
-    }     
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Wed Nov 25 23:16:34 2009
@@ -38,7 +38,6 @@
 public class Row
 {
     private static Logger logger_ = Logger.getLogger(Row.class);
-    private String table_;
     private static RowSerializer serializer = new RowSerializer();
 
     static RowSerializer serializer()
@@ -46,17 +45,11 @@
         return serializer;
     }
 
-    public Row(String table, String key)
+    public Row(String key)
     {
-        assert table != null;
-        this.table_ = table;
         this.key_ = key;
     }
 
-    public String getTable() {
-        return table_;
-    }
-
     private String key_;
 
     private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
@@ -66,21 +59,6 @@
         return key_;
     }
 
-    void setKey(String key)
-    {
-        key_ = key;
-    }
-
-    public void setTable(String table)
-    {
-        table_ = table;
-    }
-
-    public Set<String> getColumnFamilyNames()
-    {
-        return columnFamilies_.keySet();
-    }
-
     public Collection<ColumnFamily> getColumnFamilies()
     {
         return columnFamilies_.values();
@@ -96,11 +74,6 @@
         columnFamilies_.put(columnFamily.name(), columnFamily);
     }
 
-    void removeColumnFamily(ColumnFamily columnFamily)
-    {
-        columnFamilies_.remove(columnFamily.name());
-    }
-
     public boolean isEmpty()
     {
         return (columnFamilies_.size() == 0);
@@ -138,7 +111,7 @@
      */
     public Row diff(Row rowComposite)
     {
-        Row rowDiff = new Row(table_, key_);
+        Row rowDiff = new Row(key_);
 
         for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
         {
@@ -160,7 +133,7 @@
 
     public Row cloneMe()
     {
-        Row row = new Row(table_, key_);
+        Row row = new Row(key_);
         row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
         return row;
     }
@@ -185,18 +158,6 @@
         return digest.digest();
     }
 
-    void clear()
-    {
-        columnFamilies_.clear();
-    }
-
-    public DataOutputBuffer getSerializedBuffer() throws IOException
-    {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        Row.serializer().serialize(this, buffer);
-        return buffer;
-    }
-
     public String toString()
     {
         return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + "])";
@@ -207,7 +168,6 @@
 {
     public void serialize(Row row, DataOutputStream dos) throws IOException
     {
-        dos.writeUTF(row.getTable());
         dos.writeUTF(row.key());
         Collection<ColumnFamily> columnFamilies = row.getColumnFamilies();
         int size = columnFamilies.size();
@@ -221,9 +181,8 @@
 
     public Row deserialize(DataInputStream dis) throws IOException
     {
-        String table = dis.readUTF();
         String key = dis.readUTF();
-        Row row = new Row(table, key);
+        Row row = new Row(key);
         int size = dis.readInt();
 
         for (int i = 0; i < size; ++i)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Nov 25 23:16:34 2009
@@ -399,7 +399,7 @@
     @Deprecated // CF should be our atom of work, not Row
     public Row get(String key) throws IOException
     {
-        Row row = new Row(table_, key);
+        Row row = new Row(key);
         for (String columnFamily : getColumnFamilies())
         {
             ColumnFamily cf = get(key, columnFamily);
@@ -429,7 +429,7 @@
     @Deprecated
     public Row getRow(String key, String cfName) throws IOException
     {
-        Row row = new Row(table_, key);
+        Row row = new Row(key);
         ColumnFamily columnFamily = get(key, cfName);
         if ( columnFamily != null )
         	row.addColumnFamily(columnFamily);
@@ -439,7 +439,7 @@
     public Row getRow(QueryFilter filter) throws IOException
     {
         ColumnFamilyStore cfStore = columnFamilyStores_.get(filter.getColumnFamilyName());
-        Row row = new Row(table_, filter.key);
+        Row row = new Row(filter.key);
         ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
         if (columnFamily != null)
             row.addColumnFamily(columnFamily);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Nov 25 23:16:34 2009
@@ -40,8 +40,9 @@
 class ConsistencyManager implements Runnable
 {
 	private static Logger logger_ = Logger.getLogger(ConsistencyManager.class);
-	
-	class DigestResponseHandler implements IAsyncCallback
+    private final String table_;
+
+    class DigestResponseHandler implements IAsyncCallback
 	{
 		List<Message> responses_ = new ArrayList<Message>();
 
@@ -78,7 +79,7 @@
 		
 		private void doReadRepair() throws IOException
 		{
-			IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+			IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_);
             /* Add the local storage endpoint to the replicas_ list */
             replicas_.add(FBUtilities.getLocalAddress());
 			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);	
@@ -134,8 +135,9 @@
 	protected final List<InetAddress> replicas_;
 	private final ReadCommand readCommand_;
 
-    public ConsistencyManager(Row row, List<InetAddress> replicas, ReadCommand readCommand)
+    public ConsistencyManager(String table, Row row, List<InetAddress> replicas, ReadCommand readCommand)
     {
+        table_ = table;
         row_ = row;
         replicas_ = replicas;
         readCommand_ = readCommand;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Nov 25 23:16:34 2009
@@ -48,14 +48,20 @@
 public class ReadResponseResolver implements IResponseResolver<Row>
 {
 	private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
+    private final String table;
 
-	/*
-	 * This method for resolving read data should look at the timestamps of each
-	 * of the columns that are read and should pick up columns with the latest
-	 * timestamp. For those columns where the timestamp is not the latest a
-	 * repair request should be scheduled.
-	 * 
-	 */
+    public ReadResponseResolver(String table)
+    {
+        this.table = table;
+    }
+
+    /*
+      * This method for resolving read data should look at the timestamps of each
+      * of the columns that are read and should pick up columns with the latest
+      * timestamp. For those columns where the timestamp is not the latest a
+      * repair request should be scheduled.
+      *
+      */
 	public Row resolve(List<Message> responses) throws DigestMismatchException, IOException
     {
         long startTime = System.currentTimeMillis();
@@ -63,7 +69,6 @@
 		List<Row> rowList = new ArrayList<Row>();
 		List<InetAddress> endPoints = new ArrayList<InetAddress>();
 		String key = null;
-		String table = null;
 		byte[] digest = new byte[0];
 		boolean isDigestQuery = false;
         
@@ -89,7 +94,6 @@
                 rowList.add(result.row());
                 endPoints.add(response.getFrom());
                 key = result.row().key();
-                table = result.row().getTable();
             }
         }
 		// If there was a digest query compare it with all the data digests 
@@ -114,7 +118,7 @@
         }
 
         /* Now calculate the resolved row */
-        retRow = new Row(table, key);
+        retRow = new Row(key);
         for (int i = 0; i < rowList.size(); i++)
         {
             retRow.repair(rowList.get(i));

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Nov 25 23:16:34 2009
@@ -437,7 +437,7 @@
             }
             if (n < DatabaseDescriptor.getQuorum())
                 throw new UnavailableException();
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table));
             MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndPoints.add(endPoints);
@@ -465,7 +465,7 @@
             {
                 if (DatabaseDescriptor.getConsistencyCheck())
                 {
-                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table);
                     QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
                             DatabaseDescriptor.getQuorum(),
                             readResponseResolverRepair);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Nov 25 23:16:34 2009
@@ -346,14 +346,14 @@
         return endPointSnitch_.isInSameDataCenter(FBUtilities.getLocalAddress(), endpoint);
     }
     
-    /*
+    /**
      * This method performs the requisite operations to make
      * sure that the N replicas are in sync. We do this in the
      * background when we do not care much about consistency.
      */
     public void doConsistencyCheck(Row row, List<InetAddress> endpoints, ReadCommand command)
     {
-        Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, command);
+        Runnable consistencySentinel = new ConsistencyManager(command.table, row.cloneMe(), endpoints, command);
         consistencyManager_.submit(consistencySentinel);
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Wed Nov 25 23:16:34 2009
@@ -60,12 +60,12 @@
     @Test
     public void testRepair()
     {
-        Row row1 = new Row("", "");
+        Row row1 = new Row("");
         ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
         cf1.addColumn(column("one", "A", 0));
         row1.addColumnFamily(cf1);
 
-        Row row2 = new Row("", "");
+        Row row2 = new Row("");
         ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
         cf2.addColumn(column("one", "B", 1));
         cf2.addColumn(column("two", "C", 1));