You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/11 23:12:49 UTC

git commit: Make hint delivery asynchronous; patch by Alexey Zotov; reviewed by jbellis for CASSANDRA-4761

Updated Branches:
  refs/heads/trunk a4288fdc5 -> 488122136


Make hint delivery asynchronous
patch by Alexey Zotov; reviewed by jbellis for CASSANDRA-4761


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48812213
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48812213
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48812213

Branch: refs/heads/trunk
Commit: 4881221363f984ab6610756cab38e1a016b79e15
Parents: a4288fd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Oct 11 16:12:21 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Oct 11 16:12:25 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/db/HintedHandOffManager.java  |   68 +++++++++------
 .../cassandra/service/WriteResponseHandler.java    |    7 ++
 test/unit/org/apache/cassandra/Util.java           |    5 +
 test/unit/org/apache/cassandra/db/TableTest.java   |   38 ++++++++
 5 files changed, 91 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 15bebf3..94f65ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta2
+ * Make hint delivery asynchronous (CASSANDRA-4761)
  * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610)
  * cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661)
  * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index bc503ea..2d88ecc 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -25,6 +25,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -46,12 +47,12 @@ import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
@@ -125,13 +126,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
     }
 
-    private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws WriteTimeoutException
-    {
-        AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH);
-        MessagingService.instance().sendRR(message, endpoint, responseHandler);
-        responseHandler.get();
-    }
-
     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
@@ -287,10 +281,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         // find the hints for the node using its token.
         UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
         logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint);
-        ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
+        final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
         DecoratedKey epkey =  StorageService.getPartitioner().decorateKey(hostIdBytes);
 
-        int rowsReplayed = 0;
+        final AtomicInteger rowsReplayed = new AtomicInteger(0);
         ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         int pageSize = PAGE_SIZE;
@@ -307,15 +301,28 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 
-        delivery:
         while (true)
         {
             QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(SystemTable.HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
             ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000));
             if (pagingFinished(hintsPage, startColumn))
-                break;
+            {
+                if (ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(startColumn))
+                {
+                    // we've started from the beginning and could not find anything (only maybe tombstones)
+                    break;
+                }
+                else
+                {
+                    // restart query from the first column until we read an empty row;
+                    // that will tell us everything was delivered successfully with no timeouts
+                    startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                    continue;
+                }
+
+            }
 
-            for (IColumn hint : hintsPage.getSortedColumns())
+            for (final IColumn hint : hintsPage.getSortedColumns())
             {
                 // Skip tombstones:
                 // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
@@ -338,29 +345,33 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 catch (UnknownColumnFamilyException e)
                 {
                     logger.debug("Skipping delivery of hint for deleted columnfamily", e);
-                    rm = null;
+                    deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
+                    continue;
                 }
 
-                try
+                MessageOut<RowMutation> message = rm.createMessage();
+                rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
+                WrappedRunnable callback = new WrappedRunnable()
                 {
-                    if (rm != null)
+                    public void runMayThrow() throws IOException
                     {
-                        MessageOut<RowMutation> message = rm.createMessage();
-                        rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
-                        sendMutation(endpoint, message);
-                        rowsReplayed++;
+                        rowsReplayed.incrementAndGet();
+                        deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                     }
-                    deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
-                }
-                catch (WriteTimeoutException e)
-                {
-                    logger.info(String.format("Timed out replaying hints to %s; aborting further deliveries", endpoint));
-                    break delivery;
-                }
+                };
+                IAsyncCallback responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
+                MessagingService.instance().sendRR(message, endpoint, responseHandler);
+            }
+
+            // check if node is still alive and we should continue delivery process
+            if (!FailureDetector.instance.isAlive(endpoint))
+            {
+                logger.debug("Endpoint {} died during hint delivery, aborting", endpoint);
+                return;
             }
         }
 
-        if (rowsReplayed > 0)
+        if (rowsReplayed.get() > 0)
         {
             try
             {
@@ -489,4 +500,5 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         }
         return rows;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 707a583..ab7223a 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -55,6 +55,13 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
         responses = new AtomicInteger(blockFor);
     }
 
+    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
+    {
+        super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, callback, writeType);
+        blockFor = 1;
+        responses = new AtomicInteger(1);
+    }
+
     public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
     {
         super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, null, writeType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 2ce7ca8..1e9031e 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -80,6 +80,11 @@ public class Util
         return new Column(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp);
     }
 
+    public static Column expiringColumn(String name, String value, long timestamp, int ttl)
+    {
+        return new ExpiringColumn(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp, ttl);
+    }
+
     public static Column counterColumn(String name, long value, long timestamp)
     {
         return new CounterUpdateColumn(ByteBufferUtil.bytes(name), value, timestamp);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/test/unit/org/apache/cassandra/db/TableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java
index b287744..4244e1c 100644
--- a/test/unit/org/apache/cassandra/db/TableTest.java
+++ b/test/unit/org/apache/cassandra/db/TableTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.utils.WrappedRunnable;
 import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.expiringColumn;
 import static org.apache.cassandra.Util.getBytes;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -325,6 +326,43 @@ public class TableTest extends SchemaLoader
     }
 
     @Test
+    public void testGetSliceWithExpiration() throws Throwable
+    {
+        // tests slicing against data from one row with expiring column in a memtable and then flushed to an sstable
+        final Table table = Table.open("Keyspace1");
+        final ColumnFamilyStore cfStore = table.getColumnFamilyStore("Standard1");
+        final DecoratedKey ROW = Util.dk("row5");
+
+        RowMutation rm = new RowMutation("Keyspace1", ROW.key);
+        ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1");
+        cf.addColumn(column("col1", "val1", 1L));
+        cf.addColumn(expiringColumn("col2", "val2", 1L, 1));
+        cf.addColumn(column("col3", "val3", 1L));
+
+        rm.add(cf);
+        rm.apply();
+        cfStore.forceBlockingFlush();
+
+        Runnable verify = new WrappedRunnable()
+        {
+            public void runMayThrow() throws Exception
+            {
+                ColumnFamily cf;
+
+                cf = cfStore.getColumnFamily(ROW, new QueryPath("Standard1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+                assertColumns(cf, "col1", "col2");
+                assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE), "col1");
+
+                cf = cfStore.getColumnFamily(ROW, new QueryPath("Standard1"), ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+                assertColumns(cf, "col2");
+                assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE));
+            }
+        };
+
+        reTest(table.getColumnFamilyStore("Standard1"), verify);
+    }
+
+    @Test
     public void testGetSliceFromAdvanced() throws Throwable
     {
         // tests slicing against data from one row spread across two sstables