You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:34:11 UTC

svn commit: r759025 - in /incubator/cassandra/trunk/src/org/apache/cassandra/db: ReadMessage.java ReadResponseMessage.java ReadVerbHandler.java

Author: alakshman
Date: Fri Mar 27 05:34:11 2009
New Revision: 759025

URL: http://svn.apache.org/viewvc?rev=759025&view=rev
Log:
Changes to enable multiget() support.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java?rev=759025&r1=759024&r2=759025&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java Fri Mar 27 05:34:11 2009
@@ -25,8 +25,6 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import javax.xml.bind.annotation.XmlElement;
-
 import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
@@ -40,6 +38,7 @@
 public class ReadMessage implements Serializable
 {
     private static ICompactSerializer<ReadMessage> serializer_;	
+    public static final String doRepair_ = "READ-REPAIR";
 	
     static
     {
@@ -60,28 +59,13 @@
         return message;
     }
     
-    @XmlElement(name="Table")
     private String table_;
-    
-    @XmlElement(name="Key")
     private String key_;
-    
-    @XmlElement(name="ColumnFamily")
     private String columnFamily_column_ = null;
-    
-    @XmlElement(name="start")
     private int start_ = -1;
-
-    @XmlElement(name="count")
     private int count_ = -1 ;
-    
-    @XmlElement(name="sinceTimestamp")
     private long sinceTimestamp_ = -1 ;
-
-    @XmlElement(name="columnNames")
     private List<String> columns_ = new ArrayList<String>();
-    
-    @XmlElement(name="isDigestQuery")
     private boolean isDigestQuery_ = false;
         
     private ReadMessage()
@@ -131,7 +115,7 @@
         return table_;
     }
     
-    String key()
+    public String key()
     {
         return key_;
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java?rev=759025&r1=759024&r2=759025&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java Fri Mar 27 05:34:11 2009
@@ -23,9 +23,6 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlElement;
-
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -61,32 +58,25 @@
         return message;
     }
 	
-	@XmlElement(name = "Table")
 	private String table_;
-
-	@XmlElement(name = "Row")
 	private Row row_;
-
-	@XmlElement(name = "Digest")
 	private byte[] digest_ = new byte[0];
-
-    @XmlElement(name="isDigestQuery")
     private boolean isDigestQuery_ = false;
-	
-	private ReadResponseMessage() {
-	}
 
-	public ReadResponseMessage(String table, byte[] digest ) {
+	public ReadResponseMessage(String table, byte[] digest ) 
+    {
 		table_ = table;
 		digest_= digest;
 	}
 
-	public ReadResponseMessage(String table, Row row) {
+	public ReadResponseMessage(String table, Row row) 
+    {
 		table_ = table;
 		row_ = row;
 	}
 
-	public String table() {
+	public String table() 
+    {
 		return table_;
 	}
 
@@ -95,7 +85,8 @@
 		return row_;
     }
         
-	public byte[] digest() {
+	public byte[] digest() 
+    {
 		return digest_;
 	}
 
@@ -110,7 +101,6 @@
     }
 }
 
-
 class ReadResponseMessageSerializer implements ICompactSerializer<ReadResponseMessage>
 {
 	public void serialize(ReadResponseMessage rm, DataOutputStream dos) throws IOException

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=759025&r1=759024&r2=759025&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java Fri Mar 27 05:34:11 2009
@@ -21,10 +21,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.List;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -40,7 +43,7 @@
 
 public class ReadVerbHandler implements IVerbHandler
 {
-    private static class ReadContext
+    protected static class ReadContext
     {
         protected DataInputBuffer bufIn_ = new DataInputBuffer();
         protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
@@ -48,7 +51,17 @@
 
     private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
     /* We use this so that we can reuse the same row mutation context for the mutation. */
-    private static ThreadLocal<ReadContext> tls_ = new InheritableThreadLocal<ReadContext>();
+    private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
+    
+    protected static ReadVerbHandler.ReadContext getCurrentReadContext()
+    {
+        return tls_.get();
+    }
+    
+    protected static void setCurrentReadContext(ReadVerbHandler.ReadContext readContext)
+    {
+        tls_.set(readContext);
+    }
 
     public void doVerb(Message message)
     {
@@ -90,7 +103,6 @@
             if(readMessage.isDigestQuery())
             {
                 readResponseMessage = new ReadResponseMessage(table.getTableName(), row.digest());
-
             }
             else
             {
@@ -111,8 +123,12 @@
 
             Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bytes} );
             MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
-            logger_.info("ReadVerbHandler  TIME 2: " + (System.currentTimeMillis() - start)
-                    + " ms.");
+            logger_.info("ReadVerbHandler  TIME 2: " + (System.currentTimeMillis() - start) + " ms.");
+            
+            /* Do read repair if header of the message says so */
+            String repair = new String( message.getHeader(ReadMessage.doRepair_) );
+            if ( repair.equals( ReadMessage.doRepair_ ) )
+                doReadRepair(row, readMessage);
         }
         catch ( IOException ex)
         {
@@ -123,4 +139,31 @@
             logger_.info( LogUtil.throwableToString(ex) );
         }
     }
+    
+    private void doReadRepair(Row row, ReadMessage readMessage)
+    {
+        if ( DatabaseDescriptor.getConsistencyCheck() )
+        {
+            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readMessage.key());
+            /* Remove the local storage endpoint from the list. */ 
+            endpoints.remove( StorageService.getLocalStorageEndPoint() );
+            
+            if(readMessage.getColumnNames().size() == 0)
+            {
+                if( readMessage.start() >= 0 && readMessage.count() < Integer.MAX_VALUE)
+                {                
+                    StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.start(), readMessage.count());                    
+                }
+                
+                if( readMessage.sinceTimestamp() > 0)
+                {                    
+                    StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.sinceTimestamp());                    
+                }                
+            }
+            else
+            {
+                StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.getColumnNames());                                
+            }
+        }
+    }     
 }