You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/05/15 00:15:15 UTC

[1/2] git commit: Make batchlog replay asynchronous

Repository: cassandra
Updated Branches:
  refs/heads/trunk 1c86f6688 -> eea5c3748


Make batchlog replay asynchronous

patch by Oleg Anastasyev; reviewed by Aleksey Yeschenko for
CASSANDRA-6134


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/92c38c0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/92c38c0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/92c38c0e

Branch: refs/heads/trunk
Commit: 92c38c0e6a5e23bdb77c23073a28f118a9f23add
Parents: e7b3dee
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu May 15 01:13:09 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu May 15 01:13:09 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/BatchlogManager.java    | 287 ++++++++++++-------
 2 files changed, 188 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/92c38c0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3dd47a1..d43a0f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * Fix repair hang when given CF does not exist (CASSANDRA-7189)
  * Allow c* to be shutdown in an embedded mode (CASSANDRA-5635)
  * Add server side batching to native transport (CASSANDRA-5663)
+ * Make batchlog replay asynchronous (CASSANDRA-6134)
 Merged from 2.0:
  * (Hadoop) Close java driver Cluster in CQLRR.close (CASSANDRA-7228)
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/92c38c0e/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 3ffc7a7..1a441f6 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -48,6 +48,8 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -193,162 +195,247 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Finished replayAllFailedBatches");
     }
 
-    // returns the UUID of the last seen batch
+    private void deleteBatch(UUID id)
+    {
+        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
+        mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
+        mutation.apply();
+    }
+
     private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
     {
         UUID id = null;
+        ArrayList<Batch> batches = new ArrayList<>(page.size());
+
+        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
         for (UntypedResultSet.Row row : page)
         {
             id = row.getUUID("id");
             long writtenAt = row.getLong("written_at");
-            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
             // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
             long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
             if (System.currentTimeMillis() < writtenAt + timeout)
                 continue; // not ready to replay yet, might still get a deletion.
-            replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter);
+
+            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+            Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
+            try
+            {
+                if (batch.replay(rateLimiter) > 0)
+                {
+                    batches.add(batch);
+                }
+                else
+                {
+                    deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
+                    totalBatchesReplayed.incrementAndGet();
+                }
+            }
+            catch (IOException e)
+            {
+                logger.warn("Skipped batch replay of {} due to {}", id, e);
+                deleteBatch(id);
+            }
+        }
+
+        // now waiting for all batches to complete their processing
+        // schedule hints for timed out deliveries
+        for (Batch batch : batches)
+        {
+            batch.finish();
+            deleteBatch(batch.id);
         }
+
+        totalBatchesReplayed.addAndGet(batches.size());
+
         return id;
     }
 
-    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter)
+    private static class Batch
     {
-        logger.debug("Replaying batch {}", id);
+        private final UUID id;
+        private final long writtenAt;
+        private final ByteBuffer data;
+        private final int version;
 
-        try
+        private List<ReplayWriteResponseHandler> replayHandlers;
+
+        public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
         {
-            replaySerializedMutations(data, writtenAt, version, rateLimiter);
+            this.id = id;
+            this.writtenAt = writtenAt;
+            this.data = data;
+            this.version = version;
         }
-        catch (IOException e)
+
+        public int replay(RateLimiter rateLimiter) throws IOException
         {
-            logger.warn("Skipped batch replay of {} due to {}", id, e);
-        }
+            logger.debug("Replaying batch {}", id);
 
-        deleteBatch(id);
+            List<Mutation> mutations = replayingMutations();
 
-        totalBatchesReplayed.incrementAndGet();
-    }
+            if (mutations.isEmpty())
+                return 0;
 
-    private void deleteBatch(UUID id)
-    {
-        Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(id));
-        mutation.delete(SystemKeyspace.BATCHLOG_CF, FBUtilities.timestampMicros());
-        mutation.apply();
-    }
+            int ttl = calculateHintTTL(mutations);
+            if (ttl <= 0)
+                return 0;
 
-    private void replaySerializedMutations(ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
-    {
-        DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
-        int size = in.readInt();
-        List<Mutation> mutations = new ArrayList<>(size);
+            replayHandlers = sendReplays(mutations, writtenAt, ttl);
+
+            rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
 
-        for (int i = 0; i < size; i++)
+            return replayHandlers.size();
+        }
+
+        public void finish()
         {
-            Mutation mutation = Mutation.serializer.deserialize(in, version);
+            for (int i = 0; i < replayHandlers.size(); i++)
+            {
+                ReplayWriteResponseHandler handler = replayHandlers.get(i);
+                try
+                {
+                    handler.get();
+                }
+                catch (WriteTimeoutException e)
+                {
+                    logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+                    // write hints for the undelivered endpoints of this and all remaining handlers, starting from i
+                    writeHintsForUndeliveredEndpoints(i);
+                    return;
+                }
+            }
+        }
+
+        private List<Mutation> replayingMutations() throws IOException
+        {
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
+            int size = in.readInt();
+            List<Mutation> mutations = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
+            {
+                Mutation mutation = Mutation.serializer.deserialize(in, version);
 
-            // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
-            // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then
-            // truncated.
-            for (UUID cfId : mutation.getColumnFamilyIds())
-                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
-                    mutation = mutation.without(cfId);
+                // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+                // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as delivered
+                // and then truncated).
+                for (UUID cfId : mutation.getColumnFamilyIds())
+                    if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+                        mutation = mutation.without(cfId);
 
-            if (!mutation.isEmpty())
-                mutations.add(mutation);
+                if (!mutation.isEmpty())
+                    mutations.add(mutation);
+            }
+            return mutations;
         }
 
-        if (!mutations.isEmpty())
-            replayMutations(mutations, writtenAt, version, rateLimiter);
-    }
+        private void writeHintsForUndeliveredEndpoints(int startFrom)
+        {
+            try
+            {
+                // Here we deserialize the mutations from the byte buffer a second time.
+                // This is acceptable because a timeout on direct batch delivery is rare
+                // (it can only happen in the few seconds before a node is marked dead),
+                // so we trade some CPU for keeping fewer objects alive.
+                List<Mutation> replayingMutations = replayingMutations();
+                for (int i = startFrom; i < replayHandlers.size(); i++)
+                {
+                    Mutation undeliveredMutation = replayingMutations.get(i);
+                    int ttl = calculateHintTTL(replayingMutations);
+                    ReplayWriteResponseHandler handler = replayHandlers.get(i);
 
-    /*
-     * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
-     * when a replica is down or a write request times out.
-     */
-    private void replayMutations(List<Mutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException
-    {
-        int ttl = calculateHintTTL(mutations, writtenAt);
-        if (ttl <= 0)
-            return; // this batchlog entry has 'expired'
-
-        List<InetAddress> liveEndpoints = new ArrayList<>();
-        List<InetAddress> hintEndpoints = new ArrayList<>();
-        
-        for (Mutation mutation : mutations)
+                    if (ttl > 0 && handler != null)
+                        for (InetAddress endpoint : handler.undelivered)
+                            StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint);
+                }
+            }
+            catch (IOException e)
+            {
+                logger.error("Cannot schedule hints for undelivered batch", e);
+            }
+        }
+
+        private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
         {
+            List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size());
+            for (Mutation mutation : mutations)
+            {
+                ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
+                if (handler != null)
+                    handlers.add(handler);
+            }
+            return handlers;
+        }
+
+        /**
+         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+         * when a replica is down or a write request times out.
+         *
+         * @return direct delivery handler to wait on or null, if no live nodes found
+         */
+        private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+        {
+            Set<InetAddress> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
-            Token tk = StorageService.getPartitioner().getToken(mutation.key());
-            int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+            Token<?> tk = StorageService.getPartitioner().getToken(mutation.key());
 
             for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                          StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
             {
-                rateLimiter.acquire(mutationSize);
                 if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                     mutation.apply();
                 else if (FailureDetector.instance.isAlive(endpoint))
                     liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
                 else
-                    hintEndpoints.add(endpoint);
+                    StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
             }
 
-            if (!liveEndpoints.isEmpty())
-                hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints));
+            if (liveEndpoints.isEmpty())
+                return null;
 
-            for (InetAddress endpoint : hintEndpoints)
-                StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
-            
-            liveEndpoints.clear();
-            hintEndpoints.clear();
+            ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler(liveEndpoints);
+            MessageOut<Mutation> message = mutation.createMessage();
+            for (InetAddress endpoint : liveEndpoints)
+                MessagingService.instance().sendRR(message, endpoint, handler, false);
+            return handler;
         }
-    }
 
-    // Returns the endpoints we failed to deliver to.
-    private Set<InetAddress> attemptDirectDelivery(Mutation mutation, List<InetAddress> endpoints) throws IOException
-    {
-        final List<WriteResponseHandler> handlers = new ArrayList<>();
-        final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>());
-
-        for (final InetAddress ep : endpoints)
+        /*
+         * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
+         * This ensures that deletes aren't "undone" by an old batch replay.
+         */
+        private int calculateHintTTL(Collection<Mutation> mutations)
         {
-            Runnable callback = new Runnable()
-            {
-                public void run()
-                {
-                    undelivered.remove(ep);
-                }
-            };
-            WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
-            MessagingService.instance().sendRR(mutation.createMessage(), ep, handler, false);
-            handlers.add(handler);
+            int unadjustedTTL = Integer.MAX_VALUE;
+            for (Mutation mutation : mutations)
+                unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
+            return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
         }
 
-        // Wait for all the requests to complete.
-        for (WriteResponseHandler handler : handlers)
+        private static class ReplayWriteResponseHandler extends WriteResponseHandler
         {
-            try
+            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+
+            public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
             {
-                handler.get();
+                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+                undelivered.addAll(writeEndpoints);
             }
-            catch (WriteTimeoutException e)
+
+            @Override
+            protected int totalBlockFor()
             {
-                logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+                return this.naturalEndpoints.size();
             }
-        }
 
-        return undelivered;
-    }
-
-    /*
-     * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
-     * This ensures that deletes aren't "undone" by an old batch replay.
-     */
-    private int calculateHintTTL(List<Mutation> mutations, long writtenAt)
-    {
-        int unadjustedTTL = Integer.MAX_VALUE;
-        for (Mutation mutation : mutations)
-            unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
-        return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
+            @Override
+            public void response(MessageIn m)
+            {
+                boolean removed = undelivered.remove(m.from);
+                assert removed;
+                super.response(m);
+            }
+        }
     }
 
     // force flush + compaction to reclaim space from the replayed batches


[2/2] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eea5c374
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eea5c374
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eea5c374

Branch: refs/heads/trunk
Commit: eea5c37480d532c18c2ec629969c6911e08a867a
Parents: 1c86f66 92c38c0
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu May 15 01:14:19 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu May 15 01:14:19 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/BatchlogManager.java    | 287 ++++++++++++-------
 2 files changed, 188 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea5c374/CHANGES.txt
----------------------------------------------------------------------