You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/07/09 22:27:30 UTC
svn commit: r962683 - in /cassandra/trunk/src/java/org/apache/cassandra/db:
HintedHandOffManager.java RowMutation.java
Author: brandonwilliams
Date: Fri Jul 9 20:27:30 2010
New Revision: 962683
URL: http://svn.apache.org/viewvc?rev=962683&view=rev
Log:
Hinted handoff improvements; patch by brandonwilliams, reviewed by jbellis, for CASSANDRA-1223
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=962683&r1=962682&r2=962683&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jul 9 20:27:30 2010
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.db.IClock;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,12 +122,12 @@ public class HintedHandOffManager
return true;
}
- private static void deleteHintKey(byte[] endpointAddress, byte[] key, byte[] tableCF) throws IOException
+ private static void deleteHintKey(byte[] endpointAddress, byte[] key, byte[] tableCF, IClock clock) throws IOException
{
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
- rm.delete(new QueryPath(HINTS_CF, key, tableCF), new TimestampClock(System.currentTimeMillis()));
+ rm.delete(new QueryPath(HINTS_CF, key, tableCF), clock);
rm.apply();
- }
+ }
public static void deleteHintsForEndPoint(InetAddress endpoint)
{
@@ -185,30 +186,36 @@ public class HintedHandOffManager
int rowsReplayed = 0;
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
- while (true)
- {
- QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
- ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
- if (pagingFinished(hintColumnFamily, startColumn))
- break;
- Collection<IColumn> keyColumns = hintColumnFamily.getSortedColumns();
- for (IColumn keyColumn : keyColumns)
+ delivery:
+ while (true)
{
- startColumn = keyColumn.name();
- Collection<IColumn> tableCFs = keyColumn.getSubColumns();
- for (IColumn tableCF : tableCFs)
+ QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
+ ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+ if (pagingFinished(hintColumnFamily, startColumn))
+ break;
+ Collection<IColumn> keyColumns = hintColumnFamily.getSortedColumns();
+ for (IColumn keyColumn : keyColumns)
{
- String[] parts = getTableAndCFNames(tableCF.name());
- if (sendMessage(endpoint, parts[0], parts[1], keyColumn.name()))
+ startColumn = keyColumn.name();
+ Collection<IColumn> tableCFs = keyColumn.getSubColumns();
+ for (IColumn tableCF : tableCFs)
{
- deleteHintKey(endpoint.getAddress(), keyColumn.name(), tableCF.name());
- rowsReplayed++;
- }
+ String[] parts = getTableAndCFNames(tableCF.name());
+ if (sendMessage(endpoint, parts[0], parts[1], keyColumn.name()))
+ {
+ deleteHintKey(endpoint.getAddress(), keyColumn.name(), tableCF.name(), tableCF.clock());
+ rowsReplayed++;
+ }
+ else
+ {
+ logger_.info("Could not complete hinted handoff to " + endpoint);
+ break delivery;
+ }
- startColumn = keyColumn.name();
+ startColumn = keyColumn.name();
+ }
}
}
- }
if (rowsReplayed > 0)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=962683&r1=962682&r2=962683&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Fri Jul 9 20:27:30 2010
@@ -103,7 +103,7 @@ public class RowMutation
{
byte[] combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName);
QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, rm.key(), combined);
- add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()));
+ add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()), DatabaseDescriptor.getGcGraceInSeconds());
}
}