You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/03/09 01:42:56 UTC
git commit: Bootstrapping to handle more failure patch by Vijay;
reviewed by Brandon Williams for CASSANDRA-3555
Updated Branches:
refs/heads/trunk b7a8b57ca -> f16becfc2
Bootstrapping to handle more failure
patch by Vijay; reviewed by Brandon Williams for CASSANDRA-3555
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f16becfc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f16becfc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f16becfc
Branch: refs/heads/trunk
Commit: f16becfc29ef7e8bbf5f92913be51e33a99ba537
Parents: b7a8b57
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Mar 8 16:41:07 2012 -0800
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Mar 8 16:41:07 2012 -0800
----------------------------------------------------------------------
.../org/apache/cassandra/dht/BootStrapper.java | 2 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 64 ++++++++++++++-
.../org/apache/cassandra/dht/BootStrapperTest.java | 21 +++++-
3 files changed, 83 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index de7f0cf..ae5b95e 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -117,7 +117,7 @@ public class BootStrapper
List<InetAddress> endpoints = new ArrayList<InetAddress>(load.size());
for (InetAddress endpoint : load.keySet())
{
- if (!metadata.isMember(endpoint))
+ if (!metadata.isMember(endpoint) || !FailureDetector.instance.isAlive(endpoint))
continue;
endpoints.add(endpoint);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 5c55f5a..078851c 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -30,8 +30,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
@@ -42,7 +48,7 @@ import org.apache.cassandra.utils.FBUtilities;
/**
* Assists in streaming ranges to a node.
*/
-public class RangeStreamer
+public class RangeStreamer implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
{
private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
@@ -51,6 +57,9 @@ public class RangeStreamer
private final OperationType opType;
private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
+ // protected for testing.
+ protected CountDownLatch latch;
+ protected volatile String exceptionMessage = null;
/**
* A filter applied to sources to stream from when constructing a fetch map.
@@ -212,7 +221,7 @@ public class RangeStreamer
public void fetch()
{
- final CountDownLatch latch = new CountDownLatch(toFetch().entries().size());
+ latch = new CountDownLatch(toFetch().entries().size());
for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
{
final String table = entry.getKey();
@@ -234,13 +243,64 @@ public class RangeStreamer
StreamIn.requestRanges(source, table, ranges, callback, opType);
}
+ FailureDetector.instance.registerFailureDetectionEventListener(this);
+ Gossiper.instance.register(this);
try
{
latch.await();
+ if (exceptionMessage != null)
+ throw new RuntimeException(exceptionMessage);
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
+ finally
+ {
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+ Gossiper.instance.unregister(this);
+ }
+ }
+
+ /** No-op: this streamer takes no action when this callback fires. */
+ @Override
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+
+ /** No-op: this streamer takes no action when this callback fires. */
+ @Override
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+
+ /** No-op: this streamer takes no action when this callback fires. */
+ @Override
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+
+ /**
+  * No-op: this streamer takes no action when this callback fires.
+  * NOTE(review): dead sources appear to be handled through convict() instead -- confirm.
+  */
+ @Override
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ /**
+  * Handles this callback by convicting the endpoint.
+  * Double.MAX_VALUE is passed as phi so that the call clears the phi threshold check in convict().
+  *
+  * @param endpoint the endpoint reported by the callback
+  */
+ @Override
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ /**
+  * Handles this callback by convicting the endpoint; epState is not used.
+  * Double.MAX_VALUE is passed as phi so that the call clears the phi threshold check in convict().
+  *
+  * @param endpoint the endpoint reported by the callback
+  * @param epState  ignored
+  */
+ @Override
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ /**
+  * Aborts the pending fetch if the given endpoint is one of the sources listed in toFetch().
+  * Called by onRemove() and onRestart() with Double.MAX_VALUE; presumably also the
+  * IFailureDetectionEventListener callback, since fetch() registers this object with the FailureDetector.
+  *
+  * On a match, exceptionMessage is set and the latch is counted down to zero, which releases the
+  * thread waiting in fetch(); fetch() then throws a RuntimeException carrying that message.
+  *
+  * @param endpoint the node being convicted
+  * @param phi      suspicion level for the endpoint; the call is ignored unless phi is at least
+  *                 twice the configured phi convict threshold
+  */
+ public void convict(InetAddress endpoint, double phi)
+ {
+ // We want a higher confidence in the failure detection than usual because wrongly aborting a bootstrap/rebuild has a high cost.
+ // Same threshold logic as in RepairSession.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ for (Map.Entry<InetAddress, Collection<Range<Token>>> value: toFetch().values())
+ {
+ if (!value.getKey().equals(endpoint))
+ continue;
+
+ exceptionMessage = String.format("Node: %s died while streaming the ranges. Bootstrap/rebuild aborted.", endpoint);
+ // Drain the latch so the thread blocked in fetch() wakes up and sees exceptionMessage.
+ while (latch.getCount() > 0)
+ latch.countDown();
+ // The latch is now at zero; scanning the remaining entries would change nothing.
+ return;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 4b3a00d..65a6a5c 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.HashMap;
import java.util.Set;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import org.junit.Test;
@@ -33,6 +34,7 @@ import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.TokenMetadata;
@@ -40,6 +42,8 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.utils.FBUtilities;
+import static org.junit.Assert.*;
+
import static org.junit.Assert.assertEquals;
public class BootStrapperTest extends CleanupHelper
@@ -73,7 +77,10 @@ public class BootStrapperTest extends CleanupHelper
};
Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
for (int i = 0; i < addrs.length; i++)
+ {
+ Gossiper.instance.initializeNodeUnsafe(addrs[i], 1);
load.put(addrs[i], (double)i+2);
+ }
// give every node a bootstrap source.
for (int i = 3; i >=0; i--)
@@ -155,7 +162,7 @@ public class BootStrapperTest extends CleanupHelper
}
}
- private void testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException
+ private RangeStreamer testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException
{
StorageService ss = StorageService.instance;
@@ -196,6 +203,18 @@ public class BootStrapperTest extends CleanupHelper
// is used, they will vary.
assert toFetch.iterator().next().getValue().size() > 0;
assert !toFetch.iterator().next().getKey().equals(myEndpoint);
+ return s;
+ }
+
+ // Checks that convicting one of the streamer's fetch sources records an exception message.
+ @Test
+ public void testException() throws UnknownHostException
+ {
+ String table = Schema.instance.getNonSystemTables().iterator().next();
+ int replicationFactor = Table.open(table).getReplicationStrategy().getReplicationFactor();
+ RangeStreamer streamer = testSourceTargetComputation(table, replicationFactor, replicationFactor);
+ // fetch() is not called in this test, so a latch is installed by hand for convict() to count down.
+ // NOTE(review): the count of 4 looks arbitrary; convict() drains whatever count is set.
+ streamer.latch = new CountDownLatch(4);
+ // Convict the first source listed for this table; Double.MAX_VALUE is meant to clear convict()'s phi threshold check.
+ streamer.convict(streamer.toFetch().get(table).iterator().next().getKey(), Double.MAX_VALUE);
+ assertNotNull("Exception message not set, test failed", streamer.exceptionMessage);
}
private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException