You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/30 21:39:58 UTC
svn commit: r820417 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ service/ utils/
Author: jbellis
Date: Wed Sep 30 19:39:57 2009
New Revision: 820417
URL: http://svn.apache.org/viewvc?rev=820417&view=rev
Log:
formatting + cleanup. patch by jbellis for CASSANDRA-462
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed Sep 30 19:39:57 2009
@@ -23,6 +23,7 @@
import java.security.MessageDigest;
import java.io.IOException;
+import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -37,6 +38,8 @@
public final class Column implements IColumn
{
+ private static Logger logger_ = Logger.getLogger(Column.class);
+
private static ColumnSerializer serializer_ = new ColumnSerializer();
public static ColumnSerializer serializer()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Wed Sep 30 19:39:57 2009
@@ -115,20 +115,12 @@
boolean isDigest = dis.readBoolean();
Row row = null;
- if ( !isDigest )
+ if (!isDigest)
{
row = Row.serializer().deserialize(dis);
}
-
- ReadResponse rmsg = null;
- if( isDigest )
- {
- rmsg = new ReadResponse(digest);
- }
- else
- {
- rmsg = new ReadResponse(row);
- }
+
+ ReadResponse rmsg = isDigest ? new ReadResponse(digest) : new ReadResponse(row);
rmsg.setIsDigestQuery(isDigest);
return rmsg;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Sep 30 19:39:57 2009
@@ -75,9 +75,8 @@
}
ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
Table table = Table.open(readCommand.table);
- Row row = null;
- row = readCommand.getRow(table);
- ReadResponse readResponse = null;
+ Row row = readCommand.getRow(table);
+ ReadResponse readResponse;
if (readCommand.isDigestQuery())
{
readResponse = new ReadResponse(row.digest());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Wed Sep 30 19:39:57 2009
@@ -29,13 +29,11 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.utils.FBUtilities;
public class Row
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Sep 30 19:39:57 2009
@@ -23,7 +23,6 @@
import java.util.Arrays;
import java.util.List;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
@@ -129,21 +128,17 @@
public void callMe(String key, String value)
{
- handleResponses();
- }
-
- private void handleResponses()
- {
- try
+ try
{
readResponseResolver_.resolve(new ArrayList<Message>(responses_));
- }
- catch ( DigestMismatchException ex )
- {
- throw new RuntimeException(ex);
- }
- }
- }
+ }
+ catch (Exception ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ }
private static long scheduledTimeMillis_ = 600;
private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Wed Sep 30 19:39:57 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.service;
import java.util.List;
+import java.io.IOException;
import org.apache.cassandra.net.Message;
@@ -32,7 +33,7 @@
* repairs . Hence you need to derive a response resolver based on your
* needs from this interface.
*/
- public T resolve(List<Message> responses) throws DigestMismatchException;
+ public T resolve(List<Message> responses) throws DigestMismatchException, IOException;
public boolean isDataPresent(List<Message> responses);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Wed Sep 30 19:39:57 2009
@@ -24,6 +24,7 @@
import java.util.concurrent.locks.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.io.IOException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.IAsyncCallback;
@@ -55,64 +56,67 @@
startTime_ = System.currentTimeMillis();
}
- public T get() throws TimeoutException, DigestMismatchException
+ public T get() throws TimeoutException, DigestMismatchException, IOException
{
- lock_.lock();
+ lock_.lock();
try
- {
- boolean bVal = true;
+ {
+ boolean bVal = true;
try
{
- if ( !done_.get() )
+ if (!done_.get())
{
long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout();
- if(timeout > 0)
+ if (timeout > 0)
+ {
bVal = condition_.await(timeout, TimeUnit.MILLISECONDS);
+ }
else
+ {
bVal = false;
+ }
}
}
- catch ( InterruptedException ex )
+ catch (InterruptedException ex)
{
- if (logger_.isDebugEnabled())
- logger_.debug( LogUtil.throwableToString(ex) );
+ throw new AssertionError(ex);
}
-
- if ( !bVal && !done_.get() )
+
+ if (!bVal && !done_.get())
{
StringBuilder sb = new StringBuilder("");
- for ( Message message : responses_ )
+ for (Message message : responses_)
{
- sb.append(message.getFrom());
- }
- throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " .");
+ sb.append(message.getFrom());
+ }
+ throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " .");
}
}
finally
{
lock_.unlock();
- for(Message response : responses_)
+ for (Message response : responses_)
{
- MessagingService.removeRegisteredCallback( response.getMessageId() );
+ MessagingService.removeRegisteredCallback(response.getMessageId());
}
}
- return responseResolver_.resolve( responses_);
+ return responseResolver_.resolve(responses_);
}
public void response(Message message)
{
lock_.lock();
try
- {
- if ( !done_.get() )
+ {
+ if (!done_.get())
{
- responses_.add( message );
- if ( responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
- {
- done_.set(true);
- condition_.signal();
- }
+ responses_.add(message);
+ if (responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
+ {
+ done_.set(true);
+ condition_.signal();
+ }
}
}
finally
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Sep 30 19:39:57 2009
@@ -54,8 +54,8 @@
* repair request should be scheduled.
*
*/
- public Row resolve(List<Message> responses) throws DigestMismatchException
- {
+ public Row resolve(List<Message> responses) throws DigestMismatchException, IOException
+ {
long startTime = System.currentTimeMillis();
Row retRow = null;
List<Row> rowList = new ArrayList<Row>();
@@ -76,38 +76,31 @@
{
byte[] body = response.getMessageBody();
bufIn.reset(body, body.length);
- try
+ long start = System.currentTimeMillis();
+ ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+ if (logger_.isDebugEnabled())
+ logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
+ if (result.isDigestQuery())
{
- long start = System.currentTimeMillis();
- ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
- if (logger_.isDebugEnabled())
- logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
- if(!result.isDigestQuery())
- {
- rowList.add(result.row());
- endPoints.add(response.getFrom());
- key = result.row().key();
- table = result.row().getTable();
- }
- else
- {
- digest = result.digest();
- isDigestQuery = true;
- }
+ digest = result.digest();
+ isDigestQuery = true;
}
- catch( IOException ex )
+ else
{
- logger_.info(LogUtil.throwableToString(ex));
+ rowList.add(result.row());
+ endPoints.add(response.getFrom());
+ key = result.row().key();
+ table = result.row().getTable();
}
- }
+ }
// If there was a digest query compare it with all the data digests
// If there is a mismatch then throw an exception so that read repair can happen.
- if(isDigestQuery)
- {
- for(Row row: rowList)
- {
- if( !Arrays.equals(row.digest(), digest) )
- {
+ if (isDigestQuery)
+ {
+ for (Row row : rowList)
+ {
+ if (!Arrays.equals(row.digest(), digest))
+ {
/* Wrap the key as the context in this exception */
throw new DigestMismatchException(row.key());
}
@@ -115,36 +108,36 @@
}
/* If the rowList is empty then we had some exception above. */
- if ( rowList.size() == 0 )
+ if (rowList.size() == 0)
{
return retRow;
}
-
+
/* Now calculate the resolved row */
- retRow = new Row(table, key);
- for (int i = 0 ; i < rowList.size(); i++)
- {
- retRow.repair(rowList.get(i));
- }
+ retRow = new Row(table, key);
+ for (int i = 0; i < rowList.size(); i++)
+ {
+ retRow.repair(rowList.get(i));
+ }
// At this point we have the return row .
- // Now we need to calculate the difference
- // so that we can schedule read repairs
- for (int i = 0 ; i < rowList.size(); i++)
- {
- // since retRow is the resolved row it can be used as the super set
- Row diffRow = rowList.get(i).diff(retRow);
- if(diffRow == null) // no repair needs to happen
- continue;
- // create the row mutation message based on the diff and schedule a read repair
- RowMutation rowMutation = new RowMutation(table, key);
- for (ColumnFamily cf : diffRow.getColumnFamilies())
- {
- rowMutation.add(cf);
- }
+ // Now we need to calculate the difference
+ // so that we can schedule read repairs
+ for (int i = 0; i < rowList.size(); i++)
+ {
+ // since retRow is the resolved row it can be used as the super set
+ Row diffRow = rowList.get(i).diff(retRow);
+ if (diffRow == null) // no repair needs to happen
+ continue;
+ // create the row mutation message based on the diff and schedule a read repair
+ RowMutation rowMutation = new RowMutation(table, key);
+ for (ColumnFamily cf : diffRow.getColumnFamilies())
+ {
+ rowMutation.add(cf);
+ }
RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
- ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
- }
+ ReadRepairManager.instance().schedule(endPoints.get(i), rowMutationMessage);
+ }
if (logger_.isDebugEnabled())
logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
return retRow;
@@ -152,26 +145,26 @@
public boolean isDataPresent(List<Message> responses)
{
- boolean isDataPresent = false;
- for (Message response : responses)
- {
+ boolean isDataPresent = false;
+ for (Message response : responses)
+ {
byte[] body = response.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
+ DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
try
{
- ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
- if(!result.isDigestQuery())
- {
- isDataPresent = true;
- }
+ ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+ if (!result.isDigestQuery())
+ {
+ isDataPresent = true;
+ }
bufIn.close();
}
- catch(IOException ex)
+ catch (IOException ex)
{
logger_.info(LogUtil.throwableToString(ex));
- }
- }
- return isDataPresent;
- }
+ }
+ }
+ return isDataPresent;
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Sep 30 19:39:57 2009
@@ -352,10 +352,7 @@
Message message = command.makeReadMessage();
Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
- IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
- DatabaseDescriptor.getQuorum(),
- readResponseResolver);
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
/* Remove the local storage endpoint from the list. */
@@ -401,7 +398,7 @@
}
catch (DigestMismatchException ex)
{
- if ( DatabaseDescriptor.getConsistencyCheck())
+ if (DatabaseDescriptor.getConsistencyCheck())
{
IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
@@ -409,8 +406,7 @@
readResponseResolverRepair);
logger.info("DigestMismatchException: " + command.key);
Message messageRepair = command.makeReadMessage();
- MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex),
- quorumResponseHandlerRepair);
+ MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
try
{
row = quorumResponseHandlerRepair.get();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Sep 30 19:39:57 2009
@@ -398,15 +398,12 @@
return bytes;
}
- public static String bytesToHex(byte[] buf)
+ public static String bytesToHex(byte[] bytes)
{
- char[] chars = new char[2*buf.length];
- for (int i = 0; i < buf.length; i++)
- {
- chars[i*2] = HEX_CHARS[(buf[i] & 0xF0) >>> 4];
- chars[i*2+1] = HEX_CHARS[buf[i] & 0x0F];
- }
- return new String(chars);
+ StringBuilder sb = new StringBuilder();
+ for (byte b : bytes)
+ sb.append(Integer.toHexString(b & 0xff));
+ return sb.toString();
}
public static String mapToString(Map<?,?> map)