You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/01/24 20:44:17 UTC
[2/3] git commit: stream undelivered hints on decommission; patch by
Jason Brown; reviewed by jbellis for CASSANDRA-5128
stream undelivered hints on decommission
patch by Jason Brown; reviewed by jbellis for CASSANDRA-5128
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc66c73b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc66c73b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc66c73b
Branch: refs/heads/trunk
Commit: cc66c73ba60ec3f062e3fd699b76e352f8b10f6d
Parents: 360d1a2
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jan 24 13:42:37 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jan 24 13:42:54 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/locator/TokenMetadata.java | 17 ++-
.../apache/cassandra/service/StorageService.java | 86 ++++++++++++---
.../org/apache/cassandra/streaming/StreamOut.java | 10 ++-
4 files changed, 94 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 812abdd..3a53a34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.1
+ * stream undelivered hints on decommission (CASSANDRA-5128)
* GossipingPropertyFileSnitch loads saved dc/rack info if needed (CASSANDRA-5133)
* drain should flush system CFs too (CASSANDRA-4446)
* add inter_dc_tcp_nodelay setting (CASSANDRA-5148)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 3b5f86d..925a811 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -105,14 +105,16 @@ public class TokenMetadata
public TokenMetadata()
{
- this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), new Topology());
+ this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
+ HashBiMap.<InetAddress, UUID>create(),
+ new Topology());
}
- public TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
+ private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology)
{
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
- endpointToHostIdMap = HashBiMap.create();
+ endpointToHostIdMap = endpointsMap;
sortedTokens = sortTokens();
}
@@ -556,7 +558,9 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), new Topology(topology));
+ return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+ HashBiMap.create(endpointToHostIdMap),
+ new Topology(topology));
}
finally
{
@@ -719,6 +723,11 @@ public class TokenMetadata
}
}
+ public Set<InetAddress> getAllEndpoints()
+ {
+ return endpointToHostIdMap.keySet();
+ }
+
/** caller should not modify leavingEndpoints */
public Set<InetAddress> getLeavingEndpoints()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2f178b6..21f7c69 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2688,12 +2688,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
setMode(Mode.LEAVING, "streaming data to other nodes", true);
CountDownLatch latch = streamRanges(rangesToStream);
+ CountDownLatch hintsLatch = streamHints();
// wait for the transfer runnables to signal the latch.
logger.debug("waiting for stream aks.");
try
{
latch.await();
+ hintsLatch.await();
}
catch (InterruptedException e)
{
@@ -2704,6 +2706,47 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
onFinish.run();
}
+ private CountDownLatch streamHints()
+ {
+ if (HintedHandOffManager.instance.listEndpointsPendingHints().size() == 0)
+ return new CountDownLatch(0);
+
+ // gather all live nodes in the cluster that aren't also leaving
+ List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
+ candidates.remove(FBUtilities.getBroadcastAddress());
+ for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
+ {
+ InetAddress address = iter.next();
+ if (!FailureDetector.instance.isAlive(address))
+ iter.remove();
+ }
+
+ if (candidates.isEmpty())
+ {
+ logger.warn("Unable to stream hints since no live endpoints seen");
+ return new CountDownLatch(0);
+ }
+ else
+ {
+ // stream to the closest peer as chosen by the snitch
+ DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
+ InetAddress hintsDestinationHost = candidates.get(0);
+
+ // stream all hints -- range list will be a singleton of "the entire ring"
+ Token token = StorageService.getPartitioner().getMinimumToken();
+ List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ StreamOut.transferRanges(hintsDestinationHost,
+ Table.open(Table.SYSTEM_KS),
+ Collections.singletonList(Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF)),
+ ranges,
+ new CountingDownStreamCallback(latch, hintsDestinationHost),
+ OperationType.UNBOOTSTRAP);
+ return latch;
+ }
+ }
+
public void move(String newToken) throws IOException
{
try
@@ -3474,27 +3517,40 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
final List<Range<Token>> ranges = rangesEntry.getValue();
final InetAddress newEndpoint = rangesEntry.getKey();
- final IStreamCallback callback = new IStreamCallback()
- {
- public void onSuccess()
- {
- latch.countDown();
- }
-
- public void onFailure()
- {
- logger.warn("Streaming to " + newEndpoint + " failed");
- onSuccess(); // calling onSuccess for latch countdown
- }
- };
-
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
- StreamOut.transferRanges(newEndpoint, Table.open(table), ranges, callback, OperationType.UNBOOTSTRAP);
+ StreamOut.transferRanges(newEndpoint,
+ Table.open(table),
+ ranges,
+ new CountingDownStreamCallback(latch, newEndpoint),
+ OperationType.UNBOOTSTRAP);
}
}
return latch;
}
+ class CountingDownStreamCallback implements IStreamCallback
+ {
+ private final CountDownLatch latch;
+ private final InetAddress targetAddr;
+
+ CountingDownStreamCallback(CountDownLatch latch, InetAddress targetAddr)
+ {
+ this.latch = latch;
+ this.targetAddr = targetAddr;
+ }
+
+ public void onSuccess()
+ {
+ latch.countDown();
+ }
+
+ public void onFailure()
+ {
+ logger.warn("Streaming to " + targetAddr + " failed");
+ onSuccess(); // calling onSuccess for latch countdown
+ }
+ };
+
/**
* Used to request ranges from endpoints in the ring (will block until all data is fetched and ready)
* @param ranges ranges to fetch as map of the preferred address and range collection
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 7043be4..7855d6b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -80,8 +80,16 @@ public class StreamOut
*/
public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
{
+ transferRanges(target, table, table.getColumnFamilyStores(), ranges, callback, type);
+ }
+
+ /**
+ * Stream the given ranges to the target endpoint for provided CFs in the given keyspace.
+ */
+ public static void transferRanges(InetAddress target, Table table, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
+ {
StreamOutSession session = StreamOutSession.create(table.name, target, callback);
- transferRanges(session, table.getColumnFamilyStores(), ranges, type);
+ transferRanges(session, cfses, ranges, type);
}
/**