You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/05/21 23:26:29 UTC
git commit: Improve batchlog replay behavior and hint ttl handling
Updated Branches:
refs/heads/cassandra-1.2 dac699266 -> 3a51ccf2d
Improve batchlog replay behavior and hint ttl handling
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-5314
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a51ccf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a51ccf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a51ccf2
Branch: refs/heads/cassandra-1.2
Commit: 3a51ccf2d12a5fcfaa1378eff0209526c9a33278
Parents: dac6992
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed May 22 00:25:27 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed May 22 00:25:27 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/UntypedResultSet.java | 5 +
.../org/apache/cassandra/db/BatchlogManager.java | 173 ++++++++-------
src/java/org/apache/cassandra/db/RowMutation.java | 38 ++-
.../org/apache/cassandra/service/StorageProxy.java | 26 ++-
.../org/apache/cassandra/db/HintedHandOffTest.java | 2 +-
6 files changed, 145 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25290cd..3902dec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
1.2.6
* Write row markers when serializing schema (CASSANDRA-5572)
* Check only SSTables for the requested range when streaming (CASSANDRA-5569)
+ * Improve batchlog replay behavior and hint ttl handling (CASSANDRA-5314)
Merged from 1.1:
* Remove buggy thrift max message length option (CASSANDRA-5529)
* Fix NPE in Pig's widerow mode (CASSANDRA-5488)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index b6fcb55..9bee563 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -131,6 +131,11 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
return DateType.instance.compose(data.get(column));
}
+ public long getLong(String column)
+ {
+ return LongType.instance.compose(data.get(column));
+ }
+
public <T> Set<T> getSet(String column, AbstractType<T> type)
{
ByteBuffer raw = data.get(column);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 9da9b2d..c56e106 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,31 +30,29 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FastByteArrayOutputStream;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -63,12 +62,9 @@ public class BatchlogManager implements BatchlogManagerMBean
private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
private static final int VERSION = MessagingService.VERSION_12;
private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout();
-
- private static final ByteBuffer WRITTEN_AT = columnName("written_at");
- private static final ByteBuffer DATA = columnName("data");
+ private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
-
public static final BatchlogManager instance = new BatchlogManager();
private final AtomicLong totalBatchesReplayed = new AtomicLong();
@@ -93,23 +89,12 @@ public class BatchlogManager implements BatchlogManagerMBean
replayAllFailedBatches();
}
};
- StorageService.optionalTasks.scheduleWithFixedDelay(runnable,
- StorageService.RING_DELAY,
- 10 * 60 * 1000,
- TimeUnit.MILLISECONDS);
+ StorageService.optionalTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
}
public int countAllBatches()
{
- int count = 0;
-
- for (Row row : getRangeSlice(new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of())))
- {
- if (row.cf != null && !row.cf.isMarkedForDelete())
- count++;
- }
-
- return count;
+ return (int) process("SELECT count(*) FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF).one().getLong("count");
}
public long getTotalBatchesReplayed()
@@ -136,8 +121,9 @@ public class BatchlogManager implements BatchlogManagerMBean
ByteBuffer data = serializeRowMutations(mutations);
ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
- cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
- cf.addColumn(new Column(DATA, data, timestamp));
+ cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+ cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+ cf.addColumn(new Column(columnName("data"), data, timestamp));
RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
rm.add(cf);
@@ -168,20 +154,13 @@ public class BatchlogManager implements BatchlogManagerMBean
if (!isReplaying.compareAndSet(false, true))
return;
+ logger.debug("Started replayAllFailedBatches");
+
try
{
- logger.debug("Started replayAllFailedBatches");
-
- for (Row row : getRangeSlice(new NamesQueryFilter(WRITTEN_AT)))
- {
- if (row.cf == null || row.cf.isMarkedForDelete())
- continue;
-
- IColumn writtenAt = row.cf.getColumn(WRITTEN_AT);
- if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT)
- replayBatch(row.key);
- }
-
+ for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
+ if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
+ replayBatch(row.getUUID("id"));
cleanup();
}
finally
@@ -192,80 +171,117 @@ public class BatchlogManager implements BatchlogManagerMBean
logger.debug("Finished replayAllFailedBatches");
}
- private void replayBatch(DecoratedKey key)
+ private void replayBatch(UUID id)
{
- UUID uuid = UUIDType.instance.compose(key.key);
-
- logger.debug("Replaying batch {}", uuid);
-
- ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
- QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(SystemTable.BATCHLOG_CF));
- ColumnFamily batch = store.getColumnFamily(filter);
+ logger.debug("Replaying batch {}", id);
- if (batch == null || batch.isMarkedForDelete())
+ UntypedResultSet result = process("SELECT written_at, data FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
+ if (result.isEmpty())
return;
- IColumn dataColumn = batch.getColumn(DATA);
try
{
- if (dataColumn != null)
- writeHintsForSerializedMutations(dataColumn.value());
+ replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"));
}
catch (IOException e)
{
- logger.warn("Skipped batch replay of {} due to {}", uuid, e);
+ logger.warn("Skipped batch replay of {} due to {}", id, e);
}
- deleteBatch(key);
+ process("DELETE FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
+
totalBatchesReplayed.incrementAndGet();
}
- private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException
+ private void replaySerializedMutations(ByteBuffer data, long writtenAt) throws IOException
{
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
int size = in.readInt();
for (int i = 0; i < size; i++)
- writeHintsForMutation(RowMutation.serializer.deserialize(in, VERSION));
+ replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
}
- private static void writeHintsForMutation(RowMutation mutation) throws IOException
+ /*
+ * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+ * when a replica is down or a write request times out.
+ */
+ private void replaySerializedMutation(RowMutation mutation, long writtenAt) throws IOException
{
- String table = mutation.getTable();
+ int ttl = calculateHintTTL(mutation, writtenAt);
+ if (ttl <= 0)
+ return; // the mutation isn't safe to replay.
+
+ Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
+ String ks = mutation.getTable();
Token tk = StorageService.getPartitioner().getToken(mutation.key());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
- Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
- for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+ for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+ StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
{
- if (target.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
mutation.apply();
+ else if (FailureDetector.instance.isAlive(endpoint))
+ liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
else
- StorageProxy.writeHintForMutation(mutation, target);
+ StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
}
+
+ if (!liveEndpoints.isEmpty())
+ attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
}
- private static void deleteBatch(DecoratedKey key)
+ private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException
{
- RowMutation rm = new RowMutation(Table.SYSTEM_KS, key.key);
- rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
- rm.apply();
+ List<WriteResponseHandler> handlers = Lists.newArrayList();
+ final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
+ for (final InetAddress ep : endpoints)
+ {
+ Runnable callback = new Runnable()
+ {
+ public void run()
+ {
+ undelivered.remove(ep);
+ }
+ };
+ WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
+ MessagingService.instance().sendRR(mutation.createMessage(), ep, handler);
+ handlers.add(handler);
+ }
+
+ // Wait for all the requests to complete.
+ for (WriteResponseHandler handler : handlers)
+ {
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException e)
+ {
+ logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+ }
+ }
+
+ if (!undelivered.isEmpty())
+ {
+ int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
+ if (ttl > 0)
+ for (InetAddress endpoint : undelivered)
+ StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
+ }
}
- private static ByteBuffer columnName(String name)
+ // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
+ // this ensures that deletes aren't "undone" by an old batch replay.
+ private int calculateHintTTL(RowMutation mutation, long writtenAt)
{
- ByteBuffer raw = UTF8Type.instance.decompose(name);
- return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(raw).build();
+ return (int) ((mutation.calculateHintTTL() * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
}
- private static List<Row> getRangeSlice(IDiskAtomFilter columnFilter)
+ private static ByteBuffer columnName(String name)
{
- ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
- IPartitioner partitioner = StorageService.getPartitioner();
- RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
- AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
- return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
+ return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
}
- /** force flush + compaction to reclaim space from replayed batches */
+ // force flush + compaction to reclaim space from the replayed batches
private void cleanup() throws ExecutionException, InterruptedException
{
ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
@@ -276,4 +292,9 @@ public class BatchlogManager implements BatchlogManagerMBean
if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
}
+
+ private static UntypedResultSet process(String format, Object... args)
+ {
+ return QueryProcessor.processInternal(String.format(format, args));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 3cb0038..826f3e0 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -95,29 +95,41 @@ public class RowMutation implements IMutation
}
/**
- * Returns mutation representing a Hints to be sent to <code>address</code>
- * as soon as it becomes available. See HintedHandoffManager for more details.
+ * Returns mutation representing a Hint to be sent to <code>targetId</code>
+ * as soon as it becomes available. See HintedHandoffManager for more details.
*/
- public static RowMutation hintFor(RowMutation mutation, UUID targetId) throws IOException
+ public RowMutation toHint(int ttl, UUID targetId) throws IOException
{
+ assert ttl > 0;
+
RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(targetId));
UUID hintId = UUIDGen.getTimeUUID();
-
- // determine the TTL for the RowMutation
- // this is set at the smallest GCGraceSeconds for any of the CFs in the RM
- // this ensures that deletes aren't "undone" by delivery of an old hint
- int ttl = Integer.MAX_VALUE;
- for (ColumnFamily cf : mutation.getColumnFamilies())
- ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
-
// serialize the hint with id and version as a composite column name
- QueryPath path = new QueryPath(SystemTable.HINTS_CF, null, HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
- rm.add(path, ByteBuffer.wrap(FBUtilities.serialize(mutation, serializer, MessagingService.current_version)), System.currentTimeMillis(), ttl);
+ QueryPath path = new QueryPath(SystemTable.HINTS_CF,
+ null,
+ HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
+ rm.add(path,
+ ByteBuffer.wrap(FBUtilities.serialize(this, serializer, MessagingService.current_version)),
+ System.currentTimeMillis(),
+ ttl);
return rm;
}
/*
+ * determine the TTL for the hint RowMutation
+ * this is set at the smallest GCGraceSeconds for any of the CFs in the RM
+ * this ensures that deletes aren't "undone" by delivery of an old hint
+ */
+ public int calculateHintTTL()
+ {
+ int ttl = Integer.MAX_VALUE;
+ for (ColumnFamily cf : getColumnFamilies())
+ ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
+ return ttl;
+ }
+
+ /*
* Specify a column family name and the corresponding column
* family object.
* param @ cf - column family name
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b9cce15..e8440c4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -550,12 +550,19 @@ public class StorageProxy implements StorageProxyMBean
{
public void runMayThrow() throws IOException
{
- logger.debug("Adding hint for {}", target);
-
- writeHintForMutation(mutation, target);
- // Notify the handler only for CL == ANY
- if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
- responseHandler.response(null);
+ int ttl = mutation.calculateHintTTL();
+ if (ttl > 0)
+ {
+ logger.debug("Adding hint for {}", target);
+ writeHintForMutation(mutation, ttl, target);
+ // Notify the handler only for CL == ANY
+ if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
+ responseHandler.response(null);
+ }
+ else
+ {
+ logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
+ }
}
};
@@ -569,8 +576,9 @@ public class StorageProxy implements StorageProxyMBean
return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
}
- public static void writeHintForMutation(RowMutation mutation, InetAddress target) throws IOException
+ public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target) throws IOException
{
+ assert ttl > 0;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12))
{
@@ -578,9 +586,7 @@ public class StorageProxy implements StorageProxyMBean
return;
}
assert hostId != null : "Missing host ID for " + target.getHostAddress();
- RowMutation hintedMutation = RowMutation.hintFor(mutation, hostId);
- hintedMutation.apply();
-
+ mutation.toHint(ttl, hostId).apply();
totalHints.incrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index 260f1c5..a012109 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -70,7 +70,7 @@ public class HintedHandOffTest extends SchemaLoader
ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.currentTimeMillis());
- RowMutation.hintFor(rm, UUID.randomUUID()).apply();
+ rm.toHint(rm.calculateHintTTL(), UUID.randomUUID()).apply();
// flush data to disk
hintStore.forceBlockingFlush();