You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/01/10 19:51:56 UTC

[7/8] git commit: Don't update system table for unknown nodes/dead states Patch by Tyler Hobbs, reviewed by brandonwilliams for CASSANDRA-6053

Don't update system table for unknown nodes/dead states
Patch by Tyler Hobbs, reviewed by brandonwilliams for CASSANDRA-6053


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1c3d8f3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1c3d8f3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1c3d8f3

Branch: refs/heads/cassandra-2.0
Commit: c1c3d8f3f351a6834e5b02c12790d0d1163107bf
Parents: 15cd55f
Author: Brandon Williams <br...@apache.org>
Authored: Fri Jan 10 12:49:38 2014 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Jan 10 12:49:38 2014 -0600

----------------------------------------------------------------------
 .../cassandra/service/StorageService.java       | 95 +++++++++++---------
 .../service/LeaveAndBootstrapTest.java          | 31 +++++++
 2 files changed, 84 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1c3d8f3/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 102e0d8..1ab97b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1307,48 +1307,59 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
     {
-        switch (state)
-        {
-            case STATUS:
-                String apStateValue = value.value;
-                String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1);
-                assert (pieces.length > 0);
-
-                String moveName = pieces[0];
-
-                if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING))
-                    handleStateBootstrap(endpoint, pieces);
-                else if (moveName.equals(VersionedValue.STATUS_NORMAL))
-                    handleStateNormal(endpoint, pieces);
-                else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
-                    handleStateRemoving(endpoint, pieces);
-                else if (moveName.equals(VersionedValue.STATUS_LEAVING))
-                    handleStateLeaving(endpoint, pieces);
-                else if (moveName.equals(VersionedValue.STATUS_LEFT))
-                    handleStateLeft(endpoint, pieces);
-                else if (moveName.equals(VersionedValue.STATUS_MOVING))
-                    handleStateMoving(endpoint, pieces);
-                else if (moveName.equals(VersionedValue.STATUS_RELOCATING))
-                    handleStateRelocating(endpoint, pieces);
-                break;
-            case RELEASE_VERSION:
-                SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
-                break;
-            case DC:
-                SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
-                break;
-            case RACK:
-                SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
-                break;
-            case RPC_ADDRESS:
-                SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(value.value));
-                break;
-            case SCHEMA:
-                SystemKeyspace.updatePeerInfo(endpoint, "schema_version", value.value);
-                break;
-            case HOST_ID:
-                SystemKeyspace.updatePeerInfo(endpoint, "host_id", value.value);
-                break;
+        if (state.equals(ApplicationState.STATUS))
+        {
+            String apStateValue = value.value;
+            String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1);
+            assert (pieces.length > 0);
+
+            String moveName = pieces[0];
+
+            if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING))
+                handleStateBootstrap(endpoint, pieces);
+            else if (moveName.equals(VersionedValue.STATUS_NORMAL))
+                handleStateNormal(endpoint, pieces);
+            else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
+                handleStateRemoving(endpoint, pieces);
+            else if (moveName.equals(VersionedValue.STATUS_LEAVING))
+                handleStateLeaving(endpoint, pieces);
+            else if (moveName.equals(VersionedValue.STATUS_LEFT))
+                handleStateLeft(endpoint, pieces);
+            else if (moveName.equals(VersionedValue.STATUS_MOVING))
+                handleStateMoving(endpoint, pieces);
+            else if (moveName.equals(VersionedValue.STATUS_RELOCATING))
+                handleStateRelocating(endpoint, pieces);
+        }
+        else
+        {
+            EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+            if (epState == null || Gossiper.instance.isDeadState(epState))
+            {
+                logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
+                return;
+            }
+
+            switch (state)
+            {
+                case RELEASE_VERSION:
+                    SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
+                    break;
+                case DC:
+                    SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
+                    break;
+                case RACK:
+                    SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
+                    break;
+                case RPC_ADDRESS:
+                    SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(value.value));
+                    break;
+                case SCHEMA:
+                    SystemKeyspace.updatePeerInfo(endpoint, "schema_version", value.value);
+                    break;
+                case HOST_ID:
+                    SystemKeyspace.updatePeerInfo(endpoint, "host_id", value.value);
+                    break;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1c3d8f3/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 44b3400..a9d8057 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.Schema;
 import org.junit.Test;
@@ -650,6 +651,36 @@ public class LeaveAndBootstrapTest
         assertFalse(tmd.isLeaving(hosts.get(2)));
     }
 
+    /**
+     * Tests that the system.peers table is not updated after a node has been removed. (See CASSANDRA-6053)
+     */
+    @Test
+    public void testStateChangeOnRemovedNode() throws UnknownHostException
+    {
+        StorageService ss = StorageService.instance;
+        VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+
+        // create a ring of 2 nodes
+        ArrayList<Token> endpointTokens = new ArrayList<>();
+        List<InetAddress> hosts = new ArrayList<>();
+        Util.createInitialRing(ss, partitioner, endpointTokens, new ArrayList<Token>(), hosts, new ArrayList<UUID>(), 2);
+
+        InetAddress toRemove = hosts.get(1);
+        SystemKeyspace.updatePeerInfo(toRemove, "data_center", "'dc42'");
+        SystemKeyspace.updatePeerInfo(toRemove, "rack", "'rack42'");
+        assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));
+
+        // mark the node as removed
+        Gossiper.instance.injectApplicationState(toRemove, ApplicationState.STATUS,
+                valueFactory.left(Collections.singleton(endpointTokens.get(1)), Gossiper.computeExpireTime()));
+        assertTrue(Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(hosts.get(1))));
+
+        // state changes made after the endpoint has left should be ignored
+        ss.onChange(hosts.get(1), ApplicationState.RACK,
+                valueFactory.rack("rack9999"));
+        assertEquals("rack42", SystemKeyspace.loadDcRackInfo().get(toRemove).get("rack"));
+    }
+
     private static Collection<InetAddress> makeAddrs(String... hosts) throws UnknownHostException
     {
         ArrayList<InetAddress> addrs = new ArrayList<InetAddress>(hosts.length);