You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/05/21 23:26:29 UTC

git commit: Improve batchlog replay behavior and hint ttl handling

Updated Branches:
  refs/heads/cassandra-1.2 dac699266 -> 3a51ccf2d


Improve batchlog replay behavior and hint ttl handling

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-5314


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a51ccf2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a51ccf2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a51ccf2

Branch: refs/heads/cassandra-1.2
Commit: 3a51ccf2d12a5fcfaa1378eff0209526c9a33278
Parents: dac6992
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed May 22 00:25:27 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed May 22 00:25:27 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/cql3/UntypedResultSet.java    |    5 +
 .../org/apache/cassandra/db/BatchlogManager.java   |  173 ++++++++-------
 src/java/org/apache/cassandra/db/RowMutation.java  |   38 ++-
 .../org/apache/cassandra/service/StorageProxy.java |   26 ++-
 .../org/apache/cassandra/db/HintedHandOffTest.java |    2 +-
 6 files changed, 145 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25290cd..3902dec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 1.2.6
  * Write row markers when serializing schema (CASSANDRA-5572)
  * Check only SSTables for the requested range when streaming (CASSANDRA-5569)
+ * Improve batchlog replay behavior and hint ttl handling (CASSANDRA-5314)
 Merged from 1.1:
  * Remove buggy thrift max message length option (CASSANDRA-5529)
  * Fix NPE in Pig's widerow mode (CASSANDRA-5488)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index b6fcb55..9bee563 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -131,6 +131,11 @@ public class UntypedResultSet implements Iterable<UntypedResultSet.Row>
             return DateType.instance.compose(data.get(column));
         }
 
+        public long getLong(String column)
+        {
+            return LongType.instance.compose(data.get(column));
+        }
+
         public <T> Set<T> getSet(String column, AbstractType<T> type)
         {
             ByteBuffer raw = data.get(column);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 9da9b2d..c56e106 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -29,31 +30,29 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -63,12 +62,9 @@ public class BatchlogManager implements BatchlogManagerMBean
     private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
     private static final int VERSION = MessagingService.VERSION_12;
     private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout();
-
-    private static final ByteBuffer WRITTEN_AT = columnName("written_at");
-    private static final ByteBuffer DATA = columnName("data");
+    private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
 
     private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
-
     public static final BatchlogManager instance = new BatchlogManager();
 
     private final AtomicLong totalBatchesReplayed = new AtomicLong();
@@ -93,23 +89,12 @@ public class BatchlogManager implements BatchlogManagerMBean
                 replayAllFailedBatches();
             }
         };
-        StorageService.optionalTasks.scheduleWithFixedDelay(runnable,
-                                                            StorageService.RING_DELAY,
-                                                            10 * 60 * 1000,
-                                                            TimeUnit.MILLISECONDS);
+        StorageService.optionalTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
     public int countAllBatches()
     {
-        int count = 0;
-
-        for (Row row : getRangeSlice(new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of())))
-        {
-            if (row.cf != null && !row.cf.isMarkedForDelete())
-                count++;
-        }
-
-        return count;
+        return (int) process("SELECT count(*) FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF).one().getLong("count");
     }
 
     public long getTotalBatchesReplayed()
@@ -136,8 +121,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         ByteBuffer data = serializeRowMutations(mutations);
 
         ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
-        cf.addColumn(new Column(WRITTEN_AT, writtenAt, timestamp));
-        cf.addColumn(new Column(DATA, data, timestamp));
+        cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+        cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
+        cf.addColumn(new Column(columnName("data"), data, timestamp));
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
         rm.add(cf);
 
@@ -168,20 +154,13 @@ public class BatchlogManager implements BatchlogManagerMBean
         if (!isReplaying.compareAndSet(false, true))
             return;
 
+        logger.debug("Started replayAllFailedBatches");
+
         try
         {
-            logger.debug("Started replayAllFailedBatches");
-
-            for (Row row : getRangeSlice(new NamesQueryFilter(WRITTEN_AT)))
-            {
-                if (row.cf == null || row.cf.isMarkedForDelete())
-                    continue;
-
-                IColumn writtenAt = row.cf.getColumn(WRITTEN_AT);
-                if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT)
-                    replayBatch(row.key);
-            }
-
+            for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
+                if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
+                    replayBatch(row.getUUID("id"));
             cleanup();
         }
         finally
@@ -192,80 +171,117 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Finished replayAllFailedBatches");
     }
 
-    private void replayBatch(DecoratedKey key)
+    private void replayBatch(UUID id)
     {
-        UUID uuid = UUIDType.instance.compose(key.key);
-
-        logger.debug("Replaying batch {}", uuid);
-
-        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-        QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(SystemTable.BATCHLOG_CF));
-        ColumnFamily batch = store.getColumnFamily(filter);
+        logger.debug("Replaying batch {}", id);
 
-        if (batch == null || batch.isMarkedForDelete())
+        UntypedResultSet result = process("SELECT written_at, data FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
+        if (result.isEmpty())
             return;
 
-        IColumn dataColumn = batch.getColumn(DATA);
         try
         {
-            if (dataColumn != null)
-                writeHintsForSerializedMutations(dataColumn.value());
+            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"));
         }
         catch (IOException e)
         {
-            logger.warn("Skipped batch replay of {} due to {}", uuid, e);
+            logger.warn("Skipped batch replay of {} due to {}", id, e);
         }
 
-        deleteBatch(key);
+        process("DELETE FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
+
         totalBatchesReplayed.incrementAndGet();
     }
 
-    private static void writeHintsForSerializedMutations(ByteBuffer data) throws IOException
+    private void replaySerializedMutations(ByteBuffer data, long writtenAt) throws IOException
     {
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
         int size = in.readInt();
         for (int i = 0; i < size; i++)
-            writeHintsForMutation(RowMutation.serializer.deserialize(in, VERSION));
+            replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
     }
 
-    private static void writeHintsForMutation(RowMutation mutation) throws IOException
+    /*
+     * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+     * when a replica is down or a write request times out.
+     */
+    private void replaySerializedMutation(RowMutation mutation, long writtenAt) throws IOException
     {
-        String table = mutation.getTable();
+        int ttl = calculateHintTTL(mutation, writtenAt);
+        if (ttl <= 0)
+            return; // the mutation isn't safe to replay.
+
+        Set<InetAddress> liveEndpoints = new HashSet<InetAddress>();
+        String ks = mutation.getTable();
         Token tk = StorageService.getPartitioner().getToken(mutation.key());
-        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk);
-        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table);
-        for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+        for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+                                                     StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
         {
-            if (target.equals(FBUtilities.getBroadcastAddress()))
+            if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                 mutation.apply();
+            else if (FailureDetector.instance.isAlive(endpoint))
+                liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
             else
-                StorageProxy.writeHintForMutation(mutation, target);
+                StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
         }
+
+        if (!liveEndpoints.isEmpty())
+            attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
     }
 
-    private static void deleteBatch(DecoratedKey key)
+    private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException
     {
-        RowMutation rm = new RowMutation(Table.SYSTEM_KS, key.key);
-        rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
-        rm.apply();
+        List<WriteResponseHandler> handlers = Lists.newArrayList();
+        final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
+        for (final InetAddress ep : endpoints)
+        {
+            Runnable callback = new Runnable()
+            {
+                public void run()
+                {
+                    undelivered.remove(ep);
+                }
+            };
+            WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback);
+            MessagingService.instance().sendRR(mutation.createMessage(), ep, handler);
+            handlers.add(handler);
+        }
+
+        // Wait for all the requests to complete.
+        for (WriteResponseHandler handler : handlers)
+        {
+            try
+            {
+                handler.get();
+            }
+            catch (WriteTimeoutException e)
+            {
+                logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+            }
+        }
+
+        if (!undelivered.isEmpty())
+        {
+            int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl
+            if (ttl > 0)
+                for (InetAddress endpoint : undelivered)
+                    StorageProxy.writeHintForMutation(mutation, ttl, endpoint);
+        }
     }
 
-    private static ByteBuffer columnName(String name)
+    // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
+    // this ensures that deletes aren't "undone" by an old batch replay.
+    private int calculateHintTTL(RowMutation mutation, long writtenAt)
     {
-        ByteBuffer raw = UTF8Type.instance.decompose(name);
-        return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(raw).build();
+        return (int) ((mutation.calculateHintTTL() * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
     }
 
-    private static List<Row> getRangeSlice(IDiskAtomFilter columnFilter)
+    private static ByteBuffer columnName(String name)
     {
-        ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-        IPartitioner partitioner = StorageService.getPartitioner();
-        RowPosition minPosition = partitioner.getMinimumToken().minKeyBound();
-        AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
-        return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
+        return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build();
     }
 
-    /** force flush + compaction to reclaim space from replayed batches */
+    // force flush + compaction to reclaim space from the replayed batches
     private void cleanup() throws ExecutionException, InterruptedException
     {
         ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
@@ -276,4 +292,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
             CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
     }
+
+    private static UntypedResultSet process(String format, Object... args)
+    {
+        return QueryProcessor.processInternal(String.format(format, args));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/db/RowMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java
index 3cb0038..826f3e0 100644
--- a/src/java/org/apache/cassandra/db/RowMutation.java
+++ b/src/java/org/apache/cassandra/db/RowMutation.java
@@ -95,29 +95,41 @@ public class RowMutation implements IMutation
     }
 
     /**
-     * Returns mutation representing a Hints to be sent to <code>address</code>
-     * as soon as it becomes available.  See HintedHandoffManager for more details.
+     * Returns mutation representing a Hint to be sent to <code>targetId</code>
+     * as soon as it becomes available. See HintedHandoffManager for more details.
      */
-    public static RowMutation hintFor(RowMutation mutation, UUID targetId) throws IOException
+    public RowMutation toHint(int ttl, UUID targetId) throws IOException
     {
+        assert ttl > 0;
+
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(targetId));
         UUID hintId = UUIDGen.getTimeUUID();
-
-        // determine the TTL for the RowMutation
-        // this is set at the smallest GCGraceSeconds for any of the CFs in the RM
-        // this ensures that deletes aren't "undone" by delivery of an old hint
-        int ttl = Integer.MAX_VALUE;
-        for (ColumnFamily cf : mutation.getColumnFamilies())
-            ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
-
         // serialize the hint with id and version as a composite column name
-        QueryPath path = new QueryPath(SystemTable.HINTS_CF, null, HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
-        rm.add(path, ByteBuffer.wrap(FBUtilities.serialize(mutation, serializer, MessagingService.current_version)), System.currentTimeMillis(), ttl);
+        QueryPath path = new QueryPath(SystemTable.HINTS_CF,
+                                       null,
+                                       HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version));
+        rm.add(path,
+               ByteBuffer.wrap(FBUtilities.serialize(this, serializer, MessagingService.current_version)),
+               System.currentTimeMillis(),
+               ttl);
 
         return rm;
     }
 
     /*
+     * determine the TTL for the hint RowMutation
+     * this is set at the smallest GCGraceSeconds for any of the CFs in the RM
+     * this ensures that deletes aren't "undone" by delivery of an old hint
+     */
+    public int calculateHintTTL()
+    {
+        int ttl = Integer.MAX_VALUE;
+        for (ColumnFamily cf : getColumnFamilies())
+            ttl = Math.min(ttl, cf.metadata().getGcGraceSeconds());
+        return ttl;
+    }
+
+    /*
      * Specify a column family name and the corresponding column
      * family object.
      * param @ cf - column family name

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b9cce15..e8440c4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -550,12 +550,19 @@ public class StorageProxy implements StorageProxyMBean
         {
             public void runMayThrow() throws IOException
             {
-                logger.debug("Adding hint for {}", target);
-
-                writeHintForMutation(mutation, target);
-                // Notify the handler only for CL == ANY
-                if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
-                    responseHandler.response(null);
+                int ttl = mutation.calculateHintTTL();
+                if (ttl > 0)
+                {
+                    logger.debug("Adding hint for {}", target);
+                    writeHintForMutation(mutation, ttl, target);
+                    // Notify the handler only for CL == ANY
+                    if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
+                        responseHandler.response(null);
+                }
+                else
+                {
+                    logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
+                }
             }
         };
 
@@ -569,8 +576,9 @@ public class StorageProxy implements StorageProxyMBean
         return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
     }
 
-    public static void writeHintForMutation(RowMutation mutation, InetAddress target) throws IOException
+    public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target) throws IOException
     {
+        assert ttl > 0;
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target);
         if ((hostId == null) && (MessagingService.instance().getVersion(target) < MessagingService.VERSION_12))
         {
@@ -578,9 +586,7 @@ public class StorageProxy implements StorageProxyMBean
             return;
         }
         assert hostId != null : "Missing host ID for " + target.getHostAddress();
-        RowMutation hintedMutation = RowMutation.hintFor(mutation, hostId);
-        hintedMutation.apply();
-
+        mutation.toHint(ttl, hostId).apply();
         totalHints.incrementAndGet();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a51ccf2/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index 260f1c5..a012109 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -70,7 +70,7 @@ public class HintedHandOffTest extends SchemaLoader
                ByteBufferUtil.EMPTY_BYTE_BUFFER,
                System.currentTimeMillis());
 
-        RowMutation.hintFor(rm, UUID.randomUUID()).apply();
+        rm.toHint(rm.calculateHintTTL(), UUID.randomUUID()).apply();
 
         // flush data to disk
         hintStore.forceBlockingFlush();