You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/22 00:42:27 UTC

svn commit: r1149396 - in /cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/db/HintedHandOffManager.java src/java/org/apache/cassandra/db/RowMutation.java src/java/org/apache/cassandra/db/RowMutationVerbHandler.java

Author: jbellis
Date: Thu Jul 21 22:42:26 2011
New Revision: 1149396

URL: http://svn.apache.org/viewvc?rev=1149396&view=rev
Log:
store hints as serialized mutations instead of pointers to data rows
patch by Nick Telford, jbellis, and Patricio Echague for CASSANDRA-2045

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jul 21 22:42:26 2011
@@ -15,6 +15,8 @@
  * single-pass streaming (CASSANDRA-2677)
  * use reference counting for deleting sstables instead of relying on the GC
    (CASSANDRA-2521)
+ * store hints as serialized mutations instead of pointers to data rows
+   (CASSANDRA-2045)
 
 
 0.8.2

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Jul 21 22:42:26 2011
@@ -18,26 +18,22 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -45,13 +41,14 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
@@ -60,30 +57,24 @@ import org.cliffc.high_scale_lib.NonBloc
 /**
  * For each endpoint for which we have hints, there is a row in the system hints CF.
  * The key for this row is ByteBuffer.wrap(string), i.e. "127.0.0.1".
+ * (We have to use String keys for compatibility with OPP.)
+ * SuperColumns in these rows are the mutations to replay, with uuid names:
  *
- * SuperColumns in that row are keys for which we have hinted data.
- * Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
- * Subcolumn values are always empty; instead, we store the row data "normally"
- * in the application table it belongs in.
+ *  <dest ip>: {              // key
+ *    <uuid>: {               // supercolumn
+ *      mutation: <mutation>  // subcolumn
+ *      version: <mutation serialization version>
+ *      table: <table of hinted mutation>
+ *      key: <key of hinted mutation>
+ *    }
+ *  }
  *
- * When FailureDetector signals that a node that was down is back up, we read its
- * hints row to see what rows we need to forward data for, then reach each row in its
- * entirety and send it over.
+ * When FailureDetector signals that a node that was down is back up, we page through
+ * the hinted mutations and send them over one at a time, waiting for
+ * hinted_handoff_throttle_delay in between each.
  *
  * deliverHints is also exposed to JMX so it can be run manually if FD ever misses
  * its cue somehow.
- *
- * HHM never deletes the row from Application tables; usually (but not for CL.ANY!)
- * the row belongs on this node, as well.  instead, we rely on cleanup compactions
- * to remove data that doesn't belong.  (Cleanup compactions may be started manually
- * -- on a per node basis -- with "nodeprobe cleanup.")
- *
- * TODO this avoids our hint rows from growing excessively large by offloading the
- * message data into application tables.  But, this means that cleanup compactions
- * will nuke HH data.  Probably better would be to store the RowMutation messages
- * in a HHData (non-super) CF, modifying the above to store a UUID value in the
- * HH subcolumn value, which we use as a key to a [standard] HHData system CF
- * that would contain the message bytes.
  */
 
 public class HintedHandOffManager implements HintedHandOffManagerMBean
@@ -117,80 +108,36 @@ public class HintedHandOffManager implem
         logger_.debug("Created HHOM instance, registered MBean.");
     }
 
-    private static boolean sendRow(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException
+    private static boolean sendMutation(InetAddress endpoint, RowMutation mutation) throws IOException
     {
-        if (!Gossiper.instance.isKnownEndpoint(endpoint))
+        IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
+        MessagingService.instance().sendRR(mutation, endpoint, responseHandler);
+
+        try
         {
-            logger_.warn("Hints found for endpoint " + endpoint + " which is not part of the gossip network.  discarding.");
-            return true;
+            responseHandler.get();
         }
-        if (!FailureDetector.instance.isAlive(endpoint))
+        catch (TimeoutException e)
         {
             return false;
         }
 
-        if (CFMetaData.getId(tableName, cfName) == null)
+        try
         {
-            logger_.debug("Discarding hints for dropped keyspace or columnfamily {}/{}", tableName, cfName);
-            return true;
+            Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay());
         }
-        Table table = Table.open(tableName);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
-
-        int pageSize = PAGE_SIZE;
-        // send less columns per page if they are very large
-        if (cfs.getMeanColumns() > 0)
+        catch (InterruptedException e)
         {
-            int averageColumnSize = (int) (cfs.getMeanRowSize() / cfs.getMeanColumns());
-            pageSize = Math.min(PAGE_SIZE, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
-            pageSize = Math.max(2, pageSize); // page size of 1 does not allow actual paging b/c of >= behavior on startColumn
-            logger_.debug("average hinted-row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+            throw new AssertionError(e);
         }
 
-        DecoratedKey<?> dkey = StorageService.getPartitioner().decorateKey(key);
-        ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        while (true)
-        {
-            QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(cfs.getColumnFamilyName()), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
-            ColumnFamily cf = cfs.getColumnFamily(filter);
-            if (pagingFinished(cf, startColumn))
-                break;
-            if (cf.getColumnNames().isEmpty())
-            {
-                logger_.debug("Nothing to hand off for {}", dkey);
-                break;
-            }
-
-            startColumn = cf.getColumnNames().last();
-            RowMutation rm = new RowMutation(tableName, key);
-            rm.add(cf);
-            IWriteResponseHandler responseHandler =  WriteResponseHandler.create(endpoint);
-            MessagingService.instance().sendRR(rm, endpoint, responseHandler);
-            try
-            {
-                responseHandler.get();
-            }
-            catch (TimeoutException e)
-            {
-                return false;
-            }
-
-            try
-            {
-                Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay());
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-        }
         return true;
     }
 
-    private static void deleteHintKey(ByteBuffer endpointAddress, ByteBuffer key, ByteBuffer tableCF, long timestamp) throws IOException
+    private static void deleteHint(ByteBuffer endpointAddress, ByteBuffer hintId, long timestamp) throws IOException
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
-        rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
+        rm.delete(new QueryPath(HINTS_CF, hintId), timestamp);
         rm.apply();
     }
 
@@ -244,30 +191,6 @@ public class HintedHandOffManager implem
                || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
     }
 
-    public static ByteBuffer makeCombinedName(String tableName, String columnFamily)
-    {
-        byte[] withsep = ArrayUtils.addAll(tableName.getBytes(UTF_8), SEPARATOR.getBytes(UTF_8));
-        return ByteBuffer.wrap(ArrayUtils.addAll(withsep, columnFamily.getBytes(UTF_8)));
-    }
-
-    private static String[] getTableAndCFNames(ByteBuffer joined)
-    {
-        int index = ByteBufferUtil.lastIndexOf(joined, SEPARATOR.getBytes(UTF_8)[0], joined.limit());
-
-        if (index == -1 || index < (joined.position() + 1))
-            throw new RuntimeException("Corrupted hint name " + ByteBufferUtil.bytesToHex(joined));
-
-        try
-        {
-            return new String[] { ByteBufferUtil.string(joined, joined.position(), index - joined.position()),
-                                  ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1)) };
-        }
-        catch (CharacterCodingException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException
     {
         Gossiper gossiper = Gossiper.instance;
@@ -306,7 +229,8 @@ public class HintedHandOffManager implem
                 logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
                 Thread.sleep(sleep);
             }
-            if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+
+            if (!FailureDetector.instance.isAlive(endpoint))
             {
                 logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
                 return;
@@ -320,7 +244,7 @@ public class HintedHandOffManager implem
         logger_.info("Started hinted handoff for endpoint " + endpoint);
 
         // 1. Get the key of the endpoint we need to handoff
-        // 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF
+        // 2. For each column, deserialize the mutation and send it to the endpoint
         // 3. Delete the subcolumn if the write was successful
         // 4. Force a flush
         // 5. Do major compaction to clean up all deletes etc.
@@ -337,25 +261,31 @@ public class HintedHandOffManager implem
             ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
             if (pagingFinished(hintColumnFamily, startColumn))
                 break;
-            for (IColumn keyColumn : hintColumnFamily.getSortedColumns())
+
+            for (IColumn hint : hintColumnFamily.getSortedColumns())
             {
-                startColumn = keyColumn.name();
-                Collection<IColumn> tableCFs = keyColumn.getSubColumns();
-                for (IColumn tableCF : tableCFs)
-                {
-                    String[] parts = getTableAndCFNames(tableCF.name());
-                    if (sendRow(endpoint, parts[0], parts[1], keyColumn.name()))
-                    {
-                        deleteHintKey(endpointAsUTF8, keyColumn.name(), tableCF.name(), tableCF.timestamp());
-                        rowsReplayed++;
-                    }
-                    else
-                    {
-                        logger_.info("Could not complete hinted handoff to " + endpoint);
-                        break delivery;
-                    }
+                startColumn = hint.name();
+
+                IColumn versionColumn = hint.getSubColumn(ByteBufferUtil.bytes("version"));
+                IColumn tableColumn = hint.getSubColumn(ByteBufferUtil.bytes("table"));
+                IColumn keyColumn = hint.getSubColumn(ByteBufferUtil.bytes("key"));
+                IColumn mutationColumn = hint.getSubColumn(ByteBufferUtil.bytes("mutation"));
+                assert versionColumn != null;
+                assert tableColumn != null;
+                assert keyColumn != null;
+                assert mutationColumn != null;
+                DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(mutationColumn.value()));
+                RowMutation rm = RowMutation.serializer().deserialize(in, ByteBufferUtil.toInt(versionColumn.value()));
 
-                    startColumn = keyColumn.name();
+                if (sendMutation(endpoint, rm))
+                {
+                    deleteHint(endpointAsUTF8, hint.name(), versionColumn.timestamp());
+                    rowsReplayed++;
+                }
+                else
+                {
+                    logger_.info("Could not complete hinted handoff to " + endpoint);
+                    break delivery;
                 }
             }
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jul 21 22:42:26 2011
@@ -21,9 +21,7 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.net.MessageProducer;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -32,13 +30,14 @@ import org.apache.cassandra.config.KSMet
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.Deletion;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class RowMutation implements IMutation, MessageProducer
 {
@@ -98,14 +97,46 @@ public class RowMutation implements IMut
         return modifications_.values();
     }
 
-    void addHints(RowMutation rm) throws IOException
+    /**
+     * Wraps {@code mutation} as a hint for later replay: a mutation against the system hints CF,
+     * row {@code address} (the destination endpoint's address string, e.g. "127.0.0.1"), holding one
+     * supercolumn named by a fresh time UUID whose subcolumns are the serialized mutation, its
+     * serialization version, its table and its key.
+     */
+    public static RowMutation hintFor(RowMutation mutation, ByteBuffer address) throws IOException
     {
-        for (ColumnFamily cf : rm.getColumnFamilies())
-        {
-            ByteBuffer combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName);
-            QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, rm.key(), combined);
-            add(path, ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis(), cf.metadata().getGcGraceSeconds());
-        }
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, address);
+        ByteBuffer hintId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+
+        // determine the TTL for the RowMutation
+        // this is set at the smallest GCGraceSeconds for any of the CFs in the RM
+        // this ensures that deletes aren't "undone" by delivery of an old hint
+        int ttl = Integer.MAX_VALUE;
+        for (ColumnFamily cf : mutation.getColumnFamilies())
+            ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
+
+        // every subcolumn of the hint gets the same timestamp: HintedHandOffManager deletes
+        // the whole hint supercolumn at the "version" subcolumn's timestamp, and a subcolumn
+        // written with a later timestamp would not be covered by that tombstone
+        long timestamp = System.currentTimeMillis();
+
+        // serialized RowMutation
+        QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("mutation"));
+        rm.add(path, ByteBuffer.wrap(mutation.getSerializedBuffer(MessagingService.version_)), timestamp, ttl);
+
+        // serialization version
+        path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("version"));
+        rm.add(path, ByteBufferUtil.bytes(MessagingService.version_), timestamp, ttl);
+
+        // table
+        path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("table"));
+        rm.add(path, ByteBufferUtil.bytes(mutation.getTable()), timestamp, ttl);
+
+        // key
+        path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("key"));
+        rm.add(path, mutation.key(), timestamp, ttl);
+
+        return rm;
     }
 
     /*

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Jul 21 22:42:26 2011
@@ -58,8 +58,7 @@ public class RowMutationVerbHandler impl
                     ByteBuffer addressBytes = ByteBufferUtil.readWithShortLength(dis);
                     if (logger_.isDebugEnabled())
                         logger_.debug("Adding hint for " + InetAddress.getByName(ByteBufferUtil.string(addressBytes)));
-                    RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes);
-                    hintedMutation.addHints(rm);
+                    RowMutation hintedMutation = RowMutation.hintFor(rm, addressBytes);
                     hintedMutation.apply();
                 }
             }