You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:34:11 UTC
svn commit: r759025 - in
/incubator/cassandra/trunk/src/org/apache/cassandra/db: ReadMessage.java
ReadResponseMessage.java ReadVerbHandler.java
Author: alakshman
Date: Fri Mar 27 05:34:11 2009
New Revision: 759025
URL: http://svn.apache.org/viewvc?rev=759025&view=rev
Log:
Changes in here to enable multiget() support.
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java?rev=759025&r1=759024&r2=759025&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadMessage.java Fri Mar 27 05:34:11 2009
@@ -25,8 +25,6 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import javax.xml.bind.annotation.XmlElement;
-
import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
@@ -40,6 +38,7 @@
public class ReadMessage implements Serializable
{
private static ICompactSerializer<ReadMessage> serializer_;
+ public static final String doRepair_ = "READ-REPAIR";
static
{
@@ -60,28 +59,13 @@
return message;
}
- @XmlElement(name="Table")
private String table_;
-
- @XmlElement(name="Key")
private String key_;
-
- @XmlElement(name="ColumnFamily")
private String columnFamily_column_ = null;
-
- @XmlElement(name="start")
private int start_ = -1;
-
- @XmlElement(name="count")
private int count_ = -1 ;
-
- @XmlElement(name="sinceTimestamp")
private long sinceTimestamp_ = -1 ;
-
- @XmlElement(name="columnNames")
private List<String> columns_ = new ArrayList<String>();
-
- @XmlElement(name="isDigestQuery")
private boolean isDigestQuery_ = false;
private ReadMessage()
@@ -131,7 +115,7 @@
return table_;
}
- String key()
+ public String key()
{
return key_;
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java?rev=759025&r1=759024&r2=759025&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadResponseMessage.java Fri Mar 27 05:34:11 2009
@@ -23,9 +23,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlElement;
-
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -61,32 +58,25 @@
return message;
}
- @XmlElement(name = "Table")
private String table_;
-
- @XmlElement(name = "Row")
private Row row_;
-
- @XmlElement(name = "Digest")
private byte[] digest_ = new byte[0];
-
- @XmlElement(name="isDigestQuery")
private boolean isDigestQuery_ = false;
-
- private ReadResponseMessage() {
- }
- public ReadResponseMessage(String table, byte[] digest ) {
+ public ReadResponseMessage(String table, byte[] digest )
+ {
table_ = table;
digest_= digest;
}
- public ReadResponseMessage(String table, Row row) {
+ public ReadResponseMessage(String table, Row row)
+ {
table_ = table;
row_ = row;
}
- public String table() {
+ public String table()
+ {
return table_;
}
@@ -95,7 +85,8 @@
return row_;
}
- public byte[] digest() {
+ public byte[] digest()
+ {
return digest_;
}
@@ -110,7 +101,6 @@
}
}
-
class ReadResponseMessageSerializer implements ICompactSerializer<ReadResponseMessage>
{
public void serialize(ReadResponseMessage rm, DataOutputStream dos) throws IOException
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=759025&r1=759024&r2=759025&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java Fri Mar 27 05:34:11 2009
@@ -21,10 +21,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.List;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -40,7 +43,7 @@
public class ReadVerbHandler implements IVerbHandler
{
- private static class ReadContext
+ protected static class ReadContext
{
protected DataInputBuffer bufIn_ = new DataInputBuffer();
protected DataOutputBuffer bufOut_ = new DataOutputBuffer();
@@ -48,7 +51,17 @@
private static Logger logger_ = Logger.getLogger( ReadVerbHandler.class );
/* We use this so that we can reuse the same row mutation context for the mutation. */
- private static ThreadLocal<ReadContext> tls_ = new InheritableThreadLocal<ReadContext>();
+ private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
+
+ protected static ReadVerbHandler.ReadContext getCurrentReadContext()
+ {
+ return tls_.get();
+ }
+
+ protected static void setCurrentReadContext(ReadVerbHandler.ReadContext readContext)
+ {
+ tls_.set(readContext);
+ }
public void doVerb(Message message)
{
@@ -90,7 +103,6 @@
if(readMessage.isDigestQuery())
{
readResponseMessage = new ReadResponseMessage(table.getTableName(), row.digest());
-
}
else
{
@@ -111,8 +123,12 @@
Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bytes} );
MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
- logger_.info("ReadVerbHandler TIME 2: " + (System.currentTimeMillis() - start)
- + " ms.");
+ logger_.info("ReadVerbHandler TIME 2: " + (System.currentTimeMillis() - start) + " ms.");
+
+ /* Do read repair if header of the message says so */
+ String repair = new String( message.getHeader(ReadMessage.doRepair_) );
+ if ( repair.equals( ReadMessage.doRepair_ ) )
+ doReadRepair(row, readMessage);
}
catch ( IOException ex)
{
@@ -123,4 +139,31 @@
logger_.info( LogUtil.throwableToString(ex) );
}
}
+
+ private void doReadRepair(Row row, ReadMessage readMessage)
+ {
+ if ( DatabaseDescriptor.getConsistencyCheck() )
+ {
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readMessage.key());
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+
+ if(readMessage.getColumnNames().size() == 0)
+ {
+ if( readMessage.start() >= 0 && readMessage.count() < Integer.MAX_VALUE)
+ {
+ StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.start(), readMessage.count());
+ }
+
+ if( readMessage.sinceTimestamp() > 0)
+ {
+ StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.sinceTimestamp());
+ }
+ }
+ else
+ {
+ StorageService.instance().doConsistencyCheck(row, endpoints, readMessage.columnFamily_column(), readMessage.getColumnNames());
+ }
+ }
+ }
}