You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/11/13 13:55:02 UTC

[cassandra] branch trunk updated: Add saved Host IDs to TokenMetadata during startup

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fde640f  Add saved Host IDs to TokenMetadata during startup
fde640f is described below

commit fde640fe52704836ec21fedd62cae21290e099ec
Author: yifan-c <yc...@gmail.com>
AuthorDate: Thu Nov 5 17:54:11 2020 -0800

    Add saved Host IDs to TokenMetadata during startup
    
    Patch by Yifan Cai; reviewed by Sam Tunnicliffe for CASSANDRA-16246
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/service/StorageService.java   | 73 ++++++++++------------
 .../cassandra/distributed/impl/Instance.java       |  9 +++
 .../distributed/test/NetworkTopologyTest.java      | 26 +++++++-
 4 files changed, 68 insertions(+), 41 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index cb4d5bc..cbcc091 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta4
+ * Add saved Host IDs to TokenMetadata at startup (CASSANDRA-16246)
  * Ensure that CacheMetrics.requests is picked up by the metric reporter (CASSANDRA-16228)
  * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019)
  * Produce consistent tombstone for reads to avoid digest mistmatch (CASSANDRA-15369)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4a3477c..3201d80 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -637,21 +637,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().listen();
     }
 
-    public void populateTokenMetadata()
-    {
-        if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
-        {
-            logger.info("Populating token metadata from system tables");
-            Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens();
-            if (!shouldBootstrap()) // if we have not completed bootstrapping, we should not add ourselves as a normal token
-                loadedTokens.putAll(FBUtilities.getBroadcastAddressAndPort(), SystemKeyspace.getSavedTokens());
-            for (InetAddressAndPort ep : loadedTokens.keySet())
-                tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep);
-
-            logger.info("Token metadata: {}", tokenMetadata);
-        }
-    }
-
     public synchronized void initServer() throws ConfigurationException
     {
         initServer(RING_DELAY);
@@ -676,6 +661,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             throw new AssertionError(e);
         }
 
+        if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
+        {
+            logger.info("Loading persisted ring state");
+            populatePeerTokenMetadata();
+            for (InetAddressAndPort endpoint : tokenMetadata.getAllEndpoints())
+                Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.addSavedEndpoint(endpoint));
+        }
+
         // daemon threads, like our executors', continue to run while shutdown hooks are invoked
         drainOnShutdown = NamedThreadFactory.createThread(new WrappedRunnable()
         {
@@ -697,8 +690,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true")))
         {
             logger.info("Not starting gossip as requested.");
-            // load ring state in preparation for starting gossip later
-            loadRingState();
             initialized = true;
             return;
         }
@@ -740,27 +731,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         initialized = true;
     }
 
-    private void loadRingState()
+    public void populateTokenMetadata()
     {
         if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
         {
-            logger.info("Loading persisted ring state");
-            Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens();
-            Map<InetAddressAndPort, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
-            for (InetAddressAndPort ep : loadedTokens.keySet())
-            {
-                if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
-                {
-                    // entry has been mistakenly added, delete it
-                    SystemKeyspace.removeEndpoint(ep);
-                }
-                else
-                {
-                    if (loadedHostIds.containsKey(ep))
-                        tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
-                    Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.addSavedEndpoint(ep));
-                }
-            }
+            populatePeerTokenMetadata();
+            // if we have not completed bootstrapping, we should not add ourselves as a normal token
+            if (!shouldBootstrap())
+                tokenMetadata.updateNormalTokens(SystemKeyspace.getSavedTokens(), FBUtilities.getBroadcastAddressAndPort());
+
+            logger.info("Token metadata: {}", tokenMetadata);
+        }
+    }
+
+    private void populatePeerTokenMetadata()
+    {
+        logger.info("Populating token metadata from system tables");
+        Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens();
+
+        // entry has been mistakenly added, delete it
+        if (loadedTokens.containsKey(FBUtilities.getBroadcastAddressAndPort()))
+            SystemKeyspace.removeEndpoint(FBUtilities.getBroadcastAddressAndPort());
+
+        Map<InetAddressAndPort, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
+        for (InetAddressAndPort ep : loadedTokens.keySet())
+        {
+            tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep);
+            if (loadedHostIds.containsKey(ep))
+                tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
         }
     }
 
@@ -868,11 +866,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getJustBroadcastNativeAddress()));
             appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
 
-            // load the persisted ring state. This used to be done earlier in the init process,
-            // but now we always perform a shadow round when preparing to join and we have to
-            // clear endpoint states after doing that.
-            loadRingState();
-
             logger.info("Starting up server gossip");
             Gossiper.instance.register(this);
             Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 7ccb59b..546f318 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -420,6 +420,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 SystemKeyspace.persistLocalMetadata();
                 SystemKeyspaceMigrator40.migrate();
 
+                // Same order to populate tokenMetadata for the first time,
+                // see org.apache.cassandra.service.CassandraDaemon.setup
+                StorageService.instance.populateTokenMetadata();
+
                 try
                 {
                     // load schema from disk
@@ -485,6 +489,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 }
 
                 StorageService.instance.ensureTraceKeyspace();
+
+                // Populate tokenMetadata for the second time,
+                // see org.apache.cassandra.service.CassandraDaemon.setup
+                StorageService.instance.populateTokenMetadata();
+
                 SystemKeyspace.finishStartup();
 
                 CassandraDaemon.getInstanceForTesting().setupCompleted();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
index a4968c6..0b25658 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -27,11 +28,15 @@ import java.util.stream.Stream;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
 // TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
 public class NetworkTopologyTest extends TestBaseImpl
 {
@@ -98,4 +103,23 @@ public class NetworkTopologyTest extends TestBaseImpl
     {
         builder().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void noWarningForNetworkTopologyStategyConfigOnRestart() throws Exception {
+        int nodesPerDc = 2;
+        try (Cluster cluster = builder().withConfig(c -> c.with(GOSSIP, NETWORK))
+                                        .withRacks(2, 1, nodesPerDc)
+                                        .start()) {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE +
+                                 " WITH replication = {'class': 'NetworkTopologyStrategy', " +
+                                 "'datacenter1' : " + nodesPerDc + ", 'datacenter2' : " + nodesPerDc + " };");
+            cluster.get(2).nodetool("flush");
+            // Stop node 2 in datacenter 1
+            cluster.get(2).shutdown().get();
+            // Restart node 2 in datacenter 1
+            cluster.get(2).startup();
+            List<String> result = cluster.get(2).logs().grep("Ignoring Unrecognized strategy option \\{datacenter2\\}").getResult();
+            Assert.assertTrue("Not expected to see the warning about unrecognized option", result.isEmpty());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org