You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/05/15 00:14:09 UTC
git commit: Make batchlog replay asynchronous
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 e7b3deee6 -> 92c38c0e6
Make batchlog replay asynchronous
patch by Oleg Anastasyev; reviewed by Aleksey Yeschenko for
CASSANDRA-6134
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/92c38c0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/92c38c0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/92c38c0e
Branch: refs/heads/cassandra-2.1
Commit: 92c38c0e6a5e23bdb77c23073a28f118a9f23add
Parents: e7b3dee
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu May 15 01:13:09 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu May 15 01:13:09 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/BatchlogManager.java | 287 ++++++++++++-------
2 files changed, 188 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/92c38c0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3dd47a1..d43a0f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
* Fix repair hang when given CF does not exist (CASSANDRA-7189)
* Allow c* to be shutdown in an embedded mode (CASSANDRA-5635)
* Add server side batching to native transport (CASSANDRA-5663)
+ * Make batchlog replay asynchronous (CASSANDRA-6134)
Merged from 2.0:
* (Hadoop) Close java driver Cluster in CQLRR.close (CASSANDRA-7228)
* Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/92c38c0e/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 3ffc7a7..1a441f6 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -48,6 +48,8 @@ import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
@@ -193,162 +195,247 @@ public class BatchlogManager implements BatchlogManagerMBean
logger.debug("Finished replayAllFailedBatches");
}
- // returns the UUID of the last seen batch
+ private void deleteBatch(UUID id)
+ {
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
+ mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
+ mutation.apply();
+ }
+
private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
{
UUID id = null;
+ ArrayList<Batch> batches = new ArrayList<>(page.size());
+
+ // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
for (UntypedResultSet.Row row : page)
{
id = row.getUUID("id");
long writtenAt = row.getLong("written_at");
- int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
if (System.currentTimeMillis() < writtenAt + timeout)
continue; // not ready to replay yet, might still get a deletion.
- replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter);
+
+ int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+ Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
+ try
+ {
+ if (batch.replay(rateLimiter) > 0)
+ {
+ batches.add(batch);
+ }
+ else
+ {
+ deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
+ totalBatchesReplayed.incrementAndGet();
+ }
+ }
+ catch (IOException e)
+ {
+ logger.warn("Skipped batch replay of {} due to {}", id, e);
+ deleteBatch(id);
+ }
+ }
+
+ // now waiting for all batches to complete their processing
+ // schedule hints for timed out deliveries
+ for (Batch batch : batches)
+ {
+ batch.finish();
+ deleteBatch(batch.id);
}
+
+ totalBatchesReplayed.addAndGet(batches.size());
+
return id;
}
- private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter)
+ private static class Batch
{
- logger.debug("Replaying batch {}", id);
+ private final UUID id;
+ private final long writtenAt;
+ private final ByteBuffer data;
+ private final int version;
- try
+ private List<ReplayWriteResponseHandler> replayHandlers;
+
+ public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
{
- replaySerializedMutations(data, writtenAt, version, rateLimiter);
+ this.id = id;
+ this.writtenAt = writtenAt;
+ this.data = data;
+ this.version = version;
}
- catch (IOException e)
+
+ public int replay(RateLimiter rateLimiter) throws IOException
{
- logger.warn("Skipped batch replay of {} due to {}", id, e);
- }
+ logger.debug("Replaying batch {}", id);
- deleteBatch(id);
+ List<Mutation> mutations = replayingMutations();
- totalBatchesReplayed.incrementAndGet();
- }
+ if (mutations.isEmpty())
+ return 0;
- private void deleteBatch(UUID id)
- {
- Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
- mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
- mutation.apply();
- }
+ int ttl = calculateHintTTL(mutations);
+ if (ttl <= 0)
+ return 0;
- private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
- {
- DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
- int size = in.readInt();
- List<Mutation> mutations = new ArrayList<>(size);
+ replayHandlers = sendReplays(mutations, writtenAt, ttl);
+
+ rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
- for (int i = 0; i < size; i++)
+ return replayHandlers.size();
+ }
+
+ public void finish()
{
- Mutation mutation = Mutation.serializer.deserialize(in, version);
+ for (int i = 0; i < replayHandlers.size(); i++)
+ {
+ ReplayWriteResponseHandler handler = replayHandlers.get(i);
+ try
+ {
+ handler.get();
+ }
+ catch (WriteTimeoutException e)
+ {
+ logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+ // write hints for the remaining handlers, starting from i
+ writeHintsForUndeliveredEndpoints(i);
+ return;
+ }
+ }
+ }
+
+ private List<Mutation> replayingMutations() throws IOException
+ {
+ DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
+ int size = in.readInt();
+ List<Mutation> mutations = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ Mutation mutation = Mutation.serializer.deserialize(in, version);
- // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
- // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then
- // truncated.
- for (UUID cfId : mutation.getColumnFamilyIds())
- if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
- mutation = mutation.without(cfId);
+ // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+ // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as delivered then
+ // truncated).
+ for (UUID cfId : mutation.getColumnFamilyIds())
+ if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+ mutation = mutation.without(cfId);
- if (!mutation.isEmpty())
- mutations.add(mutation);
+ if (!mutation.isEmpty())
+ mutations.add(mutation);
+ }
+ return mutations;
}
- if (!mutations.isEmpty())
- replayMutations(mutations, writtenAt, version, rateLimiter);
- }
+ private void writeHintsForUndeliveredEndpoints(int startFrom)
+ {
+ try
+ {
+ // Here we deserialize the mutations a second time from the byte buffer.
+ // This is OK because a timeout on direct batch delivery is rare
+ // (it can only happen in the few seconds before a node is marked dead),
+ // so we trade some CPU for keeping fewer objects around.
+ List<Mutation> replayingMutations = replayingMutations();
+ for (int i = startFrom; i < replayHandlers.size(); i++)
+ {
+ Mutation undeliveredMutation = replayingMutations.get(i);
+ int ttl = calculateHintTTL(replayingMutations);
+ ReplayWriteResponseHandler handler = replayHandlers.get(i);
- /*
- * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
- * when a replica is down or a write request times out.
- */
- private void replayMutations(List<Mutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
- {
- int ttl = calculateHintTTL(mutations, writtenAt);
- if (ttl <= 0)
- return; // this batchlog entry has 'expired'
-
- List<InetAddress> liveEndpoints = new ArrayList<>();
- List<InetAddress> hintEndpoints = new ArrayList<>();
-
- for (Mutation mutation : mutations)
+ if (ttl > 0 && handler != null)
+ for (InetAddress endpoint : handler.undelivered)
+ StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint);
+ }
+ }
+ catch (IOException e)
+ {
+ logger.error("Cannot schedule hints for undelivered batch", e);
+ }
+ }
+
+ private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
{
+ List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size());
+ for (Mutation mutation : mutations)
+ {
+ ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
+ if (handler != null)
+ handlers.add(handler);
+ }
+ return handlers;
+ }
+
+ /**
+ * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+ * when a replica is down or a write request times out.
+ *
+ * @return direct delivery handler to wait on or null, if no live nodes found
+ */
+ private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+ {
+ Set<InetAddress> liveEndpoints = new HashSet<>();
String ks = mutation.getKeyspaceName();
- Token tk = StorageService.getPartitioner().getToken(mutation.key());
- int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+ Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
{
- rateLimiter.acquire(mutationSize);
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
mutation.apply();
else if (FailureDetector.instance.isAlive(endpoint))
liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
else
- hintEndpoints.add(endpoint);
+ StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
}
- if (!liveEndpoints.isEmpty())
- hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+ if (liveEndpoints.isEmpty())
+ return null;
- for (InetAddress endpoint : hintEndpoints)
- StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
-
- liveEndpoints.clear();
- hintEndpoints.clear();
+ ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler(liveEndpoints);
+ MessageOut<Mutation> message = mutation.createMessage();
+ for (InetAddress endpoint : liveEndpoints)
+ MessagingService.instance().sendRR(message, endpoint, handler, false);
+ return handler;
}
- }
- // Returns the endpoints we failed to deliver to.
- private Set<InetAddress> attemptDirectDelivery(Mutation mutation, List<InetAddress> endpoints) throws IOException
- {
- final List<WriteResponseHandler> handlers = new ArrayList<>();
- final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
-
- for (final InetAddress ep : endpoints)
+ /*
+ * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+ * This ensures that deletes aren't "undone" by an old batch replay.
+ */
+ private int calculateHintTTL(Collection<Mutation> mutations)
{
- Runnable callback = new Runnable()
- {
- public void run()
- {
- undelivered.remove(ep);
- }
- };
- WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
- MessagingService.instance().sendRR(mutation.createMessage(), ep, handler, false);
- handlers.add(handler);
+ int unadjustedTTL = Integer.MAX_VALUE;
+ for (Mutation mutation : mutations)
+ unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
+ return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
}
- // Wait for all the requests to complete.
- for (WriteResponseHandler handler : handlers)
+ private static class ReplayWriteResponseHandler extends WriteResponseHandler
{
- try
+ private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+
+ public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
{
- handler.get();
+ super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+ undelivered.addAll(writeEndpoints);
}
- catch (WriteTimeoutException e)
+
+ @Override
+ protected int totalBlockFor()
{
- logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+ return this.naturalEndpoints.size();
}
- }
- return undelivered;
- }
-
- /*
- * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
- * This ensures that deletes aren't "undone" by an old batch replay.
- */
- private int calculateHintTTL(List<Mutation> mutations, long writtenAt)
- {
- int unadjustedTTL = Integer.MAX_VALUE;
- for (Mutation mutation : mutations)
- unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
- return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
+ @Override
+ public void response(MessageIn m)
+ {
+ boolean removed = undelivered.remove(m.from);
+ assert removed;
+ super.response(m);
+ }
+ }
}
// force flush + compaction to reclaim space from the replayed batches