You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2012/03/09 01:42:56 UTC

git commit: Bootstrapping to handle more failures (patch by Vijay; reviewed by Brandon Williams for CASSANDRA-3555)

Updated Branches:
  refs/heads/trunk b7a8b57ca -> f16becfc2


Bootstrapping to handle more failures
patch by Vijay; reviewed by Brandon Williams for CASSANDRA-3555


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f16becfc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f16becfc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f16becfc

Branch: refs/heads/trunk
Commit: f16becfc29ef7e8bbf5f92913be51e33a99ba537
Parents: b7a8b57
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Thu Mar 8 16:41:07 2012 -0800
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Thu Mar 8 16:41:07 2012 -0800

----------------------------------------------------------------------
 .../org/apache/cassandra/dht/BootStrapper.java     |    2 +-
 .../org/apache/cassandra/dht/RangeStreamer.java    |   64 ++++++++++++++-
 .../org/apache/cassandra/dht/BootStrapperTest.java |   21 +++++-
 3 files changed, 83 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index de7f0cf..ae5b95e 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -117,7 +117,7 @@ public class BootStrapper
         List<InetAddress> endpoints = new ArrayList<InetAddress>(load.size());
         for (InetAddress endpoint : load.keySet())
         {
-            if (!metadata.isMember(endpoint))
+            if (!metadata.isMember(endpoint) || !FailureDetector.instance.isAlive(endpoint))
                 continue;
             endpoints.add(endpoint);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 5c55f5a..078851c 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -30,8 +30,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -42,7 +48,7 @@ import org.apache.cassandra.utils.FBUtilities;
 /**
  * Assists in streaming ranges to a node.
  */
-public class RangeStreamer
+public class RangeStreamer implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
 
@@ -51,6 +57,9 @@ public class RangeStreamer
     private final OperationType opType;
     private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
     private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
+    // protected for testing.
+    protected CountDownLatch latch;
+    protected volatile String exceptionMessage = null;
 
     /**
      * A filter applied to sources to stream from when constructing a fetch map.
@@ -212,7 +221,7 @@ public class RangeStreamer
 
     public void fetch()
     {
-        final CountDownLatch latch = new CountDownLatch(toFetch().entries().size());
+        latch = new CountDownLatch(toFetch().entries().size());
         for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
         {
             final String table = entry.getKey();
@@ -234,13 +243,64 @@ public class RangeStreamer
             StreamIn.requestRanges(source, table, ranges, callback, opType);
         }
 
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
+        Gossiper.instance.register(this);
         try
         {
             latch.await();
+            if (exceptionMessage != null)
+                throw new RuntimeException(exceptionMessage);
         }
         catch (InterruptedException e)
         {
             throw new AssertionError(e);
         }
+        finally
+        {
+            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+            Gossiper.instance.unregister(this);
+        }
+    }
+    
+    @Override
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+
+    @Override
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+
+    @Override
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+
+    @Override
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+    
+    @Override
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    @Override
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {        
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        // same logic as in RepairSession
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        for (Map.Entry<InetAddress, Collection<Range<Token>>> value: toFetch().values())
+        {
+            if (value.getKey().equals(endpoint))
+            {
+                exceptionMessage = String.format("Node: %s died while streaming the ranges. Boostrap/rebuild Aborded.", endpoint);
+                while (latch.getCount() > 0)
+                    latch.countDown();
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f16becfc/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 4b3a00d..65a6a5c 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.Test;
 
@@ -33,6 +34,7 @@ import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -40,6 +42,8 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.junit.Assert.*;
+
 import static org.junit.Assert.assertEquals;
 
 public class BootStrapperTest extends CleanupHelper
@@ -73,7 +77,10 @@ public class BootStrapperTest extends CleanupHelper
         };
         Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
         for (int i = 0; i < addrs.length; i++)
+        {
+            Gossiper.instance.initializeNodeUnsafe(addrs[i], 1);
             load.put(addrs[i], (double)i+2);
+        }
 
         // give every node a bootstrap source.
         for (int i = 3; i >=0; i--)
@@ -155,7 +162,7 @@ public class BootStrapperTest extends CleanupHelper
         }
     }
 
-    private void testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException
+    private RangeStreamer testSourceTargetComputation(String table, int numOldNodes, int replicationFactor) throws UnknownHostException
     {
         StorageService ss = StorageService.instance;
 
@@ -196,6 +203,18 @@ public class BootStrapperTest extends CleanupHelper
         // is used, they will vary.
         assert toFetch.iterator().next().getValue().size() > 0;
         assert !toFetch.iterator().next().getKey().equals(myEndpoint);
+        return s;
+    }
+
+    @Test
+    public void testException() throws UnknownHostException
+    {
+        String table = Schema.instance.getNonSystemTables().iterator().next();
+        int replicationFactor = Table.open(table).getReplicationStrategy().getReplicationFactor();
+        RangeStreamer streamer = testSourceTargetComputation(table, replicationFactor, replicationFactor);
+        streamer.latch = new CountDownLatch(4);
+        streamer.convict(streamer.toFetch().get(table).iterator().next().getKey(), Double.MAX_VALUE);
+        assertNotNull("Exception message not set, test failed", streamer.exceptionMessage);
     }
 
     private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException