You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/08 16:45:45 UTC
svn commit: r897233 -
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Author: jbellis
Date: Fri Jan 8 15:45:38 2010
New Revision: 897233
URL: http://svn.apache.org/viewvc?rev=897233&view=rev
Log:
add hint delivery paging. patch by jbellis; tested by Brandon Williams for CASSANDRA-680
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=897233&r1=897232&r2=897233&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jan 8 15:45:38 2010
@@ -32,10 +32,15 @@
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import java.net.InetAddress;
+
+import org.apache.commons.lang.ArrayUtils;
+
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
@@ -84,6 +89,7 @@
private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
final Timer timer = new Timer("HINTED-HANDOFF-TIMER");
public static final String HINTS_CF = "HintsColumnFamily";
+ private static final int PAGE_SIZE = 10000;
public static HintedHandOffManager instance()
@@ -168,29 +174,36 @@
// 7. I guess we are done
for (String tableName : DatabaseDescriptor.getTables())
{
- ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(new IdentityQueryFilter(tableName, new QueryPath(HINTS_CF))), Integer.MAX_VALUE);
- if (hintColumnFamily == null)
+ byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+ while (true)
{
- continue;
- }
- Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
+ QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
+ ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+ if (hintColumnFamily == null)
+ {
+ break;
+ }
+ Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
- for (IColumn keyColumn : keys)
- {
- Collection<IColumn> endpoints = keyColumn.getSubColumns();
- String keyStr = new String(keyColumn.name(), "UTF-8");
- int deleted = 0;
- for (IColumn endpoint : endpoints)
+ for (IColumn keyColumn : keys)
{
- if (sendMessage(InetAddress.getByAddress(endpoint.name()), tableName, keyStr))
+ Collection<IColumn> endpoints = keyColumn.getSubColumns();
+ String keyStr = new String(keyColumn.name(), "UTF-8");
+ int deleted = 0;
+ for (IColumn endpoint : endpoints)
{
- deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
- deleted++;
+ if (sendMessage(InetAddress.getByAddress(endpoint.name()), tableName, keyStr))
+ {
+ deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
+ deleted++;
+ }
}
- }
- if (deleted == endpoints.size())
- {
- deleteHintKey(tableName, keyColumn.name());
+ if (deleted == endpoints.size())
+ {
+ deleteHintKey(tableName, keyColumn.name());
+ }
+
+ startColumn = keyColumn.name(); // repeating the last as the first is fine since we just deleted it
}
}
}
@@ -218,33 +231,41 @@
// 2. For each key read the list of recipients if the endpoint matches send
// 3. Delete that recipient from the key if write was successful
Table systemTable = Table.open(Table.SYSTEM_TABLE);
+ ColumnFamilyStore hintStore = systemTable.getColumnFamilyStore(HINTS_CF);
for (String tableName : DatabaseDescriptor.getTables())
{
- ColumnFamily hintedColumnFamily = ColumnFamilyStore.removeDeleted(systemTable.get(tableName, HINTS_CF), Integer.MAX_VALUE);
- if (hintedColumnFamily == null)
+ byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+ while (true)
{
- continue;
- }
- Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
+ QueryFilter filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, PAGE_SIZE);
+ ColumnFamily hintedColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+ if (hintedColumnFamily == null)
+ {
+ break;
+ }
+ Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
- for (IColumn keyColumn : keys)
- {
- String keyStr = new String(keyColumn.name(), "UTF-8");
- Collection<IColumn> endpoints = keyColumn.getSubColumns();
- for (IColumn hintEndPoint : endpoints)
+ for (IColumn keyColumn : keys)
{
- if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, tableName, keyStr))
+ String keyStr = new String(keyColumn.name(), "UTF-8");
+ Collection<IColumn> endpoints = keyColumn.getSubColumns();
+ for (IColumn hintEndPoint : endpoints)
{
- if (endpoints.size() == 1)
+ if (Arrays.equals(hintEndPoint.name(), targetEPBytes) && sendMessage(endPoint, tableName, keyStr))
{
- deleteHintKey(tableName, keyColumn.name());
+ if (endpoints.size() == 1)
+ {
+ deleteHintKey(tableName, keyColumn.name());
+ }
+ else
+ {
+ deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
+ }
+ break;
}
- else
- {
- deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
- }
- break;
}
+
+ startColumn = keyColumn.name(); // repeating the last as the first is fine since we just deleted it
}
}
}