You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/24 17:45:19 UTC
svn commit: r957584 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
Author: jbellis
Date: Thu Jun 24 15:45:18 2010
New Revision: 957584
URL: http://svn.apache.org/viewvc?rev=957584&view=rev
Log:
Revise the hinted-handoff (HH) schema to be per-endpoint, since that is how it is queried. Patch by Brandon Williams; reviewed by jbellis for CASSANDRA-1142.
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 24 15:45:18 2010
@@ -33,6 +33,7 @@ dev
* split commitlog header into separate file and add size checksum to
mutations (CASSANDRA-1179)
* avoid allocating a new byte[] for each mutation on replay (CASSANDRA-1219)
+ * revise HH schema to be per-endpoint (CASSANDRA-1142)
0.6.3
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Jun 24 15:45:18 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.net.UnknownHostException;
import java.util.Collection;
-import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
import java.io.IOException;
@@ -49,15 +48,20 @@ import org.apache.cassandra.utils.Wrappe
/**
- * For each table (keyspace), there is a row in the system hints CF.
+ * For each endpoint for which we have hints, there is a row in the system hints CF.
* SuperColumns in that row are keys for which we have hinted data.
- * Subcolumns names within that supercolumn are host IPs. Subcolumn values are always empty.
- * Instead, we store the row data "normally" in the application table it belongs in.
- *
- * So when we deliver hints we look up endpoints that need data delivered
- * on a per-key basis, then read that entire row out and send it over.
+ * Subcolumn names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
+ * Subcolumn values are always empty; instead, we store the row data "normally"
+ * in the application table it belongs in.
+ *
+ * When FailureDetector signals that a node that was down is back up, we read its
+ * hints row to see what rows we need to forward data for, then read each row in its
+ * entirety and send it over.
* (TODO handle rows that have incrementally grown too large for a single message.)
*
+ * deliverHints is also exposed to JMX so it can be run manually if FD ever misses
+ * its cue somehow.
+ *
* HHM never deletes the row from Application tables; there is no way to distinguish that
* from hinted tombstones! instead, rely on cleanup compactions to remove data
* that doesn't belong on this node. (Cleanup compactions may be started manually
@@ -69,14 +73,6 @@ import org.apache.cassandra.utils.Wrappe
* in a HHData (non-super) CF, modifying the above to store a UUID value in the
* HH subcolumn value, which we use as a key to a [standard] HHData system CF
* that would contain the message bytes.
- *
- * There are two ways hinted data gets delivered to the intended nodes.
- *
- * runHints() runs periodically and pushes the hinted data on this node to
- * every intended node.
- *
- * runDelieverHints() is called when some other node starts up (potentially
- * from a failure) and delivers the hinted data just to that node.
*/
public class HintedHandOffManager
@@ -86,10 +82,11 @@ public class HintedHandOffManager
private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
public static final String HINTS_CF = "HintsColumnFamily";
private static final int PAGE_SIZE = 10000;
+ private static final String SEPARATOR = "-";
private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
- private static boolean sendMessage(InetAddress endpoint, String tableName, byte[] key) throws IOException
+ private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, byte[] key) throws IOException
{
if (!Gossiper.instance.isKnownEndpoint(endpoint))
{
@@ -104,12 +101,10 @@ public class HintedHandOffManager
Table table = Table.open(tableName);
RowMutation rm = new RowMutation(tableName, key);
DecoratedKey dkey = StorageService.getPartitioner().decorateKey(key);
- for (ColumnFamilyStore cfstore : table.getColumnFamilyStores())
- {
- ColumnFamily cf = cfstore.getColumnFamily(QueryFilter.getIdentityFilter(dkey, new QueryPath(cfstore.getColumnFamilyName())));
- if (cf != null)
- rm.add(cf);
- }
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+ ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dkey, new QueryPath(cfs.getColumnFamilyName())));
+ if (cf != null)
+ rm.add(cf);
Message message = rm.makeRowMutationMessage();
WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint);
MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
@@ -125,18 +120,28 @@ public class HintedHandOffManager
return true;
}
- private static void deleteEndpoint(byte[] endpointAddress, String tableName, byte[] key, long timestamp) throws IOException
+ private static void deleteHintKey(byte[] endpointAddress, byte[] key, byte[] tableCF) throws IOException
{
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName.getBytes(UTF8));
- rm.delete(new QueryPath(HINTS_CF, key, endpointAddress), new TimestampClock(timestamp));
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
+ rm.delete(new QueryPath(HINTS_CF, key, tableCF), new TimestampClock(System.currentTimeMillis()));
rm.apply();
}
- private static void deleteHintKey(String tableName, byte[] key) throws IOException
+ public static void deleteHintsForEndPoint(InetAddress endpoint)
{
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName.getBytes(UTF8));
- rm.delete(new QueryPath(HINTS_CF, key, null), new TimestampClock(System.currentTimeMillis()));
- rm.apply();
+ ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpoint.getAddress());
+ rm.delete(new QueryPath(HINTS_CF), new TimestampClock(System.currentTimeMillis()));
+ try {
+ logger_.info("Deleting any stored hints for " + endpoint);
+ rm.apply();
+ hintStore.forceFlush();
+ CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE).get();
+ }
+ catch (Exception e)
+ {
+ logger_.warn("Could not delete hints for " + endpoint + ": " + e);
+ }
}
private static boolean pagingFinished(ColumnFamily hintColumnFamily, byte[] startColumn)
@@ -146,47 +151,54 @@ public class HintedHandOffManager
|| (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
}
+ public static byte[] makeCombinedName(String tableName, String columnFamily)
+ {
+ byte[] withsep = ArrayUtils.addAll(tableName.getBytes(FBUtilities.UTF8), SEPARATOR.getBytes());
+ return ArrayUtils.addAll(withsep, columnFamily.getBytes(FBUtilities.UTF8));
+ }
+
+ private static String[] getTableAndCFNames(byte[] joined)
+ {
+ int index;
+ index = ArrayUtils.lastIndexOf(joined, SEPARATOR.getBytes()[0]);
+ if (index < 1)
+ throw new RuntimeException("Corrupted hint name " + joined.toString());
+ String[] parts = new String[2];
+ parts[0] = new String(ArrayUtils.subarray(joined, 0, index));
+ parts[1] = new String(ArrayUtils.subarray(joined, index+1, joined.length));
+ return parts;
+
+ }
+
private static void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
{
if (logger_.isDebugEnabled())
logger_.debug("Started hinted handoff for endpoint " + endpoint);
- byte[] targetEPBytes = endpoint.getAddress();
- // 1. Scan through all the keys that we need to handoff
- // 2. For each key read the list of recipients if the endpoint matches send
- // 3. Delete that recipient from the key if write was successful
- // 4. Now force a flush
+ // 1. Get the key of the endpoint we need to handoff
+ // 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF
+ // 3. Delete the subcolumn if the write was successful
+ // 4. Force a flush
// 5. Do major compaction to clean up all deletes etc.
+ DecoratedKey epkey = StorageService.getPartitioner().decorateKey(endpoint.getAddress());
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- for (String tableName : DatabaseDescriptor.getTables())
+ byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+ while (true)
{
- DecoratedKey tableNameKey = StorageService.getPartitioner().decorateKey(tableName.getBytes(UTF8));
- byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
- while (true)
+ QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
+ ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+ if (pagingFinished(hintColumnFamily, startColumn))
+ break;
+ Collection<IColumn> keyColumns = hintColumnFamily.getSortedColumns();
+ for (IColumn keyColumn : keyColumns)
{
- QueryFilter filter = QueryFilter.getSliceFilter(tableNameKey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
- ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
- if (pagingFinished(hintColumnFamily, startColumn))
- break;
- Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
-
- for (IColumn keyColumn : keys)
+ startColumn = keyColumn.name();
+ Collection<IColumn> tableCFs = keyColumn.getSubColumns();
+ for (IColumn tableCF : tableCFs)
{
- byte[] keyBytes = keyColumn.name();
- Collection<IColumn> endpoints = keyColumn.getSubColumns();
- for (IColumn hintEndpoint : endpoints)
- {
- if (Arrays.equals(hintEndpoint.name(), targetEPBytes) && sendMessage(endpoint, tableName, keyBytes))
- {
- if (endpoints.size() == 1)
- deleteHintKey(tableName, keyColumn.name());
- else
- deleteEndpoint(hintEndpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
- break;
- }
- }
-
- startColumn = keyColumn.name();
+ String[] parts = getTableAndCFNames(tableCF.name());
+ if (sendMessage(endpoint, parts[0], parts[1], keyColumn.name()))
+ deleteHintKey(endpoint.getAddress(), keyColumn.name(), tableCF.name());
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jun 24 15:45:18 2010
@@ -97,10 +97,14 @@ public class RowMutation
return modifications_.values();
}
- void addHints(byte[] key, byte[] host) throws IOException
+ void addHints(RowMutation rm) throws IOException
{
- QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key, host);
- add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()));
+ for (ColumnFamily cf : rm.getColumnFamilies())
+ {
+ byte[] combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName);
+ QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, rm.key(), combined);
+ add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()));
+ }
}
/*
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Jun 24 15:45:18 2010
@@ -57,11 +57,10 @@ public class RowMutationVerbHandler impl
while (bb.remaining() > 0)
{
bb.get(addressBytes);
- InetAddress hint = InetAddress.getByAddress(addressBytes);
if (logger_.isDebugEnabled())
- logger_.debug("Adding hint for " + hint);
- RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable().getBytes(FBUtilities.UTF8));
- hintedMutation.addHints(rm.key(), addressBytes);
+ logger_.debug("Adding hint for " + InetAddress.getByAddress(addressBytes));
+ RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes);
+ hintedMutation.addHints(rm);
hintedMutation.apply();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jun 24 15:45:18 2010
@@ -668,6 +668,7 @@ public class StorageService implements I
if (!tokenMetadata_.getToken(endpoint).equals(token))
logger_.warn("Node " + endpoint + " 'left' token mismatch. Long network partition?");
tokenMetadata_.removeEndpoint(endpoint);
+ HintedHandOffManager.deleteHintsForEndPoint(endpoint);
}
}
else
@@ -698,13 +699,14 @@ public class StorageService implements I
/**
* endpoint was completely removed from ring (as a result of removetoken command). Remove it
- * from token metadata and gossip and restore replica count.
+ * from token metadata and gossip and restore replica count. Also delete any hints for it.
*/
private void removeEndpointLocally(InetAddress endpoint)
{
restoreReplicaCount(endpoint);
Gossiper.instance.removeEndpoint(endpoint);
tokenMetadata_.removeEndpoint(endpoint);
+ HintedHandOffManager.deleteHintsForEndPoint(endpoint);
}
/**