You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2021/03/19 15:56:13 UTC
[cassandra] branch trunk updated: Better handle legacy gossip
application states during (and after) upgrades
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c591978 Better handle legacy gossip application states during (and after) upgrades
c591978 is described below
commit c591978f4d265e42d0132418005ba63a99278c75
Author: Yifan Cai <yc...@apache.org>
AuthorDate: Mon Mar 15 20:40:25 2021 -0700
Better handle legacy gossip application states during (and after) upgrades
Only remove duplicated legacy application states when `!hasMajorVersion3Nodes()` but always avoid duplicate status notifications.
patch by Yifan Cai; reviewed by Mick Semb Wever for CASSANDRA-16525
---
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/EndpointState.java | 48 ++++++
src/java/org/apache/cassandra/gms/Gossiper.java | 54 +++----
.../distributed/upgrade/MixedModeGossipTest.java | 168 +++++++++++++++++++++
.../distributed/upgrade/UpgradeTestBase.java | 18 ++-
.../operations/InsertUpdateIfConditionTest.java | 9 +-
.../cassandra/db/filter/ColumnFilterTest.java | 8 +-
.../org/apache/cassandra/gms/GossiperTest.java | 143 +++++++++++++++---
8 files changed, 393 insertions(+), 56 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e6f15ff..b700547 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Better handle legacy gossip application states during (and after) upgrades (CASSANDRA-16525)
* Mark StreamingMetrics.ActiveOutboundStreams as deprecated (CASSANDRA-11174)
* Increase the cqlsh version number (CASSANDRA-16509)
* Fix the CQL generated for the views.where_clause column when some identifiers require quoting (CASSANDRA-16479)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index a4b294c..b8d5626 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.gms;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -84,6 +86,11 @@ public class EndpointState
return applicationState.get().get(key);
}
+ public boolean containsApplicationState(ApplicationState key)
+ {
+ return applicationState.get().containsKey(key);
+ }
+
public Set<Map.Entry<ApplicationState, VersionedValue>> states()
{
return applicationState.get().entrySet();
@@ -114,6 +121,47 @@ public class EndpointState
}
}
+ void removeMajorVersion3LegacyApplicationStates()
+ {
+ while (hasLegacyFields())
+ {
+ Map<ApplicationState, VersionedValue> orig = applicationState.get();
+ Map<ApplicationState, VersionedValue> updatedStates = filterMajorVersion3LegacyApplicationStates(orig);
+ // avoid updating if no state is removed
+ if (orig.size() == updatedStates.size()
+ || applicationState.compareAndSet(orig, updatedStates))
+ return;
+ }
+ }
+
+ private boolean hasLegacyFields()
+ {
+ Set<ApplicationState> statesPresent = applicationState.get().keySet();
+ if (statesPresent.isEmpty())
+ return false;
+ return (statesPresent.contains(ApplicationState.STATUS) && statesPresent.contains(ApplicationState.STATUS_WITH_PORT))
+ || (statesPresent.contains(ApplicationState.INTERNAL_IP) && statesPresent.contains(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
+ || (statesPresent.contains(ApplicationState.RPC_ADDRESS) && statesPresent.contains(ApplicationState.NATIVE_ADDRESS_AND_PORT));
+ }
+
+ private static Map<ApplicationState, VersionedValue> filterMajorVersion3LegacyApplicationStates(Map<ApplicationState, VersionedValue> states)
+ {
+ return states.entrySet().stream().filter(entry -> {
+ // Filter out pre-4.0 versions of data for more complete 4.0 versions
+ switch (entry.getKey())
+ {
+ case INTERNAL_IP:
+ return !states.containsKey(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+ case STATUS:
+ return !states.containsKey(ApplicationState.STATUS_WITH_PORT);
+ case RPC_ADDRESS:
+ return !states.containsKey(ApplicationState.NATIVE_ADDRESS_AND_PORT);
+ default:
+ return true;
+ }
+ }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
/* getters and setters */
/**
* @return System.nanoTime() when state was updated last time.
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index a092c77..b5434aa 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -175,9 +175,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
if (!upgradeInProgressPossible)
return new ExpiringMemoizingSupplier.Memoized<>(null);
- Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
-
CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION.familyLowerBound.get();
+
+ // Skip the round if the gossiper has not started yet
+ // Otherwise, upgradeInProgressPossible can be set to false wrongly.
+ if (!isEnabled())
+ {
+ return new ExpiringMemoizingSupplier.Memoized<>(minVersion);
+ }
+
+ Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(),
+ Gossiper.instance.getUnreachableMembers());
boolean allHostsHaveKnownVersion = true;
for (InetAddressAndPort host : allHosts)
{
@@ -1394,7 +1402,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
EndpointState localEpStatePtr = endpointStateMap.get(ep);
- EndpointState remoteState = removeRedundantApplicationStates(entry.getValue());
+ EndpointState remoteState = entry.getValue();
+ if (!hasMajorVersion3Nodes())
+ remoteState.removeMajorVersion3LegacyApplicationStates();
/*
If state does not exist just add it. If it does then add it if the remote generation is greater.
@@ -1452,32 +1462,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
}
- // remove duplicated deprecated states
- private static EndpointState removeRedundantApplicationStates(EndpointState remoteState)
- {
- if (remoteState.states().isEmpty())
- return remoteState;
-
- Map<ApplicationState, VersionedValue> updatedStates = remoteState.states().stream().filter(entry -> {
- // Filter out pre-4.0 versions of data for more complete 4.0 versions
- switch (entry.getKey())
- {
- case INTERNAL_IP:
- return (null == remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT));
- case STATUS:
- return (null == remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT));
- case RPC_ADDRESS:
- return (null == remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT));
- default:
- return true;
- }
- }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-
- EndpointState updated = new EndpointState(remoteState.getHeartBeatState(), updatedStates);
- if (!remoteState.isAlive()) updated.markDead();
- return updated;
- }
-
private void applyNewStates(InetAddressAndPort addr, EndpointState localState, EndpointState remoteState)
{
// don't assert here, since if the node restarts the version will go back to zero
@@ -1506,8 +1490,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
localState.addApplicationStates(updatedStates);
+ // get rid of legacy fields once the cluster is not in mixed mode
+ if (!hasMajorVersion3Nodes())
+ localState.removeMajorVersion3LegacyApplicationStates();
+
for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
+ {
+ // filters out legacy change notifications
+ // only if local state already indicates that the peer has the new fields
+ if ((ApplicationState.INTERNAL_IP == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
+ ||(ApplicationState.STATUS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.STATUS_WITH_PORT))
+ || (ApplicationState.RPC_ADDRESS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT)))
+ continue;
doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
+ }
}
// notify that a local application state is going to change (doesn't get triggered for remote changes)
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
new file mode 100644
index 0000000..83e911d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.net.Verb;
+import org.assertj.core.api.Assertions;
+
+public class MixedModeGossipTest extends UpgradeTestBase
+{
+ Pattern expectedNormalStatus = Pattern.compile("STATUS:\\d+:NORMAL,-?\\d+");
+ Pattern expectedNormalStatusWithPort = Pattern.compile("STATUS_WITH_PORT:\\d+:NORMAL,-?\\d+");
+ Pattern expectedNormalX3 = Pattern.compile("X3:\\d+:NORMAL,-?\\d+");
+
+ @Test
+ public void testStatusFieldShouldExistInOldVersionNodes() throws Throwable
+ {
+ new TestCase()
+ .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+ .nodes(3)
+ .nodesToUpgradeOrdered(1, 2, 3)
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .upgrade(Versions.Major.v3X, Versions.Major.v4)
+ .setup(c -> {})
+ .runAfterNodeUpgrade((cluster, node) -> {
+ if (node == 1) {
+ checkPeerGossipInfoShouldContainNormalStatus(cluster, 2);
+ checkPeerGossipInfoShouldContainNormalStatus(cluster, 3);
+ }
+ if (node == 2) {
+ checkPeerGossipInfoShouldContainNormalStatus(cluster, 3);
+ }
+ })
+ .runAfterClusterUpgrade(cluster -> {
+ // wait 1 minute for `org.apache.cassandra.gms.Gossiper.upgradeFromVersionSupplier` to update
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
+ checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(cluster);
+ })
+ .run();
+ }
+
+ /**
+ * Similar to {@link #testStatusFieldShouldExistInOldVersionNodes}, but in an edge case that
+ * 1) node2 and node3 cannot gossip with each other.
+ * 2) node2 sends SYN to node1 first when upgrading.
+ * 3) node3 is at the lower version during the cluster upgrade
+ * In this case, node3 gossip info does not contain STATUS field for node2
+ */
+ @Test
+ public void testStatusFieldShouldExistInOldVersionNodesEdgeCase() throws Throwable
+ {
+ AtomicReference<IMessageFilters.Filter> n1GossipSynBlocker = new AtomicReference<>();
+ new TestCase()
+ .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+ .nodes(3)
+ .nodesToUpgradeOrdered(1, 2, 3)
+ .upgrade(Versions.Major.v30, Versions.Major.v4)
+ .upgrade(Versions.Major.v3X, Versions.Major.v4)
+ .setup(cluster -> {
+ // node2 and node3 gossiper cannot talk with each other
+ cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(2).to(3).drop();
+ cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(3).to(2).drop();
+ })
+ .runAfterNodeUpgrade((cluster, node) -> {
+ // let node2 sends the SYN to node1 first
+ if (node == 1)
+ {
+ IMessageFilters.Filter filter = cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(1).to(2).drop();
+ n1GossipSynBlocker.set(filter);
+ }
+ else if (node == 2)
+ {
+ n1GossipSynBlocker.get().off();
+ String node3GossipView = cluster.get(3).nodetoolResult("gossipinfo").getStdout();
+ String node2GossipState = getGossipStateOfNode(node3GossipView, "/127.0.0.2");
+ Assertions.assertThat(node2GossipState)
+ .as("The node2's gossip state from node3's perspective should contain status. " +
+ "And it should carry an unrecognized field X3 with NORMAL.")
+ .containsPattern(expectedNormalStatus)
+ .containsPattern(expectedNormalX3);
+ }
+ })
+ .runAfterClusterUpgrade(cluster -> {
+ // wait 1 minute for `org.apache.cassandra.gms.Gossiper.upgradeFromVersionSupplier` to update
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
+ checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(cluster);
+ })
+ .run();
+ }
+
+ private void checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(UpgradeableCluster cluster)
+ {
+ for (int i = 1; i <= 3; i++)
+ {
+ int n = i;
+ checkPeerGossipInfo(cluster, i, (gossipInfo, peers) -> {
+ for (String p : peers)
+ {
+ Assertions.assertThat(getGossipStateOfNode(gossipInfo, p))
+ .as(String.format("%s gossip state in node%s should not contain STATUS " +
+ "and should contain STATUS_WITH_PORT.", p, n))
+ .doesNotContain("STATUS:")
+ .containsPattern(expectedNormalStatusWithPort);
+ }
+ });
+ }
+ }
+
+ private void checkPeerGossipInfoShouldContainNormalStatus(UpgradeableCluster cluster, int node)
+ {
+ checkPeerGossipInfo(cluster, node, (gossipInfo, peers) -> {
+ for (String n : peers)
+ {
+ Assertions.assertThat(getGossipStateOfNode(gossipInfo, n))
+ .containsPattern(expectedNormalStatus);
+ }
+ });
+ }
+
+ private void checkPeerGossipInfo(UpgradeableCluster cluster, int node, BiConsumer<String, Set<String>> verifier)
+ {
+ Set<Integer> peers = new HashSet<>(Arrays.asList(1, 2, 3));
+ peers.remove(node);
+ String gossipInfo = cluster.get(node).nodetoolResult("gossipinfo").getStdout();
+ verifier.accept(gossipInfo, peers.stream().map(i -> "127.0.0." + i).collect(Collectors.toSet()));
+ }
+
+ private String getGossipStateOfNode(String rawOutput, String nodeInterested)
+ {
+ String temp = rawOutput.substring(rawOutput.indexOf(nodeInterested));
+ int nextSlashIndex = temp.indexOf('/', 1);
+ if (nextSlashIndex != -1)
+ return temp.substring(0, nextSlashIndex);
+ else
+ return temp;
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 39957e9..46c8d24 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.upgrade;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
@@ -32,7 +33,9 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.impl.AbstractCluster.AbstractBuilder;
import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.distributed.shared.Versions;
@@ -93,7 +96,7 @@ public class UpgradeTestBase extends DistributedTestBase
private RunOnCluster setup;
private RunOnClusterAndNode runAfterNodeUpgrade;
private RunOnCluster runAfterClusterUpgrade;
- private final Set<Integer> nodesToUpgrade = new HashSet<>();
+ private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>();
private Consumer<IInstanceConfig> configConsumer;
public TestCase()
@@ -191,13 +194,24 @@ public class UpgradeTestBase extends DistributedTestBase
}
public TestCase nodesToUpgrade(int ... nodes)
{
+ Set<Integer> set = new HashSet<>(nodes.length);
+ for (int n : nodes)
+ {
+ set.add(n);
+ }
+ nodesToUpgrade.addAll(set);
+ return this;
+ }
+
+ public TestCase nodesToUpgradeOrdered(int ... nodes)
+ {
for (int n : nodes)
{
nodesToUpgrade.add(n);
}
return this;
}
- }
+ }
protected TestCase allUpgrades(int nodes, int... toUpgrade)
{
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index 59770b8..4db1ef5 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -72,7 +73,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
@BeforeClass
public static void beforeClass()
{
- Gossiper.instance.maybeInitializeLocalState(0);
+ Gossiper.instance.start(0);
}
@Before
@@ -82,6 +83,12 @@ public class InsertUpdateIfConditionTest extends CQLTester
assertion.run();
}
+ @AfterClass
+ public static void afterClass()
+ {
+ Gossiper.instance.stop();
+ }
+
/**
* Migrated from cql_tests.py:TestCQL.cas_simple_test()
*/
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 194a92f..fd7ee25 100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -31,6 +31,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.marshal.Int32Type;
@@ -41,6 +42,7 @@ import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
@@ -89,7 +91,9 @@ public class ColumnFilterTest
@BeforeClass
public static void beforeClass()
{
- Gossiper.instance.maybeInitializeLocalState(0);
+ DatabaseDescriptor.setSeedProvider(Arrays::asList);
+ DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch());
+ Gossiper.instance.start(0);
}
@Before
@@ -533,4 +537,4 @@ public class ColumnFilterTest
}
}
}
-}
\ No newline at end of file
+}
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index c7abb44..ab28572 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -24,10 +24,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.InetAddresses;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -48,6 +51,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class GossiperTest
{
@@ -81,6 +85,12 @@ public class GossiperTest
DatabaseDescriptor.setSeedProvider(originalSeedProvider);
}
+ @AfterClass
+ public static void afterClass()
+ {
+ Gossiper.instance.stop();
+ }
+
@Test
public void testPaddingIntact() throws Exception
{
@@ -100,6 +110,7 @@ public class GossiperTest
@Test
public void testHasVersion3Nodes() throws Exception
{
+ Gossiper.instance.start(0);
Gossiper.instance.expireUpgradeFromVersion();
VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null);
@@ -186,6 +197,7 @@ public class GossiperTest
VersionedValue.VersionedValueFactory valueFactory =
new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+ SimpleStateChangeListener stateChangeListener = null;
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
try
{
@@ -204,28 +216,12 @@ public class GossiperTest
VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token));
proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
- Gossiper.instance.register(
- new IEndpointStateChangeSubscriber()
- {
- public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { }
-
- public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { }
-
- public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
- {
- assertEquals(ApplicationState.TOKENS, state);
- stateChangedNum++;
- }
-
- public void onAlive(InetAddressAndPort endpoint, EndpointState state) { }
-
- public void onDead(InetAddressAndPort endpoint, EndpointState state) { }
-
- public void onRemove(InetAddressAndPort endpoint) { }
-
- public void onRestart(InetAddressAndPort endpoint, EndpointState state) { }
- }
- );
+ stateChangeListener = new SimpleStateChangeListener();
+ stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+ assertEquals(ApplicationState.TOKENS, onChangeParams.state);
+ stateChangedNum++;
+ });
+ Gossiper.instance.register(stateChangeListener);
stateChangedNum = 0;
Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
@@ -253,6 +249,8 @@ public class GossiperTest
{
// clean up the gossip states
Gossiper.instance.endpointStateMap.clear();
+ if (stateChangeListener != null)
+ Gossiper.instance.unregister(stateChangeListener);
}
}
@@ -352,6 +350,107 @@ public class GossiperTest
assertTrue(gossiper.getSeeds().contains(a.toString()));
}
+    /**
+     * Legacy STATUS changes notify subscribers only until STATUS_WITH_PORT is known for the peer.
+     */
+    @Test
+    public void testNotFireDuplicatedNotificationsWithUpdateContainsOldAndNewState() throws UnknownHostException
+    {
+        VersionedValue.VersionedValueFactory valueFactory =
+            new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+        SimpleStateChangeListener stateChangeListener = null;
+        try
+        {
+            InetAddressAndPort remoteHostAddress = hosts.get(1);
+            EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+            HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+            // Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+            assertEquals(1, initialRemoteHeartBeat.getGeneration());
+
+            AtomicInteger notificationCount = new AtomicInteger(0);
+            HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
+            EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+            final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+            proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token)));
+
+            stateChangeListener = new SimpleStateChangeListener();
+            Gossiper.instance.register(stateChangeListener);
+
+            // STEP 1. register verifier and apply a state with just STATUS, simulating gossip from a v3 peer
+            stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+                notificationCount.getAndIncrement();
+                assertEquals("It should fire notification for STATUS when gossiper local state not yet has STATUS_WITH_PORT",
+                             ApplicationState.STATUS, onChangeParams.state);
+            });
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+
+            // STEP 2. the update carries both STATUS and STATUS_WITH_PORT, so the gossiper learns the peer is on v4
+            proposedRemoteState.addApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(Collections.singletonList(token)));
+            stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+                notificationCount.getAndIncrement();
+                assertEquals("It should only fire notification for STATUS_WITH_PORT",
+                             ApplicationState.STATUS_WITH_PORT, onChangeParams.state);
+            });
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+
+            // STEP 3. somehow, the peer sends only STATUS; it must be applied, otherwise the verifier below never runs
+            proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+            proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token)));
+            stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+                notificationCount.getAndIncrement();
+                fail("It should not fire notification for STATUS");
+            });
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+
+            assertEquals("Expect exact 2 notifications with the test setup", 2, notificationCount.get());
+        }
+        finally
+        {
+            // clean up the gossip states
+            Gossiper.instance.endpointStateMap.clear();
+            if (stateChangeListener != null)
+                Gossiper.instance.unregister(stateChangeListener);
+        }
+    }
+
+ static class SimpleStateChangeListener implements IEndpointStateChangeSubscriber
+ {
+ static class OnChangeParams
+ {
+ InetAddressAndPort endpoint;
+ ApplicationState state;
+ VersionedValue value;
+
+ OnChangeParams(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+ {
+ this.endpoint = endpoint;
+ this.state = state;
+ this.value = value;
+ }
+ }
+
+ private volatile Consumer<OnChangeParams> onChangeVerifier;
+
+ public void setOnChangeVerifier(Consumer<OnChangeParams> verifier)
+ {
+ onChangeVerifier = verifier;
+ }
+
+ public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
+ public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
+ public void onRemove(InetAddressAndPort endpoint) {}
+ public void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
+
+ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+ {
+ onChangeVerifier.accept(new OnChangeParams(endpoint, state, value));
+ }
+ }
+
static class TestSeedProvider implements SeedProvider
{
private List<InetAddressAndPort> seeds;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org