You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/24 17:45:19 UTC

svn commit: r957584 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Thu Jun 24 15:45:18 2010
New Revision: 957584

URL: http://svn.apache.org/viewvc?rev=957584&view=rev
Log:
revise HH schema to be per-endpoint, since that's how it's queried.  patch by Brandon Williams; reviewed by jbellis for CASSANDRA-1142

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 24 15:45:18 2010
@@ -33,6 +33,7 @@ dev
  * split commitlog header into separate file and add size checksum to
    mutations (CASSANDRA-1179)
  * avoid allocating a new byte[] for each mutation on replay (CASSANDRA-1219)
+ * revise HH schema to be per-endpoint (CASSANDRA-1142)
 
 
 0.6.3

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Jun 24 15:45:18 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 
 import java.net.UnknownHostException;
 import java.util.Collection;
-import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.io.IOException;
@@ -49,15 +48,20 @@ import org.apache.cassandra.utils.Wrappe
 
 
 /**
- * For each table (keyspace), there is a row in the system hints CF.
+ * For each endpoint for which we have hints, there is a row in the system hints CF.
  * SuperColumns in that row are keys for which we have hinted data.
- * Subcolumns names within that supercolumn are host IPs. Subcolumn values are always empty.
- * Instead, we store the row data "normally" in the application table it belongs in.
- *
- * So when we deliver hints we look up endpoints that need data delivered
- * on a per-key basis, then read that entire row out and send it over.
+ * Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
+ * Subcolumn values are always empty; instead, we store the row data "normally"
+ * in the application table it belongs in.
+ *
+ * When FailureDetector signals that a node that was down is back up, we read its
+ * hints row to see what rows we need to forward data for, then reach each row in its
+ * entirety and send it over.
  * (TODO handle rows that have incrementally grown too large for a single message.)
  *
+ * deliverHints is also exposed to JMX so it can be run manually if FD ever misses
+ * its cue somehow.
+ *
  * HHM never deletes the row from Application tables; there is no way to distinguish that
  * from hinted tombstones!  instead, rely on cleanup compactions to remove data
  * that doesn't belong on this node.  (Cleanup compactions may be started manually
@@ -69,14 +73,6 @@ import org.apache.cassandra.utils.Wrappe
  * in a HHData (non-super) CF, modifying the above to store a UUID value in the
  * HH subcolumn value, which we use as a key to a [standard] HHData system CF
  * that would contain the message bytes.
- *
- * There are two ways hinted data gets delivered to the intended nodes.
- *
- * runHints() runs periodically and pushes the hinted data on this node to
- * every intended node.
- *
- * runDelieverHints() is called when some other node starts up (potentially
- * from a failure) and delivers the hinted data just to that node.
  */
 
 public class HintedHandOffManager
@@ -86,10 +82,11 @@ public class HintedHandOffManager
     private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
     public static final String HINTS_CF = "HintsColumnFamily";
     private static final int PAGE_SIZE = 10000;
+    private static final String SEPARATOR = "-";
 
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
 
-    private static boolean sendMessage(InetAddress endpoint, String tableName, byte[] key) throws IOException
+    private static boolean sendMessage(InetAddress endpoint, String tableName, String cfName, byte[] key) throws IOException
     {
         if (!Gossiper.instance.isKnownEndpoint(endpoint))
         {
@@ -104,12 +101,10 @@ public class HintedHandOffManager
         Table table = Table.open(tableName);
         RowMutation rm = new RowMutation(tableName, key);
         DecoratedKey dkey = StorageService.getPartitioner().decorateKey(key);
-        for (ColumnFamilyStore cfstore : table.getColumnFamilyStores())
-        {
-            ColumnFamily cf = cfstore.getColumnFamily(QueryFilter.getIdentityFilter(dkey, new QueryPath(cfstore.getColumnFamilyName())));
-            if (cf != null)
-                rm.add(cf);
-        }
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+        ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(dkey, new QueryPath(cfs.getColumnFamilyName())));
+        if (cf != null)
+            rm.add(cf);
         Message message = rm.makeRowMutationMessage();
         WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint);
         MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
@@ -125,18 +120,28 @@ public class HintedHandOffManager
         return true;
     }
 
-    private static void deleteEndpoint(byte[] endpointAddress, String tableName, byte[] key, long timestamp) throws IOException
+    private static void deleteHintKey(byte[] endpointAddress, byte[] key, byte[] tableCF) throws IOException
     {
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName.getBytes(UTF8));
-        rm.delete(new QueryPath(HINTS_CF, key, endpointAddress), new TimestampClock(timestamp));
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
+        rm.delete(new QueryPath(HINTS_CF, key, tableCF), new TimestampClock(System.currentTimeMillis()));
         rm.apply();
     }
 
-    private static void deleteHintKey(String tableName, byte[] key) throws IOException
+    public static void deleteHintsForEndPoint(InetAddress endpoint)
     {
-        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName.getBytes(UTF8));
-        rm.delete(new QueryPath(HINTS_CF, key, null), new TimestampClock(System.currentTimeMillis()));
-        rm.apply();
+        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpoint.getAddress());
+        rm.delete(new QueryPath(HINTS_CF), new TimestampClock(System.currentTimeMillis()));
+        try {
+            logger_.info("Deleting any stored hints for " + endpoint);
+            rm.apply();
+            hintStore.forceFlush();
+            CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE).get();
+        }
+        catch (Exception e)
+        {
+            logger_.warn("Could not delete hints for " + endpoint + ": " + e);
+        }
     }
 
     private static boolean pagingFinished(ColumnFamily hintColumnFamily, byte[] startColumn)
@@ -146,47 +151,54 @@ public class HintedHandOffManager
                || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
     }
 
+    public static byte[] makeCombinedName(String tableName, String columnFamily)
+    {
+        byte[] withsep = ArrayUtils.addAll(tableName.getBytes(FBUtilities.UTF8), SEPARATOR.getBytes());
+        return ArrayUtils.addAll(withsep, columnFamily.getBytes(FBUtilities.UTF8));
+    }
+
+    private static String[] getTableAndCFNames(byte[] joined)
+    {
+        int index;
+        index = ArrayUtils.lastIndexOf(joined, SEPARATOR.getBytes()[0]);
+        if (index < 1)
+            throw new RuntimeException("Corrupted hint name " + joined.toString());
+        String[] parts = new String[2];
+        parts[0] = new String(ArrayUtils.subarray(joined, 0, index));
+        parts[1] = new String(ArrayUtils.subarray(joined, index+1, joined.length));
+        return parts;
+
+    }
+            
     private static void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Started hinted handoff for endpoint " + endpoint);
 
-        byte[] targetEPBytes = endpoint.getAddress();
-        // 1. Scan through all the keys that we need to handoff
-        // 2. For each key read the list of recipients if the endpoint matches send
-        // 3. Delete that recipient from the key if write was successful
-        // 4. Now force a flush
+        // 1. Get the key of the endpoint we need to handoff
+        // 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF
+        // 3. Delete the subcolumn if the write was successful
+        // 4. Force a flush
         // 5. Do major compaction to clean up all deletes etc.
+        DecoratedKey epkey =  StorageService.getPartitioner().decorateKey(endpoint.getAddress());
         ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
-        for (String tableName : DatabaseDescriptor.getTables())
+        byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
+        while (true)
         {
-            DecoratedKey tableNameKey = StorageService.getPartitioner().decorateKey(tableName.getBytes(UTF8));
-            byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
-            while (true)
+            QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
+            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
+            if (pagingFinished(hintColumnFamily, startColumn))
+                break;
+            Collection<IColumn> keyColumns = hintColumnFamily.getSortedColumns();
+            for (IColumn keyColumn : keyColumns)
             {
-                QueryFilter filter = QueryFilter.getSliceFilter(tableNameKey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
-                ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
-                if (pagingFinished(hintColumnFamily, startColumn))
-                    break;
-                Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
-
-                for (IColumn keyColumn : keys)
+                startColumn = keyColumn.name();
+                Collection<IColumn> tableCFs = keyColumn.getSubColumns();
+                for (IColumn tableCF : tableCFs)
                 {
-                    byte[] keyBytes = keyColumn.name();
-                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                    for (IColumn hintEndpoint : endpoints)
-                    {
-                        if (Arrays.equals(hintEndpoint.name(), targetEPBytes) && sendMessage(endpoint, tableName, keyBytes))
-                        {
-                            if (endpoints.size() == 1)
-                                deleteHintKey(tableName, keyColumn.name());
-                            else
-                                deleteEndpoint(hintEndpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
-                            break;
-                        }
-                    }
-
-                    startColumn = keyColumn.name();
+                    String[] parts = getTableAndCFNames(tableCF.name());
+                    if (sendMessage(endpoint, parts[0], parts[1], keyColumn.name()))
+                        deleteHintKey(endpoint.getAddress(), keyColumn.name(), tableCF.name());
                 }
             }
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jun 24 15:45:18 2010
@@ -97,10 +97,14 @@ public class RowMutation
         return modifications_.values();
     }
 
-    void addHints(byte[] key, byte[] host) throws IOException
+    void addHints(RowMutation rm) throws IOException
     {
-        QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key, host);
-        add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()));
+        for (ColumnFamily cf : rm.getColumnFamilies())
+        {
+            byte[] combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName);
+            QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, rm.key(), combined);
+            add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()));
+        }
     }
 
     /*

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Jun 24 15:45:18 2010
@@ -57,11 +57,10 @@ public class RowMutationVerbHandler impl
                 while (bb.remaining() > 0)
                 {
                     bb.get(addressBytes);
-                    InetAddress hint = InetAddress.getByAddress(addressBytes);
                     if (logger_.isDebugEnabled())
-                        logger_.debug("Adding hint for " + hint);
-                    RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable().getBytes(FBUtilities.UTF8));
-                    hintedMutation.addHints(rm.key(), addressBytes);
+                        logger_.debug("Adding hint for " + InetAddress.getByAddress(addressBytes));
+                    RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes);
+                    hintedMutation.addHints(rm);
                     hintedMutation.apply();
                 }
             }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=957584&r1=957583&r2=957584&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jun 24 15:45:18 2010
@@ -668,6 +668,7 @@ public class StorageService implements I
                 if (!tokenMetadata_.getToken(endpoint).equals(token))
                     logger_.warn("Node " + endpoint + " 'left' token mismatch. Long network partition?");
                 tokenMetadata_.removeEndpoint(endpoint);
+                HintedHandOffManager.deleteHintsForEndPoint(endpoint);
             }
         }
         else
@@ -698,13 +699,14 @@ public class StorageService implements I
 
     /**
      * endpoint was completely removed from ring (as a result of removetoken command). Remove it
-     * from token metadata and gossip and restore replica count.
+     * from token metadata and gossip and restore replica count.  Also delete any hints for it.
      */
     private void removeEndpointLocally(InetAddress endpoint)
     {
         restoreReplicaCount(endpoint);
         Gossiper.instance.removeEndpoint(endpoint);
         tokenMetadata_.removeEndpoint(endpoint);
+        HintedHandOffManager.deleteHintsForEndPoint(endpoint);
     }
 
     /**