You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/10/11 23:12:49 UTC
git commit: Make hint delivery asynchronous. Patch by Alexey Zotov;
reviewed by jbellis for CASSANDRA-4761
Updated Branches:
refs/heads/trunk a4288fdc5 -> 488122136
Make hint delivery asynchronous
patch by Alexey Zotov; reviewed by jbellis for CASSANDRA-4761
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48812213
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48812213
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48812213
Branch: refs/heads/trunk
Commit: 4881221363f984ab6610756cab38e1a016b79e15
Parents: a4288fd
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Oct 11 16:12:21 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Oct 11 16:12:25 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/HintedHandOffManager.java | 68 +++++++++------
.../cassandra/service/WriteResponseHandler.java | 7 ++
test/unit/org/apache/cassandra/Util.java | 5 +
test/unit/org/apache/cassandra/db/TableTest.java | 38 ++++++++
5 files changed, 91 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 15bebf3..94f65ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-beta2
+ * Make hint delivery asynchronous (CASSANDRA-4761)
* Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610)
* cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661)
* Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index bc503ea..2d88ecc 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -25,6 +25,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -46,12 +47,12 @@ import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
@@ -125,13 +126,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
- private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws WriteTimeoutException
- {
- AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH);
- MessagingService.instance().sendRR(message, endpoint, responseHandler);
- responseHandler.get();
- }
-
private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException
{
RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
@@ -287,10 +281,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
// find the hints for the node using its token.
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint);
- ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
+ final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes);
- int rowsReplayed = 0;
+ final AtomicInteger rowsReplayed = new AtomicInteger(0);
ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
int pageSize = PAGE_SIZE;
@@ -307,15 +301,28 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
- delivery:
while (true)
{
QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(SystemTable.HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000));
if (pagingFinished(hintsPage, startColumn))
- break;
+ {
+ if (ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(startColumn))
+ {
+ // we've started from the beginning and could not find anything (only maybe tombstones)
+ break;
+ }
+ else
+ {
+ // restart query from the first column until we read an empty row;
+ // that will tell us everything was delivered successfully with no timeouts
+ startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ continue;
+ }
+
+ }
- for (IColumn hint : hintsPage.getSortedColumns())
+ for (final IColumn hint : hintsPage.getSortedColumns())
{
// Skip tombstones:
// if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
@@ -338,29 +345,33 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
catch (UnknownColumnFamilyException e)
{
logger.debug("Skipping delivery of hint for deleted columnfamily", e);
- rm = null;
+ deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
+ continue;
}
- try
+ MessageOut<RowMutation> message = rm.createMessage();
+ rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
+ WrappedRunnable callback = new WrappedRunnable()
{
- if (rm != null)
+ public void runMayThrow() throws IOException
{
- MessageOut<RowMutation> message = rm.createMessage();
- rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
- sendMutation(endpoint, message);
- rowsReplayed++;
+ rowsReplayed.incrementAndGet();
+ deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
}
- deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
- }
- catch (WriteTimeoutException e)
- {
- logger.info(String.format("Timed out replaying hints to %s; aborting further deliveries", endpoint));
- break delivery;
- }
+ };
+ IAsyncCallback responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
+ MessagingService.instance().sendRR(message, endpoint, responseHandler);
+ }
+
+ // check if node is still alive and we should continue delivery process
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.debug("Endpoint {} died during hint delivery, aborting", endpoint);
+ return;
}
}
- if (rowsReplayed > 0)
+ if (rowsReplayed.get() > 0)
{
try
{
@@ -489,4 +500,5 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
return rows;
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 707a583..ab7223a 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -55,6 +55,13 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
responses = new AtomicInteger(blockFor);
}
+ public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
+ {
+ super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, callback, writeType);
+ blockFor = 1;
+ responses = new AtomicInteger(1);
+ }
+
public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
{
super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, null, writeType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 2ce7ca8..1e9031e 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -80,6 +80,11 @@ public class Util
return new Column(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp);
}
+ public static Column expiringColumn(String name, String value, long timestamp, int ttl)
+ {
+ return new ExpiringColumn(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp, ttl);
+ }
+
public static Column counterColumn(String name, long value, long timestamp)
{
return new CounterUpdateColumn(ByteBufferUtil.bytes(name), value, timestamp);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/test/unit/org/apache/cassandra/db/TableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java
index b287744..4244e1c 100644
--- a/test/unit/org/apache/cassandra/db/TableTest.java
+++ b/test/unit/org/apache/cassandra/db/TableTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.utils.WrappedRunnable;
import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.expiringColumn;
import static org.apache.cassandra.Util.getBytes;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.filter.QueryPath;
@@ -325,6 +326,43 @@ public class TableTest extends SchemaLoader
}
@Test
+ public void testGetSliceWithExpiration() throws Throwable
+ {
+ // tests slicing against data from one row with expiring column in a memtable and then flushed to an sstable
+ final Table table = Table.open("Keyspace1");
+ final ColumnFamilyStore cfStore = table.getColumnFamilyStore("Standard1");
+ final DecoratedKey ROW = Util.dk("row5");
+
+ RowMutation rm = new RowMutation("Keyspace1", ROW.key);
+ ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1");
+ cf.addColumn(column("col1", "val1", 1L));
+ cf.addColumn(expiringColumn("col2", "val2", 1L, 1));
+ cf.addColumn(column("col3", "val3", 1L));
+
+ rm.add(cf);
+ rm.apply();
+ cfStore.forceBlockingFlush();
+
+ Runnable verify = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ ColumnFamily cf;
+
+ cf = cfStore.getColumnFamily(ROW, new QueryPath("Standard1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2);
+ assertColumns(cf, "col1", "col2");
+ assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE), "col1");
+
+ cf = cfStore.getColumnFamily(ROW, new QueryPath("Standard1"), ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+ assertColumns(cf, "col2");
+ assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE));
+ }
+ };
+
+ reTest(table.getColumnFamilyStore("Standard1"), verify);
+ }
+
+ @Test
public void testGetSliceFromAdvanced() throws Throwable
{
// tests slicing against data from one row spread across two sstables