Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:42:00 UTC
svn commit: r759028 [1/2] -
/incubator/cassandra/trunk/src/org/apache/cassandra/service/
Author: alakshman
Date: Fri Mar 27 05:41:59 2009
New Revision: 759028
URL: http://svn.apache.org/viewvc?rev=759028&view=rev
Log:
Basic implementation of multiget() functionality. Fix to how read-repair is done in the ConsistencyManager.
Removed:
incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java
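For context, the multiget() change below adds key-array overloads to StorageProxy that
return a Map of key to Row instead of a single Row. A hypothetical caller sketch (the
table and column family names here are assumptions, not part of this commit):

    // Hypothetical driver for the new multiget read path.
    String[] keys = new String[] { "key1", "key2", "key3" };
    Map<String, Row> rows = StorageProxy.readProtocol(
            "Table1",                                // assumed table name
            keys,
            "Standard1",                             // assumed column family
            0,                                       // start column index
            100,                                     // column count per key
            StorageService.ConsistencyLevel.WEAK);   // consistency checks run in the background
    for (Map.Entry<String, Row> entry : rows.entrySet())
        System.out.println(entry.getKey() + " -> " + entry.getValue());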
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 05:41:59 2009
@@ -18,29 +18,27 @@
package org.apache.cassandra.service;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
+import com.facebook.thrift.*;
+import com.facebook.thrift.server.*;
+import com.facebook.thrift.server.TThreadPoolServer.Options;
+import com.facebook.thrift.transport.*;
+import com.facebook.thrift.protocol.*;
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import java.io.*;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.Arrays;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import com.facebook.fb303.FacebookBase;
-import com.facebook.fb303.fb_status;
-import com.facebook.thrift.TException;
-import com.facebook.thrift.protocol.TBinaryProtocol;
-import com.facebook.thrift.protocol.TProtocolFactory;
-import com.facebook.thrift.server.TThreadPoolServer;
-import com.facebook.thrift.server.TThreadPoolServer.Options;
-import com.facebook.thrift.transport.TServerSocket;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql.common.CqlResult;
@@ -50,21 +48,18 @@
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-
+import org.apache.log4j.Logger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-public class CassandraServer extends FacebookBase implements
- Cassandra.Iface
+public class CassandraServer extends FacebookBase implements Cassandra.Iface
{
private static Logger logger_ = Logger.getLogger(CassandraServer.class);
@@ -81,7 +76,7 @@
storageService = StorageService.instance();
}
- public CassandraServer()
+ public CassandraServer() throws Throwable
{
super("CassandraServer");
// Create the instance of the storage service
@@ -93,7 +88,7 @@
* specified port.
*/
public void start() throws Throwable
- {
+ {
LogUtil.init();
//LogUtil.setLogLevel("com.facebook", "DEBUG");
// Start the storage service
@@ -125,7 +120,7 @@
{
throw new CassandraException("No row exists for key " + key);
}
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0)
{
logger_ .info("ERROR ColumnFamily " + columnFamily + " map is missing.....: " + " key:" + key );
@@ -167,7 +162,7 @@
throw new CassandraException("ERROR No row for this key .....: " + key);
}
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0)
{
logger_ .info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + " key:" + key);
@@ -282,7 +277,7 @@
throw new CassandraException("ERROR No row for this key .....: " + key);
}
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0)
{
logger_ .info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + " key:" + key);
@@ -350,7 +345,7 @@
throw new CassandraException("ERROR No row for this key .....: " + key);
}
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0)
{
logger_ .info("ERROR ColumnFamily map is missing.....: "
@@ -422,7 +417,7 @@
throw new CassandraException("ERROR No row for this key .....: " + key);
}
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0)
{
logger_ .info("ERROR ColumnFamily map is missing.....: "
@@ -486,34 +481,132 @@
public boolean batch_insert_blocking(batch_mutation_t batchMutation)
{
- logger_.debug("batch_insert_blocking");
- RowMutation rm = RowMutation.getRowMutation(batchMutation);
- return StorageProxy.insertBlocking(rm);
- }
+ // 1. Get the N nodes from storage service where the data needs to be
+ // replicated
+ // 2. Construct a message for read/write
+ // 3. SendRR ( to all the nodes above )
+ // 4. Wait for a response from at least X nodes where X <= N
+ // 5. return success
+ boolean result = false;
+ try
+ {
+ logger_.warn(" batch_insert_blocking");
+ validateTable(batchMutation.table);
+ IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
+ QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
+ DatabaseDescriptor.getReplicationFactor(),
+ writeResponseResolver);
+ EndPoint[] endpoints = storageService.getNStorageEndPoint(batchMutation.key);
+ // TODO: throw a thrift exception if we do not have N nodes
+
+ logger_.debug(" Creating the row mutation");
+ RowMutation rm = new RowMutation(batchMutation.table,
+ batchMutation.key.trim());
+ Set keys = batchMutation.cfmap.keySet();
+ Iterator keyIter = keys.iterator();
+ while (keyIter.hasNext())
+ {
+ Object key = keyIter.next(); // Get the next key.
+ List<column_t> list = batchMutation.cfmap.get(key);
+ for (column_t columnData : list)
+ {
+ rm.add(key.toString() + ":" + columnData.columnName,
+ columnData.value.getBytes(), columnData.timestamp);
+ }
+ }
+
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+ Message message = new Message(StorageService.getLocalStorageEndPoint(),
+ StorageService.mutationStage_,
+ StorageService.mutationVerbHandler_,
+ new Object[]{ rmMsg }
+ );
+ MessagingService.getMessagingInstance().sendRR(message, endpoints,
+ quorumResponseHandler);
+ logger_.debug(" Calling quorum response handler's get");
+ result = quorumResponseHandler.get();
+
+ // TODO: if the result is false that means the writes to all the
+ // servers failed hence we need to throw an exception or return an
+ // error back to the client so that it can take appropriate action.
+ }
+ catch (Exception e)
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ return result;
+
+ }
public void batch_insert(batch_mutation_t batchMutation)
{
- logger_.debug("batch_insert");
- RowMutation rm = RowMutation.getRowMutation(batchMutation);
- StorageProxy.insert(rm);
- }
+ // 1. Get the N nodes from storage service where the data needs to be
+ // replicated
+ // 2. Construct a message for read/write
+ // 3. SendRR ( to all the nodes above )
+ // 4. Wait for a response from at least X nodes where X <= N
+ // 5. return success
- public void remove(String tablename, String key, String columnFamily_column)
- {
- throw new UnsupportedOperationException("Remove is coming soon");
+ try
+ {
+ logger_.debug(" batch_insert");
+ logger_.debug(" Creating the row mutation");
+ validateTable(batchMutation.table);
+ RowMutation rm = new RowMutation(batchMutation.table,
+ batchMutation.key.trim());
+ if(batchMutation.cfmap != null)
+ {
+ Set keys = batchMutation.cfmap.keySet();
+ Iterator keyIter = keys.iterator();
+ while (keyIter.hasNext())
+ {
+ Object key = keyIter.next(); // Get the next key.
+ List<column_t> list = batchMutation.cfmap.get(key);
+ for (column_t columnData : list)
+ {
+ rm.add(key.toString() + ":" + columnData.columnName,
+ columnData.value.getBytes(), columnData.timestamp);
+
+ }
+ }
+ }
+ if(batchMutation.cfmapdel != null)
+ {
+ Set keys = batchMutation.cfmapdel.keySet();
+ Iterator keyIter = keys.iterator();
+ while (keyIter.hasNext())
+ {
+ Object key = keyIter.next(); // Get the next key.
+ List<column_t> list = batchMutation.cfmapdel.get(key);
+ for (column_t columnData : list)
+ {
+ rm.delete(key.toString() + ":" + columnData.columnName);
+ }
+ }
+ }
+ StorageProxy.insert(rm);
+ }
+ catch (Exception e)
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ return;
}
- public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, int block_for)
+ public void remove(String tablename, String key, String columnFamily_column)
{
- logger_.debug("remove");
- RowMutation rm = new RowMutation(tablename, key.trim());
- rm.delete(columnFamily_column, timestamp);
- if (block_for > 0) {
- return StorageProxy.insertBlocking(rm);
- } else {
+ try
+ {
+ validateTable(tablename);
+ RowMutation rm = new RowMutation(tablename, key.trim());
+ rm.delete(columnFamily_column);
StorageProxy.insert(rm);
- return true;
- }
+ }
+ catch (Exception e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ return;
}
public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws CassandraException, TException
@@ -590,7 +683,7 @@
throw new CassandraException("ERROR No row for this key .....: " + key);
}
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0)
{
logger_ .info("ERROR ColumnFamily map is missing.....: "
@@ -665,7 +758,7 @@
throw new CassandraException("ERROR No row for this key .....: " + key);
}
- Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
if (cfMap == null || cfMap.size() == 0)
{
logger_ .info("ERROR ColumnFamily map is missing.....: "
@@ -721,16 +814,110 @@
public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper)
{
- logger_.debug("batch_insert_SuperColumn_blocking");
- RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
- return StorageProxy.insertBlocking(rm);
+ boolean result = false;
+ try
+ {
+ logger_.warn(" batch_insert_SuperColumn_blocking");
+ logger_.debug(" Creating the row mutation");
+ validateTable(batchMutationSuper.table);
+ RowMutation rm = new RowMutation(batchMutationSuper.table,
+ batchMutationSuper.key.trim());
+ Set keys = batchMutationSuper.cfmap.keySet();
+ Iterator keyIter = keys.iterator();
+ while (keyIter.hasNext())
+ {
+ Object key = keyIter.next(); // Get the next key.
+ List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
+ for (superColumn_t superColumnData : list)
+ {
+ if(superColumnData.columns.size() != 0 )
+ {
+ for (column_t columnData : superColumnData.columns)
+ {
+ rm.add(key.toString() + ":" + superColumnData.name +":" + columnData.columnName,
+ columnData.value.getBytes(), columnData.timestamp);
+ }
+ }
+ else
+ {
+ rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
+ }
+ }
+ }
+ StorageProxy.insert(rm);
+ }
+ catch (Exception e)
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ return result;
+
}
-
public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
{
- logger_.debug("batch_insert_SuperColumn");
- RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
- StorageProxy.insert(rm);
+ try
+ {
+ logger_.debug(" batch_insert");
+ logger_.debug(" Creating the row mutation");
+ validateTable(batchMutationSuper.table);
+ RowMutation rm = new RowMutation(batchMutationSuper.table,
+ batchMutationSuper.key.trim());
+ if(batchMutationSuper.cfmap != null)
+ {
+ Set keys = batchMutationSuper.cfmap.keySet();
+ Iterator keyIter = keys.iterator();
+ while (keyIter.hasNext())
+ {
+ Object key = keyIter.next(); // Get the next key.
+ List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
+ for (superColumn_t superColumnData : list)
+ {
+ if(superColumnData.columns.size() != 0 )
+ {
+ for (column_t columnData : superColumnData.columns)
+ {
+ rm.add(key.toString() + ":" + superColumnData.name +":" + columnData.columnName,
+ columnData.value.getBytes(), columnData.timestamp);
+ }
+ }
+ else
+ {
+ rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
+ }
+ }
+ }
+ }
+ if(batchMutationSuper.cfmapdel != null)
+ {
+ Set keys = batchMutationSuper.cfmapdel.keySet();
+ Iterator keyIter = keys.iterator();
+ while (keyIter.hasNext())
+ {
+ Object key = keyIter.next(); // Get the next key.
+ List<superColumn_t> list = batchMutationSuper.cfmapdel.get(key);
+ for (superColumn_t superColumnData : list)
+ {
+ if(superColumnData.columns.size() != 0 )
+ {
+ for (column_t columnData : superColumnData.columns)
+ {
+ rm.delete(key.toString() + ":" + superColumnData.name +":" + columnData.columnName);
+ }
+ }
+ else
+ {
+ rm.delete(key.toString() + ":" + superColumnData.name);
+ }
+ }
+ }
+ }
+ StorageProxy.insert(rm);
+ }
+ catch (Exception e)
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ return;
}
public String getStringProperty(String propertyName) throws TException
@@ -817,76 +1004,7 @@
}
return result;
}
-
- public List<String> get_range(String tablename, final String startkey) throws CassandraException
- {
- if (!(StorageService.getPartitioner() instanceof OrderPreservingPartitioner)) {
- throw new CassandraException("range queries may only be performed against an order-preserving partitioner");
- }
-
- logger_.debug("get_range");
-
- // send request
- Message message;
- DataOutputBuffer dob = new DataOutputBuffer();
- try
- {
- dob.writeUTF(startkey);
- }
- catch (IOException e)
- {
- logger_.error("unable to write startkey", e);
- throw new RuntimeException(e);
- }
- byte[] messageBody = Arrays.copyOf(dob.getData(), dob.getLength());
- message = new Message(StorageService.getLocalStorageEndPoint(),
- StorageService.readStage_,
- StorageService.rangeVerbHandler_,
- messageBody);
- EndPoint endPoint;
- try
- {
- endPoint = StorageService.instance().findSuitableEndPoint(startkey);
- }
- catch (Exception e)
- {
- throw new CassandraException("Unable to find endpoint for " + startkey);
- }
- IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
-
- // read response
- // TODO send more requests if we need to span multiple nodes (or can we just let client worry about that,
- // since they have to handle multiple requests anyway?)
- byte[] responseBody;
- try
- {
- responseBody = (byte[]) iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS)[0];
- }
- catch (TimeoutException e)
- {
- throw new RuntimeException(e);
- }
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(responseBody, responseBody.length);
-
- // turn into List
- List<String> keys = new ArrayList<String>();
- while (bufIn.getPosition() < responseBody.length)
- {
- try
- {
- keys.add(bufIn.readUTF());
- }
- catch (IOException e)
- {
- logger_.error("bad utf", e);
- throw new RuntimeException(e);
- }
- }
-
- return keys;
- }
-
+
/*
* This method is used to ensure that all keys
* prior to the specified key, as determined by
@@ -924,15 +1042,6 @@
public static void main(String[] args) throws Throwable
{
int port = 9160;
-
- Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
- {
- public void uncaughtException(Thread t, Throwable e)
- {
- logger_.error("Fatal exception in thread " + t, e);
- }
- });
-
try
{
CassandraServer peerStorageServer = new CassandraServer();
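The five numbered steps in batch_insert_blocking above reduce to a short skeleton. A
condensed sketch of that flow using the same classes (key and rm as in the method
above; exception handling elided):

    // Condensed sketch of the quorum write in batch_insert_blocking.
    IResponseResolver<Boolean> resolver = new WriteResponseResolver();
    QuorumResponseHandler<Boolean> handler = new QuorumResponseHandler<Boolean>(
            DatabaseDescriptor.getReplicationFactor(), resolver);
    // 1. the N replicas responsible for the key
    EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
    // 2. the write message wrapping the row mutation
    Message message = new Message(StorageService.getLocalStorageEndPoint(),
            StorageService.mutationStage_,
            StorageService.mutationVerbHandler_,
            new Object[]{ new RowMutationMessage(rm) });
    // 3. send to all replicas; 4./5. block until a quorum acknowledges
    MessagingService.getMessagingInstance().sendRR(message, endpoints, handler);
    boolean success = handler.get();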
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Fri Mar 27 05:41:59 2009
@@ -58,6 +58,11 @@
if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
handleDigestResponses();
}
+
+ public void attachContext(Object o)
+ {
+ throw new UnsupportedOperationException("This operation is not currently supported.");
+ }
private void handleDigestResponses()
{
@@ -91,7 +96,8 @@
replicas_.add(StorageService.getLocalStorageEndPoint());
IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);
String table = DatabaseDescriptor.getTables().get(0);
- ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+ ReadMessage readMessage = constructReadMessage(false);
+ // ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
Message message = ReadMessage.makeReadMessage(readMessage);
MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0] ), responseHandler);
}
@@ -116,10 +122,14 @@
if ( responses_.size() == majority_ )
{
String messageId = message.getMessageId();
- readRepairTable_.put(messageId, messageId, this);
- // handleResponses();
+ readRepairTable_.put(messageId, messageId, this);
}
}
+
+ public void attachContext(Object o)
+ {
+ throw new UnsupportedOperationException("This operation is not currently supported.");
+ }
public void callMe(String key, String value)
{
@@ -176,30 +186,8 @@
public void run()
{
- logger_.debug(" Run the consistency checks for " + columnFamily_);
- String table = DatabaseDescriptor.getTables().get(0);
- ReadMessage readMessageDigestOnly = null;
- if(columnNames_.size() == 0)
- {
- if( start_ >= 0 && count_ < Integer.MAX_VALUE)
- {
- readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
- }
- else if(sinceTimestamp_ > 0)
- {
- readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
- }
- else
- {
- readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_);
- }
- }
- else
- {
- readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
-
- }
- readMessageDigestOnly.setIsDigestQuery(true);
+ logger_.debug(" Run the consistency checks for " + columnFamily_);
+ ReadMessage readMessageDigestOnly = constructReadMessage(true);
try
{
Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigestOnly);
@@ -211,4 +199,33 @@
logger_.info(LogUtil.throwableToString(ex));
}
}
+
+ private ReadMessage constructReadMessage(boolean isDigestQuery)
+ {
+ ReadMessage readMessage = null;
+ String table = DatabaseDescriptor.getTables().get(0);
+
+ if(columnNames_.size() == 0)
+ {
+ if( start_ >= 0 && count_ < Integer.MAX_VALUE)
+ {
+ readMessage = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
+ }
+ else if(sinceTimestamp_ > 0)
+ {
+ readMessage = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
+ }
+ else
+ {
+ readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+ }
+ }
+ else
+ {
+ readMessage = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
+
+ }
+ readMessage.setIsDigestQuery(isDigestQuery);
+ return readMessage;
+ }
}
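The read-repair fix above comes down to reusing constructReadMessage() for the repair
round, so the full-data re-read keeps the same query shape (column names, start/count
range, or timestamp) as the digest query, instead of always re-reading the entire
column family. Sketch of the two phases sharing one construction path:

    // Both phases of the consistency check now share one query builder.
    ReadMessage digestQuery = constructReadMessage(true);   // digest-only round
    ReadMessage repairQuery = constructReadMessage(false);  // matching full-data re-read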
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java Fri Mar 27 05:41:59 2009
@@ -23,18 +23,15 @@
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.RuntimeMXBean;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.util.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CalloutDeployMessage;
import org.apache.cassandra.db.CalloutManager;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.Range;
@@ -51,6 +48,8 @@
import org.apache.cassandra.net.http.HttpWriteResponse;
import org.apache.cassandra.procedures.GroovyScriptRunner;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
/*
* This class handles the incoming HTTP request after
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java Fri Mar 27 05:41:59 2009
@@ -26,12 +26,13 @@
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.WriteResponseMessage;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-
+import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
@@ -122,4 +123,9 @@
lock_.unlock();
}
}
+
+ public void attachContext(Object o)
+ {
+ throw new UnsupportedOperationException("This operation is not supported in this version of the callback handler");
+ }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java Fri Mar 27 05:41:59 2009
@@ -37,8 +37,8 @@
import org.apache.log4j.Logger;
-/*
- * This class is used by all read functions and is called by the Qorum
+/**
+ * This class is used by all read functions and is called by the Quorum
* when at least a few of the servers (few is specified in Quorum)
* have sent the response. The resolve fn then schedules read repair
* and resolution of read data from the various servers.
@@ -46,7 +46,6 @@
*/
public class ReadResponseResolver implements IResponseResolver<Row>
{
-
private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
/*
@@ -101,15 +100,16 @@
logger_.info(LogUtil.throwableToString(ex));
}
}
- // If there was a digest query compare it withh all teh data digests
- // If there is a mismatch then thwrow an exception so that read repair can happen.
+ // If there was a digest query compare it with all the data digests
+ // If there is a mismatch then throw an exception so that read repair can happen.
if(isDigestQuery)
{
for(Row row: rowList)
{
if( !Arrays.equals(row.digest(), digest) )
{
- throw new DigestMismatchException("The Digest does not match");
+ /* Wrap the key as the context in this exception */
+ throw new DigestMismatchException(row.key());
}
}
}
@@ -141,13 +141,13 @@
continue;
// create the row mutation message based on the diff and schedule a read repair
RowMutation rowMutation = new RowMutation(table, key);
- Map<String, ColumnFamily> columnFamilies = diffRow.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = diffRow.getColumnFamilies();
Set<String> cfNames = columnFamilies.keySet();
for ( String cfName : cfNames )
{
ColumnFamily cf = columnFamilies.get(cfName);
- rowMutation.add(cf);
+ rowMutation.add(cfName, cf);
}
RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
// schedule the read repair
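Downstream of this resolver, the wrapped key surfaces when a digest mismatch forces a
repair round: the strong read path catches the exception, flips the original message
back to a data query, and re-sends it to every replica. A hypothetical helper
condensing that flow (signatures assumed from the StorageProxy diff below):

    // Hypothetical condensation of the digest-mismatch repair round.
    static Row readWithRepair(ReadMessage readMessage, Message[] messages, EndPoint[] endPoints,
            QuorumResponseHandler<Row> handler, QuorumResponseHandler<Row> repairHandler)
            throws Exception
    {
        try
        {
            MessagingService.getMessagingInstance().sendRR(messages, endPoints, handler);
            return handler.get();                    // resolver compares digests here
        }
        catch (DigestMismatchException ex)           // message carries the mismatching row key
        {
            readMessage.setIsDigestQuery(false);     // re-issue as a full data query
            Message repair = ReadMessage.makeReadMessage(readMessage);
            MessagingService.getMessagingInstance().sendRR(repair, endPoints, repairHandler);
            return repairHandler.get();              // resolver reconciles and schedules repair
        }
    }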
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java Fri Mar 27 05:41:59 2009
@@ -19,30 +19,33 @@
package org.apache.cassandra.service;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
+import java.math.BigInteger;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.SingleThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.LeaveJoinProtocolImpl;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
/*
* The load balancing algorithm here is an implementation of
@@ -161,6 +164,7 @@
if ( isMoveable_.get() )
{
MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+ BigInteger targetToken = moveMessage.getTargetToken();
/* Start the leave operation and join the ring at the position specified */
isMoveable_.set(false);
}
@@ -392,18 +396,18 @@
class MoveMessage implements Serializable
{
- private Token targetToken_;
+ private BigInteger targetToken_;
private MoveMessage()
{
}
- MoveMessage(Token targetToken)
+ MoveMessage(BigInteger targetToken)
{
targetToken_ = targetToken;
}
- Token getTargetToken()
+ BigInteger getTargetToken()
{
return targetToken_;
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Fri Mar 27 05:41:59 2009
@@ -26,9 +26,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang.StringUtils;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadMessage;
import org.apache.cassandra.db.ReadResponseMessage;
@@ -39,6 +36,7 @@
import org.apache.cassandra.db.TouchMessage;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -48,35 +46,35 @@
public class StorageProxy
{
- private static Logger logger_ = Logger.getLogger(StorageProxy.class);
+ private static Logger logger_ = Logger.getLogger(StorageProxy.class);
/**
* This method is responsible for creating Message to be
* sent over the wire to N replicas where some of the replicas
* may be hints.
*/
- private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, Map<EndPoint, EndPoint> endpointMap) throws IOException
+ private static Map<EndPoint, Message> createWriteMessages(RowMutationMessage rmMessage, Map<EndPoint, EndPoint> endpointMap) throws IOException
{
- Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
- Message message = rm.makeRowMutationMessage();
-
- for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet())
- {
- EndPoint target = entry.getKey();
- EndPoint hint = entry.getValue();
+ Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+ Message message = RowMutationMessage.makeRowMutationMessage(rmMessage);
+
+ Set<EndPoint> targets = endpointMap.keySet();
+ for( EndPoint target : targets )
+ {
+ EndPoint hint = endpointMap.get(target);
if ( !target.equals(hint) )
- {
- Message hintedMessage = rm.makeRowMutationMessage();
- hintedMessage.addHeader(RowMutation.HINT, EndPoint.toBytes(hint) );
- logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
- messageMap.put(target, hintedMessage);
- }
- else
- {
- messageMap.put(target, message);
- }
- }
- return messageMap;
+ {
+ Message hintedMessage = RowMutationMessage.makeRowMutationMessage(rmMessage);
+ hintedMessage.addHeader(RowMutationMessage.hint_, EndPoint.toBytes(hint) );
+ logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+ messageMap.put(target, hintedMessage);
+ }
+ else
+ {
+ messageMap.put(target, message);
+ }
+ }
+ return messageMap;
}
/**
@@ -84,96 +82,124 @@
* across all replicas. This method will take care
* of the possibility of a replica being down and hint
* the data across to some other replica.
- * @param rm the mutation to be applied across the replicas
+ * @param rm the mutation to be applied
+ * across the replicas
*/
public static void insert(RowMutation rm)
- {
+ {
/*
* Get the N nodes from storage service where the data needs to be
* replicated
* Construct a message for write
* Send them asynchronously to the replicas.
*/
- assert rm.key() != null;
-
- try
- {
- Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
- // TODO: throw a thrift exception if we do not have N nodes
- Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
- logger_.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
- for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
- {
- MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
- }
- }
+ try
+ {
+ logger_.debug(" insert");
+ Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+ // TODO: throw a thrift exception if we do not have N nodes
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+ /* Create the write messages to be sent */
+ Map<EndPoint, Message> messageMap = createWriteMessages(rmMsg, endpointMap);
+ Set<EndPoint> endpoints = messageMap.keySet();
+ for(EndPoint endpoint : endpoints)
+ {
+ MessagingService.getMessagingInstance().sendOneWay(messageMap.get(endpoint), endpoint);
+ }
+ }
catch (Exception e)
{
- logger_.error( LogUtil.throwableToString(e) );
+ logger_.info( LogUtil.throwableToString(e) );
}
return;
}
- public static boolean insertBlocking(RowMutation rm)
+
+ private static Map<String, Message> constructMessages(Map<String, ReadMessage> readMessages) throws IOException
{
- assert rm.key() != null;
-
- try
+ Map<String, Message> messages = new HashMap<String, Message>();
+ Set<String> keys = readMessages.keySet();
+ for ( String key : keys )
{
- Message message = rm.makeRowMutationMessage();
-
- IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
- QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
- DatabaseDescriptor.getReplicationFactor(),
- writeResponseResolver);
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
- logger_.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
- // TODO: throw a thrift exception if we do not have N nodes
-
- MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
- return quorumResponseHandler.get();
-
- // TODO: if the result is false that means the writes to all the
- // servers failed hence we need to throw an exception or return an
- // error back to the client so that it can take appropriate action.
- }
- catch (Exception e)
- {
- logger_.error( LogUtil.throwableToString(e) );
- return false;
- }
+ Message message = ReadMessage.makeReadMessage( readMessages.get(key) );
+ messages.put(key, message);
+ }
+ return messages;
}
- public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+ private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Map<String, Message> messages)
{
- EndPoint endPoint = null;
- try
+ Set<String> keys = endPoints.keySet();
+ EndPoint[] eps = new EndPoint[keys.size()];
+ Message[] msgs = new Message[keys.size()];
+
+ int i = 0;
+ for ( String key : keys )
{
- endPoint = StorageService.instance().findSuitableEndPoint(key);
+ eps[i] = endPoints.get(key);
+ msgs[i] = messages.get(key);
+ ++i;
}
- catch( Throwable ex)
+
+ IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(msgs, eps);
+ return iar;
+ }
+
+ /**
+ * This is an implementation for the multiget version.
+ * @param readMessages map of key --> ReadMessage to be sent
+ * @return map of key --> Row
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ public static Map<String, Row> doReadProtocol(Map<String, ReadMessage> readMessages) throws IOException,TimeoutException
+ {
+ Map<String, Row> rows = new HashMap<String, Row>();
+ Set<String> keys = readMessages.keySet();
+ /* Find all the suitable endpoints for the keys */
+ Map<String, EndPoint> endPoints = StorageService.instance().findSuitableEndPoints(keys.toArray( new String[0] ));
+ /* Construct the messages to be sent out */
+ Map<String, Message> messages = constructMessages(readMessages);
+ /* Dispatch the messages to the respective endpoints */
+ IAsyncResult iar = dispatchMessages(endPoints, messages);
+ List<Object[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+
+ for ( Object[] result : results )
{
- ex.printStackTrace();
- }
+ byte[] body = (byte[])result[0];
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+ ReadResponseMessage responseMessage = ReadResponseMessage.serializer().deserialize(bufIn);
+ Row row = responseMessage.row();
+ rows.put(row.key(), row);
+ }
+ return rows;
+ }
+
+ public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+ {
+ Row row = null;
+ EndPoint endPoint = StorageService.instance().findSuitableEndPoint(key);
if(endPoint != null)
{
Message message = ReadMessage.makeReadMessage(readMessage);
+ message.addHeader(ReadMessage.doRepair_, ReadMessage.doRepair_.getBytes());
IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
byte[] body = (byte[])result[0];
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
ReadResponseMessage responseMessage = ReadResponseMessage.serializer().deserialize(bufIn);
- return responseMessage.row();
+ row = responseMessage.row();
}
else
{
logger_.warn(" Alert : Unable to find a suitable end point for the key : " + key );
}
- return null;
+ return row;
}
- static void touch_local (String tablename, String key, boolean fData ) throws IOException
+ static void touch_local(String tablename, String key, boolean fData ) throws IOException
{
Table table = Table.open( tablename );
table.touch(key, fData);
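The multiget doReadProtocol above batches everything into a single sendRR call: one
ReadMessage and one suitable endpoint per key, then a blocking multiget on the async
result. A hypothetical caller (table and column family names are assumptions):

    // Sketch of feeding the parallel multiget dispatch shown above.
    static Map<String, Row> multigetByNames(String[] keys, List<String> columns)
            throws IOException, TimeoutException
    {
        Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
        for (String key : keys)
            readMessages.put(key, new ReadMessage("Table1", key, "Standard1", columns));
        return StorageProxy.doReadProtocol(readMessages);  // one round trip for all keys
    }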
@@ -237,10 +263,8 @@
weakTouchProtocol(tablename, key, fData);
break;
}
- }
+ }
-
-
public static Row readProtocol(String tablename, String key, String columnFamily, List<String> columnNames, StorageService.ConsistencyLevel consistencyLevel) throws Exception
{
Row row = null;
@@ -277,9 +301,7 @@
break;
}
}
- return row;
-
-
+ return row;
}
public static Row readProtocol(String tablename, String key, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
@@ -321,6 +343,26 @@
return row;
}
+ public static Map<String, Row> readProtocol(String tablename, String[] keys, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+ {
+ Map<String, Row> rows = new HashMap<String, Row>();
+ switch ( consistencyLevel )
+ {
+ case WEAK:
+ rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+ break;
+
+ case STRONG:
+ rows = strongReadProtocol(tablename, keys, columnFamily, start, count);
+ break;
+
+ default:
+ rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+ break;
+ }
+ return rows;
+ }
+
public static Row readProtocol(String tablename, String key, String columnFamily, long sinceTimestamp, StorageService.ConsistencyLevel consistencyLevel) throws Exception
{
Row row = null;
@@ -374,18 +416,21 @@
return row;
}
- /*
- * This function executes the read protocol.
- // 1. Get the N nodes from storage service where the data needs to be
- // replicated
- // 2. Construct a message for read\write
- * 3. Set one of teh messages to get teh data and teh rest to get teh digest
- // 4. SendRR ( to all the nodes above )
- // 5. Wait for a response from atleast X nodes where X <= N and teh data node
- * 6. If the digest matches return teh data.
- * 7. else carry out read repair by getting data from all the nodes.
- // 5. return success
- *
+ /**
+ * This function executes the read protocol.
+ * 1. Get the N nodes from storage service where the data needs to be replicated
+ * 2. Construct a message for read/write
+ * 3. Set one of the messages to get the data and the rest to get the digest
+ * 4. SendRR ( to all the nodes above )
+ * 5. Wait for a response from at least X nodes where X <= N and the data node
+ * 6. If the digest matches return the data.
+ * 7. else carry out read repair by getting data from all the nodes.
+ * @param tablename the name of the table
+ * @param key the row key identifier
+ * @param columnFamily the column in Cassandra format
+ * @param start the start position
+ * @param count the number of columns we are interested in
+ * @throws IOException, TimeoutException
*/
public static Row strongReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws IOException, TimeoutException
{
@@ -416,6 +461,49 @@
return row;
}
+ /**
+ * This is a multiget version of the above method.
+ * @param tablename
+ * @param keys
+ * @param columnFamily
+ * @param start
+ * @param count
+ * @return
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ public static Map<String, Row> strongReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws IOException, TimeoutException
+ {
+ Map<String, Row> rows = new HashMap<String, Row>();
+ long startTime = System.currentTimeMillis();
+ // TODO: throw a thrift exception if we do not have N nodes
+ Map<String, ReadMessage[]> readMessages = new HashMap<String, ReadMessage[]>();
+ for (String key : keys )
+ {
+ ReadMessage[] readMessage = new ReadMessage[2];
+ if( start >= 0 && count < Integer.MAX_VALUE)
+ {
+ readMessage[0] = new ReadMessage(tablename, key, columnFamily, start, count);
+ }
+ else
+ {
+ readMessage[0] = new ReadMessage(tablename, key, columnFamily);
+ }
+ if( start >= 0 && count < Integer.MAX_VALUE)
+ {
+ readMessage[1] = new ReadMessage(tablename, key, columnFamily, start, count);
+ }
+ else
+ {
+ readMessage[1] = new ReadMessage(tablename, key, columnFamily);
+ }
+ readMessage[1].setIsDigestQuery(true);
+ readMessages.put(key, readMessage);
+ }
+ rows = doStrongReadProtocol(readMessages);
+ logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return rows;
+ }
+
public static Row strongReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws IOException, TimeoutException
{
long startTime = System.currentTimeMillis();
@@ -431,8 +519,8 @@
return row;
}
- /*
- * This method performs the actual read from the replicas.
+ /**
+ * This method performs the read from the replicas.
* param @ key - key for which the data is required.
* param @ readMessage - the read message to get the actual data
* param @ readMessageDigest - the read message to get the digest.
@@ -454,11 +542,14 @@
EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
Message messages[] = new Message[endpointList.size() + 1];
- // first message is the data Point
+ /*
+ * First message is sent to the node that will actually get
+ * the data for us. The other two replicas are only sent a
+ * digest query.
+ */
endPoints[0] = dataPoint;
- messages[0] = message;
-
- for(int i=1; i < endPoints.length ; i++)
+ messages[0] = message;
+ for (int i=1; i < endPoints.length ; i++)
{
endPoints[i] = endpointList.get(i-1);
messages[i] = messageDigestOnly;
@@ -466,8 +557,7 @@
try
{
- MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
-
+ MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
long startTime2 = System.currentTimeMillis();
row = quorumResponseHandler.get();
logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2)
@@ -490,8 +580,7 @@
readMessage.setIsDigestQuery(false);
logger_.info("DigestMismatchException: " + key);
Message messageRepair = ReadMessage.makeReadMessage(readMessage);
- MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
- quorumResponseHandlerRepair);
+ MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, quorumResponseHandlerRepair);
try
{
row = quorumResponseHandlerRepair.get();
@@ -509,6 +598,111 @@
return row;
}
+ private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadMessage[]> readMessages) throws IOException
+ {
+ Map<String, Message[]> messages = new HashMap<String, Message[]>();
+ Set<String> keys = readMessages.keySet();
+
+ for ( String key : keys )
+ {
+ Message[] msg = new Message[DatabaseDescriptor.getReplicationFactor()];
+ ReadMessage[] readMessage = readMessages.get(key);
+ msg[0] = ReadMessage.makeReadMessage( readMessage[0] );
+ for ( int i = 1; i < msg.length; ++i )
+ {
+ msg[i] = ReadMessage.makeReadMessage( readMessage[1] );
+ }
+ messages.put(key, msg);
+ }
+ return messages;
+ }
+
+ private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadMessage[]> readMessages, Map<String, Message[]> messages) throws IOException
+ {
+ Set<String> keys = messages.keySet();
+ /* This maps the keys to the original data read messages */
+ Map<String, ReadMessage> readMessage = new HashMap<String, ReadMessage>();
+ /* This maps the keys to their respective endpoints/replicas */
+ Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
+ /* Groups the messages that need to be sent to the individual keys */
+ Message[][] msgList = new Message[messages.size()][DatabaseDescriptor.getReplicationFactor()];
+ /* Respects the above grouping and provides the endpoints for the above messages */
+ EndPoint[][] epList = new EndPoint[messages.size()][DatabaseDescriptor.getReplicationFactor()];
+
+ int i = 0;
+ for ( String key : keys )
+ {
+ /* This is the primary */
+ EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(key);
+ List<EndPoint> replicas = new ArrayList<EndPoint>( StorageService.instance().getNLiveStorageEndPoint(key) );
+ replicas.remove(dataPoint);
+ /* Get the messages to be sent index 0 is the data messages and index 1 is the digest message */
+ Message[] message = messages.get(key);
+ msgList[i][0] = message[0];
+ int N = DatabaseDescriptor.getReplicationFactor();
+ for ( int j = 1; j < N; ++j )
+ {
+ msgList[i][j] = message[1];
+ }
+ /* Get the endpoints to which the above messages need to be sent */
+ epList[i][0] = dataPoint;
+ for ( int j = 1; j < N; ++j )
+ {
+ epList[i][j] = replicas.get(j - 1);
+ }
+ /* Data ReadMessage associated with this key */
+ readMessage.put( key, readMessages.get(key)[0] );
+ /* EndPoints for this specific key */
+ endpoints.put(key, epList[i]);
+ ++i;
+ }
+
+ /* Handles the read semantics for this entire set of keys */
+ MultiQuorumResponseHandler quorumResponseHandlers = new MultiQuorumResponseHandler(readMessage, endpoints);
+ MessagingService.getMessagingInstance().sendRR(msgList, epList, quorumResponseHandlers);
+ return quorumResponseHandlers;
+ }
+
+ /**
+ * This method performs the read from the replicas for a bunch of keys.
+ * @param readMessages map of key --> readMessage[] of two entries where
+ * the first entry is the readMessage for the data and the second
+ * is the entry for the digest
+ * @return map containing key ---> Row
+ * @throws IOException, TimeoutException
+ */
+ private static Map<String, Row> doStrongReadProtocol(Map<String, ReadMessage[]> readMessages) throws IOException
+ {
+ Map<String, Row> rows = new HashMap<String, Row>();
+ /* Construct the messages to be sent to the replicas */
+ Map<String, Message[]> replicaMessages = constructReplicaMessages(readMessages);
+ /* Dispatch the messages to the different replicas */
+ MultiQuorumResponseHandler cb = dispatchMessages(readMessages, replicaMessages);
+ try
+ {
+ Row[] rows2 = cb.get();
+ for ( Row row : rows2 )
+ {
+ rows.put(row.key(), row);
+ }
+ }
+ catch ( TimeoutException ex )
+ {
+ logger_.info("Operation timed out waiting for responses ...");
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ return rows;
+ }
+
+ /**
+ * This version is used to retrieve the row associated with
+ * the specified key
+ * @param tablename name of the table that needs to be queried
+ * @param key key whose value we are interested in
+ * @param columnFamily name of the "column" we are interested in
+ * @param columns the columns we are interested in
+ * @return the row of interest
+ * @throws Exception
+ */
public static Row weakReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
{
long startTime = System.currentTimeMillis();
@@ -530,11 +724,56 @@
return row;
}
- /*
+ /**
+ * This version is used when results for multiple keys need to be
+ * retrieved.
+ *
+ * @param tablename name of the table that needs to be queried
+ * @param keys keys whose values we are interested in
+ * @param columnFamily name of the "column" we are interested in
+ * @param columns the columns we are interested in
+ * @return a mapping of key --> Row
+ * @throws Exception
+ */
+ public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, List<String> columns) throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+ for ( String key : keys )
+ {
+ ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);
+ readMessages.put(key, readMessage);
+ }
+ /* Performs the multiget in parallel */
+ Map<String, Row> rows = doReadProtocol(readMessages);
+ /*
+ * Do the consistency checks for the keys that are being queried
+ * in the background.
+ */
+ for ( String key : keys )
+ {
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(rows.get(key), endpoints, columnFamily, columns);
+ }
+ return rows;
+ }
+
+ /**
* This function executes the read protocol locally and should be used only if consistency is not a concern.
* Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
* one of the other replicas (in the same data center if possible) till we get the data. In the event we get
- * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+ * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+ * @param tablename name of the table that needs to be queried
+ * @param key key whose value we are interested in
+ * @param columnFamily name of the "column" we are interested in
+ * @param start start index
+ * @param count the number of columns we are interested in
+ * @return the row associated with this key
+ * @throws Exception
*/
public static Row weakReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws Exception
{
@@ -565,6 +804,55 @@
return row;
}
+ /**
+ * This version is used when results for multiple keys need to be
+ * retrieved.
+ *
+ * @param tablename name of the table that needs to be queried
+ * @param keys keys whose values we are interested in
+ * @param columnFamily name of the "column" we are interested in
+ * @param start start index
+ * @param count the number of columns we are interested in
+ * @return a mapping of key --> Row
+ * @throws Exception
+ */
+ public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+ for ( String key : keys )
+ {
+ ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+ readMessages.put(key, readMessage);
+ }
+ /* Performs the multiget in parallel */
+ Map<String, Row> rows = doReadProtocol(readMessages);
+ /*
+ * Do the consistency checks for the keys that are being queried
+ * in the background.
+ */
+ for ( String key : keys )
+ {
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(rows.get(key), endpoints, columnFamily, start, count);
+ }
+ return rows;
+ }
+
+ /**
+ * This version is used when retrieving a single key.
+ *
+ * @param tablename name of the table that needs to be queried
+ * @param key key whose value we are interested in
+ * @param columnFamily name of the "column" we are interested in
+ * @param sinceTimestamp this is the lower bound of the timestamp
+ * @return the row associated with this key
+ * @throws Exception
+ */
public static Row weakReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws Exception
{
Row row = null;
@@ -585,5 +873,42 @@
StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
return row;
}
-
+
+ /**
+ * This version is used when results for multiple keys need to be
+ * retrieved.
+ *
+ * @param tablename name of the table that needs to be queried
+ * @param keys keys whose values we are interested in
+ * @param columnFamily name of the "column" we are interested in
+ * @param sinceTimestamp this is the lower bound of the timestamp
+ * @return a mapping of key --> Row
+ * @throws Exception
+ */
+ public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, long sinceTimestamp) throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+ for ( String key : keys )
+ {
+ ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+ readMessages.put(key, readMessage);
+ }
+ /* Performs the multiget in parallel */
+ Map<String, Row> rows = doReadProtocol(readMessages);
+ /*
+ * Do the consistency checks for the keys that are being queried
+ * in the background.
+ */
+ for ( String key : keys )
+ {
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(rows.get(key), endpoints, columnFamily, sinceTimestamp);
+ }
+ return rows;
+ }
}
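Taken together, the strong multiget path fans out one data read plus N-1 digest reads
per key and funnels the responses through the MultiQuorumResponseHandler, while the
weak variants answer from a single suitable endpoint per key and run the consistency
checks in the background. A minimal sketch of the strong variant (table and column
family names assumed):

    // Hypothetical use of the strong multiget path added above.
    String[] keys = new String[] { "key1", "key2" };
    Map<String, Row> rows = StorageProxy.strongReadProtocol(
            "Table1",      // assumed table name
            keys,
            "Standard1",   // assumed column family
            0,             // start column index
            100);          // column count per key
    Row row = rows.get("key1");   // absent if no replica had data for the key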