You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/22 00:42:27 UTC
svn commit: r1149396 - in /cassandra/trunk: CHANGES.txt
src/java/org/apache/cassandra/db/HintedHandOffManager.java
src/java/org/apache/cassandra/db/RowMutation.java
src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Author: jbellis
Date: Thu Jul 21 22:42:26 2011
New Revision: 1149396
URL: http://svn.apache.org/viewvc?rev=1149396&view=rev
Log:
store hints as serialized mutations instead of pointers to data rows
patch by Nick Telford, jbellis, and Patricio Echague for CASSANDRA-2045
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jul 21 22:42:26 2011
@@ -15,6 +15,7 @@
* single-pass streaming (CASSANDRA-2677)
* use reference counting for deleting sstables instead of relying on the GC
(CASSANDRA-2521)
+ * store hints as serialized mutations instead of pointers to data rows (CASSANDRA-2045)
0.8.2
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Jul 21 22:42:26 2011
@@ -18,26 +18,22 @@
package org.apache.cassandra.db;
+import java.io.DataInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.CompactionManager;
@@ -45,13 +41,14 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -60,30 +57,24 @@ import org.cliffc.high_scale_lib.NonBloc
/**
* For each endpoint for which we have hints, there is a row in the system hints CF.
* The key for this row is ByteBuffer.wrap(string), e.g. "127.0.0.1".
+ * (We have to use String keys for compatibility with OPP.)
+ * SuperColumns in these rows are the mutations to replay, with uuid names:
*
- * SuperColumns in that row are keys for which we have hinted data.
- * Subcolumns names within that supercolumn are keyspace+CF, concatenated with SEPARATOR.
- * Subcolumn values are always empty; instead, we store the row data "normally"
- * in the application table it belongs in.
+ * <dest ip>: { // key
+ * <uuid>: { // supercolumn
+ * mutation: <mutation> // subcolumn
+ * version: <mutation serialization version>
+ * table: <table of hinted mutation>
+ * key: <key of hinted mutation>
+ * }
+ * }
*
- * When FailureDetector signals that a node that was down is back up, we read its
- * hints row to see what rows we need to forward data for, then reach each row in its
- * entirety and send it over.
+ * When FailureDetector signals that a node that was down is back up, we page through
+ * the hinted mutations and send them over one at a time, waiting for
+ * hinted_handoff_throttle_delay in between each.
*
* deliverHints is also exposed to JMX so it can be run manually if FD ever misses
* its cue somehow.
- *
- * HHM never deletes the row from Application tables; usually (but not for CL.ANY!)
- * the row belongs on this node, as well. instead, we rely on cleanup compactions
- * to remove data that doesn't belong. (Cleanup compactions may be started manually
- * -- on a per node basis -- with "nodeprobe cleanup.")
- *
- * TODO this avoids our hint rows from growing excessively large by offloading the
- * message data into application tables. But, this means that cleanup compactions
- * will nuke HH data. Probably better would be to store the RowMutation messages
- * in a HHData (non-super) CF, modifying the above to store a UUID value in the
- * HH subcolumn value, which we use as a key to a [standard] HHData system CF
- * that would contain the message bytes.
*/
public class HintedHandOffManager implements HintedHandOffManagerMBean
@@ -117,80 +108,36 @@ public class HintedHandOffManager implem
logger_.debug("Created HHOM instance, registered MBean.");
}
- private static boolean sendRow(InetAddress endpoint, String tableName, String cfName, ByteBuffer key) throws IOException
+ private static boolean sendMutation(InetAddress endpoint, RowMutation mutation) throws IOException
{
- if (!Gossiper.instance.isKnownEndpoint(endpoint))
+ IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
+ MessagingService.instance().sendRR(mutation, endpoint, responseHandler);
+
+ try
{
- logger_.warn("Hints found for endpoint " + endpoint + " which is not part of the gossip network. discarding.");
- return true;
+ responseHandler.get();
}
- if (!FailureDetector.instance.isAlive(endpoint))
+ catch (TimeoutException e)
{
return false;
}
- if (CFMetaData.getId(tableName, cfName) == null)
+ try
{
- logger_.debug("Discarding hints for dropped keyspace or columnfamily {}/{}", tableName, cfName);
- return true;
+ Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay());
}
- Table table = Table.open(tableName);
- ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
-
- int pageSize = PAGE_SIZE;
- // send less columns per page if they are very large
- if (cfs.getMeanColumns() > 0)
+ catch (InterruptedException e)
{
- int averageColumnSize = (int) (cfs.getMeanRowSize() / cfs.getMeanColumns());
- pageSize = Math.min(PAGE_SIZE, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
- pageSize = Math.max(2, pageSize); // page size of 1 does not allow actual paging b/c of >= behavior on startColumn
- logger_.debug("average hinted-row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+ throw new AssertionError(e);
}
- DecoratedKey<?> dkey = StorageService.getPartitioner().decorateKey(key);
- ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
- while (true)
- {
- QueryFilter filter = QueryFilter.getSliceFilter(dkey, new QueryPath(cfs.getColumnFamilyName()), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
- ColumnFamily cf = cfs.getColumnFamily(filter);
- if (pagingFinished(cf, startColumn))
- break;
- if (cf.getColumnNames().isEmpty())
- {
- logger_.debug("Nothing to hand off for {}", dkey);
- break;
- }
-
- startColumn = cf.getColumnNames().last();
- RowMutation rm = new RowMutation(tableName, key);
- rm.add(cf);
- IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
- MessagingService.instance().sendRR(rm, endpoint, responseHandler);
- try
- {
- responseHandler.get();
- }
- catch (TimeoutException e)
- {
- return false;
- }
-
- try
- {
- Thread.sleep(DatabaseDescriptor.getHintedHandoffThrottleDelay());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
return true;
}
- private static void deleteHintKey(ByteBuffer endpointAddress, ByteBuffer key, ByteBuffer tableCF, long timestamp) throws IOException
+ private static void deleteHint(ByteBuffer endpointAddress, ByteBuffer hintId, long timestamp) throws IOException
{
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
- rm.delete(new QueryPath(HINTS_CF, key, tableCF), timestamp);
+ rm.delete(new QueryPath(HINTS_CF, hintId), timestamp);
rm.apply();
}
@@ -244,30 +191,6 @@ public class HintedHandOffManager implem
|| (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
}
- public static ByteBuffer makeCombinedName(String tableName, String columnFamily)
- {
- byte[] withsep = ArrayUtils.addAll(tableName.getBytes(UTF_8), SEPARATOR.getBytes(UTF_8));
- return ByteBuffer.wrap(ArrayUtils.addAll(withsep, columnFamily.getBytes(UTF_8)));
- }
-
- private static String[] getTableAndCFNames(ByteBuffer joined)
- {
- int index = ByteBufferUtil.lastIndexOf(joined, SEPARATOR.getBytes(UTF_8)[0], joined.limit());
-
- if (index == -1 || index < (joined.position() + 1))
- throw new RuntimeException("Corrupted hint name " + ByteBufferUtil.bytesToHex(joined));
-
- try
- {
- return new String[] { ByteBufferUtil.string(joined, joined.position(), index - joined.position()),
- ByteBufferUtil.string(joined, index + 1, joined.limit() - (index + 1)) };
- }
- catch (CharacterCodingException e)
- {
- throw new RuntimeException(e);
- }
- }
-
private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException
{
Gossiper gossiper = Gossiper.instance;
@@ -306,7 +229,8 @@ public class HintedHandOffManager implem
logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
Thread.sleep(sleep);
}
- if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+
+ if (!FailureDetector.instance.isAlive(endpoint))
{
logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
return;
@@ -320,7 +244,7 @@ public class HintedHandOffManager implem
logger_.info("Started hinted handoff for endpoint " + endpoint);
// 1. Get the key of the endpoint we need to handoff
- // 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF
+ // 2. For each column, deserialize the mutation and send it to the endpoint
// 3. Delete the hint supercolumn if the write was successful
// 4. Force a flush
// 5. Do major compaction to clean up all deletes etc.
@@ -337,25 +261,31 @@ public class HintedHandOffManager implem
ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE);
if (pagingFinished(hintColumnFamily, startColumn))
break;
- for (IColumn keyColumn : hintColumnFamily.getSortedColumns())
+
+ for (IColumn hint : hintColumnFamily.getSortedColumns())
{
- startColumn = keyColumn.name();
- Collection<IColumn> tableCFs = keyColumn.getSubColumns();
- for (IColumn tableCF : tableCFs)
- {
- String[] parts = getTableAndCFNames(tableCF.name());
- if (sendRow(endpoint, parts[0], parts[1], keyColumn.name()))
- {
- deleteHintKey(endpointAsUTF8, keyColumn.name(), tableCF.name(), tableCF.timestamp());
- rowsReplayed++;
- }
- else
- {
- logger_.info("Could not complete hinted handoff to " + endpoint);
- break delivery;
- }
+ startColumn = hint.name();
+
+ IColumn versionColumn = hint.getSubColumn(ByteBufferUtil.bytes("version"));
+ IColumn tableColumn = hint.getSubColumn(ByteBufferUtil.bytes("table"));
+ IColumn keyColumn = hint.getSubColumn(ByteBufferUtil.bytes("key"));
+ IColumn mutationColumn = hint.getSubColumn(ByteBufferUtil.bytes("mutation"));
+ assert versionColumn != null;
+ assert tableColumn != null;
+ assert keyColumn != null;
+ assert mutationColumn != null;
+ DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(mutationColumn.value()));
+ RowMutation rm = RowMutation.serializer().deserialize(in, ByteBufferUtil.toInt(versionColumn.value()));
- startColumn = keyColumn.name();
+ if (sendMutation(endpoint, rm))
+ {
+ deleteHint(endpointAsUTF8, hint.name(), versionColumn.timestamp());
+ rowsReplayed++;
+ }
+ else
+ {
+ logger_.info("Could not complete hinted handoff to " + endpoint);
+ break delivery;
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jul 21 22:42:26 2011
@@ -21,9 +21,7 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ExecutionException;
-import org.apache.cassandra.net.MessageProducer;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.CFMetaData;
@@ -32,13 +30,14 @@ import org.apache.cassandra.config.KSMet
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Deletion;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
public class RowMutation implements IMutation, MessageProducer
{
@@ -98,14 +97,35 @@ public class RowMutation implements IMut
return modifications_.values();
}
- void addHints(RowMutation rm) throws IOException
+ public static RowMutation hintFor(RowMutation mutation, ByteBuffer address) throws IOException
{
- for (ColumnFamily cf : rm.getColumnFamilies())
- {
- ByteBuffer combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName);
- QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, rm.key(), combined);
- add(path, ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis(), cf.metadata().getGcGraceSeconds());
- }
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, address);
+ ByteBuffer hintId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+
+ // determine the TTL for the RowMutation
+ // this is set at the smallest GCGraceSeconds for any of the CFs in the RM
+ // this ensures that deletes aren't "undone" by delivery of an old hint
+ int ttl = Integer.MAX_VALUE;
+ for (ColumnFamily cf : mutation.getColumnFamilies())
+ ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
+
+ // serialized RowMutation
+ QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("mutation"));
+ rm.add(path, ByteBuffer.wrap(mutation.getSerializedBuffer(MessagingService.version_)), System.currentTimeMillis(), ttl);
+
+ // serialization version
+ path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("version"));
+ rm.add(path, ByteBufferUtil.bytes(MessagingService.version_), System.currentTimeMillis(), ttl);
+
+ // table
+ path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("table"));
+ rm.add(path, ByteBufferUtil.bytes(mutation.getTable()), System.currentTimeMillis(), ttl);
+
+ // key
+ path = new QueryPath(HintedHandOffManager.HINTS_CF, hintId, ByteBufferUtil.bytes("key"));
+ rm.add(path, mutation.key(), System.currentTimeMillis(), ttl);
+
+ return rm;
}
/*
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1149396&r1=1149395&r2=1149396&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Jul 21 22:42:26 2011
@@ -58,8 +58,7 @@ public class RowMutationVerbHandler impl
ByteBuffer addressBytes = ByteBufferUtil.readWithShortLength(dis);
if (logger_.isDebugEnabled())
logger_.debug("Adding hint for " + InetAddress.getByName(ByteBufferUtil.string(addressBytes)));
- RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, addressBytes);
- hintedMutation.addHints(rm);
+ RowMutation hintedMutation = RowMutation.hintFor(rm, addressBytes);
hintedMutation.apply();
}
}