You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2021/03/19 15:56:13 UTC

[cassandra] branch trunk updated: Better handle legacy gossip application states during (and after) upgrades

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c591978  Better handle legacy gossip application states during (and after) upgrades
c591978 is described below

commit c591978f4d265e42d0132418005ba63a99278c75
Author: Yifan Cai <yc...@apache.org>
AuthorDate: Mon Mar 15 20:40:25 2021 -0700

    Better handle legacy gossip application states during (and after) upgrades
    
     Only remove duplicated legacy application states when `!hasMajorVersion3Nodes()` but always avoid duplicate status notifications.
    
     patch by Yifan Cai; reviewed by Mick Semb Wever for CASSANDRA-16525
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/gms/EndpointState.java    |  48 ++++++
 src/java/org/apache/cassandra/gms/Gossiper.java    |  54 +++----
 .../distributed/upgrade/MixedModeGossipTest.java   | 168 +++++++++++++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |  18 ++-
 .../operations/InsertUpdateIfConditionTest.java    |   9 +-
 .../cassandra/db/filter/ColumnFilterTest.java      |   8 +-
 .../org/apache/cassandra/gms/GossiperTest.java     | 143 +++++++++++++++---
 8 files changed, 393 insertions(+), 56 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e6f15ff..b700547 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Better handle legacy gossip application states during (and after) upgrades (CASSANDRA-16525)
  * Mark StreamingMetrics.ActiveOutboundStreams as deprecated (CASSANDRA-11174)
  * Increase the cqlsh version number (CASSANDRA-16509)
  * Fix the CQL generated for the views.where_clause column when some identifiers require quoting (CASSANDRA-16479)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index a4b294c..b8d5626 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.gms;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -84,6 +86,11 @@ public class EndpointState
         return applicationState.get().get(key);
     }
 
+    public boolean containsApplicationState(ApplicationState key)
+    {
+        return applicationState.get().containsKey(key);
+    }
+
     public Set<Map.Entry<ApplicationState, VersionedValue>> states()
     {
         return applicationState.get().entrySet();
@@ -114,6 +121,47 @@ public class EndpointState
         }
     }
 
+    void removeMajorVersion3LegacyApplicationStates()
+    {
+        while (hasLegacyFields())
+        {
+            Map<ApplicationState, VersionedValue> orig = applicationState.get();
+            Map<ApplicationState, VersionedValue> updatedStates = filterMajorVersion3LegacyApplicationStates(orig);
+            // avoid updating if no state is removed
+            if (orig.size() == updatedStates.size()
+                || applicationState.compareAndSet(orig, updatedStates))
+                return;
+        }
+    }
+
+    private boolean hasLegacyFields()
+    {
+        Set<ApplicationState> statesPresent = applicationState.get().keySet();
+        if (statesPresent.isEmpty())
+            return false;
+        return (statesPresent.contains(ApplicationState.STATUS) && statesPresent.contains(ApplicationState.STATUS_WITH_PORT))
+               || (statesPresent.contains(ApplicationState.INTERNAL_IP) && statesPresent.contains(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
+               || (statesPresent.contains(ApplicationState.RPC_ADDRESS) && statesPresent.contains(ApplicationState.NATIVE_ADDRESS_AND_PORT));
+    }
+
+    private static Map<ApplicationState, VersionedValue> filterMajorVersion3LegacyApplicationStates(Map<ApplicationState, VersionedValue> states)
+    {
+        return states.entrySet().stream().filter(entry -> {
+                // Filter out pre-4.0 versions of data for more complete 4.0 versions
+                switch (entry.getKey())
+                {
+                    case INTERNAL_IP:
+                        return !states.containsKey(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+                    case STATUS:
+                        return !states.containsKey(ApplicationState.STATUS_WITH_PORT);
+                    case RPC_ADDRESS:
+                        return !states.containsKey(ApplicationState.NATIVE_ADDRESS_AND_PORT);
+                    default:
+                        return true;
+                }
+            }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
     /* getters and setters */
     /**
      * @return System.nanoTime() when state was updated last time.
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index a092c77..b5434aa 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -175,9 +175,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         if (!upgradeInProgressPossible)
             return new ExpiringMemoizingSupplier.Memoized<>(null);
 
-        Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
-
         CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION.familyLowerBound.get();
+
+        // Skip the round if the gossiper has not started yet
+        // Otherwise, upgradeInProgressPossible can be set to false wrongly.
+        if (!isEnabled())
+        {
+            return new ExpiringMemoizingSupplier.Memoized<>(minVersion);
+        }
+
+        Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(),
+                                                                 Gossiper.instance.getUnreachableMembers());
         boolean allHostsHaveKnownVersion = true;
         for (InetAddressAndPort host : allHosts)
         {
@@ -1394,7 +1402,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             }
 
             EndpointState localEpStatePtr = endpointStateMap.get(ep);
-            EndpointState remoteState = removeRedundantApplicationStates(entry.getValue());
+            EndpointState remoteState = entry.getValue();
+            if (!hasMajorVersion3Nodes())
+                remoteState.removeMajorVersion3LegacyApplicationStates();
 
             /*
                 If state does not exist just add it. If it does then add it if the remote generation is greater.
@@ -1452,32 +1462,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    // remove duplicated deprecated states
-    private static EndpointState removeRedundantApplicationStates(EndpointState remoteState)
-    {
-        if (remoteState.states().isEmpty())
-            return remoteState;
-
-        Map<ApplicationState, VersionedValue> updatedStates = remoteState.states().stream().filter(entry -> {
-            // Filter out pre-4.0 versions of data for more complete 4.0 versions
-            switch (entry.getKey())
-            {
-                case INTERNAL_IP:
-                    return (null == remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT));
-                case STATUS:
-                    return (null == remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT));
-                case RPC_ADDRESS:
-                    return (null == remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT));
-                default:
-                    return true;
-            }
-        }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-
-        EndpointState updated = new EndpointState(remoteState.getHeartBeatState(), updatedStates);
-        if (!remoteState.isAlive()) updated.markDead();
-        return updated;
-    }
-
     private void applyNewStates(InetAddressAndPort addr, EndpointState localState, EndpointState remoteState)
     {
         // don't assert here, since if the node restarts the version will go back to zero
@@ -1506,8 +1490,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         }
         localState.addApplicationStates(updatedStates);
 
+        // get rid of legacy fields once the cluster is not in mixed mode
+        if (!hasMajorVersion3Nodes())
+            localState.removeMajorVersion3LegacyApplicationStates();
+
         for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
+        {
+            // filters out legacy change notifications
+            // only if local state already indicates that the peer has the new fields
+            if ((ApplicationState.INTERNAL_IP == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
+                ||(ApplicationState.STATUS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.STATUS_WITH_PORT))
+                || (ApplicationState.RPC_ADDRESS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT)))
+                continue;
             doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
+        }
     }
 
     // notify that a local application state is going to change (doesn't get triggered for remote changes)
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
new file mode 100644
index 0000000..83e911d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.net.Verb;
+import org.assertj.core.api.Assertions;
+
+public class MixedModeGossipTest extends UpgradeTestBase
+{
+    Pattern expectedNormalStatus = Pattern.compile("STATUS:\\d+:NORMAL,-?\\d+");
+    Pattern expectedNormalStatusWithPort = Pattern.compile("STATUS_WITH_PORT:\\d+:NORMAL,-?\\d+");
+    Pattern expectedNormalX3 = Pattern.compile("X3:\\d+:NORMAL,-?\\d+");
+
+    @Test
+    public void testStatusFieldShouldExistInOldVersionNodes() throws Throwable
+    {
+        new TestCase()
+        .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+        .nodes(3)
+        .nodesToUpgradeOrdered(1, 2, 3)
+        .upgrade(Versions.Major.v30, Versions.Major.v4)
+        .upgrade(Versions.Major.v3X, Versions.Major.v4)
+        .setup(c -> {})
+        .runAfterNodeUpgrade((cluster, node) -> {
+            if (node == 1) {
+                checkPeerGossipInfoShouldContainNormalStatus(cluster, 2);
+                checkPeerGossipInfoShouldContainNormalStatus(cluster, 3);
+            }
+            if (node == 2) {
+                checkPeerGossipInfoShouldContainNormalStatus(cluster, 3);
+            }
+        })
+        .runAfterClusterUpgrade(cluster -> {
+            // wait 1 minute for `org.apache.cassandra.gms.Gossiper.upgradeFromVersionSupplier` to update
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
+            checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(cluster);
+        })
+        .run();
+    }
+
+    /**
+     * Similar to {@link #testStatusFieldShouldExistInOldVersionNodes}, but in an edge case that
+     * 1) node2 and node3 cannot gossip with each other.
+     * 2) node2 sends SYN to node1 first when upgrading.
+     * 3) node3 is at the lower version during the cluster upgrade
+     * In this case, node3 gossip info does not contain STATUS field for node2
+     */
+    @Test
+    public void testStatusFieldShouldExistInOldVersionNodesEdgeCase() throws Throwable
+    {
+        AtomicReference<IMessageFilters.Filter> n1GossipSynBlocker = new AtomicReference<>();
+        new TestCase()
+        .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+        .nodes(3)
+        .nodesToUpgradeOrdered(1, 2, 3)
+        .upgrade(Versions.Major.v30, Versions.Major.v4)
+        .upgrade(Versions.Major.v3X, Versions.Major.v4)
+        .setup(cluster -> {
+            // node2 and node3 gossiper cannot talk with each other
+            cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(2).to(3).drop();
+            cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(3).to(2).drop();
+        })
+        .runAfterNodeUpgrade((cluster, node) -> {
+            // let node2 sends the SYN to node1 first
+            if (node == 1)
+            {
+                IMessageFilters.Filter filter = cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(1).to(2).drop();
+                n1GossipSynBlocker.set(filter);
+            }
+            else if (node == 2)
+            {
+                n1GossipSynBlocker.get().off();
+                String node3GossipView = cluster.get(3).nodetoolResult("gossipinfo").getStdout();
+                String node2GossipState = getGossipStateOfNode(node3GossipView, "/127.0.0.2");
+                Assertions.assertThat(node2GossipState)
+                          .as("The node2's gossip state from node3's perspective should contain status. " +
+                              "And it should carry an unrecognized field X3 with NORMAL.")
+                          .containsPattern(expectedNormalStatus)
+                          .containsPattern(expectedNormalX3);
+            }
+        })
+        .runAfterClusterUpgrade(cluster -> {
+            // wait 1 minute for `org.apache.cassandra.gms.Gossiper.upgradeFromVersionSupplier` to update
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MINUTES);
+            checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(cluster);
+        })
+        .run();
+    }
+
+    private void checkPeerGossipInfoOfAllNodesShouldContainNewStatusAfterUpgrade(UpgradeableCluster cluster)
+    {
+        for (int i = 1; i <= 3; i++)
+        {
+            int n = i;
+            checkPeerGossipInfo(cluster, i, (gossipInfo, peers) -> {
+                for (String p : peers)
+                {
+                    Assertions.assertThat(getGossipStateOfNode(gossipInfo, p))
+                              .as(String.format("%s gossip state in node%s should not contain STATUS " +
+                                                "and should contain STATUS_WITH_PORT.", p, n))
+                              .doesNotContain("STATUS:")
+                              .containsPattern(expectedNormalStatusWithPort);
+                }
+            });
+        }
+    }
+
+    private void checkPeerGossipInfoShouldContainNormalStatus(UpgradeableCluster cluster, int node)
+    {
+        checkPeerGossipInfo(cluster, node, (gossipInfo, peers) -> {
+            for (String n : peers)
+            {
+                Assertions.assertThat(getGossipStateOfNode(gossipInfo, n))
+                          .containsPattern(expectedNormalStatus);
+            }
+        });
+    }
+
+    private void checkPeerGossipInfo(UpgradeableCluster cluster, int node, BiConsumer<String, Set<String>> verifier)
+    {
+        Set<Integer> peers = new HashSet<>(Arrays.asList(1, 2, 3));
+        peers.remove(node);
+        String gossipInfo = cluster.get(node).nodetoolResult("gossipinfo").getStdout();
+        verifier.accept(gossipInfo, peers.stream().map(i -> "127.0.0." + i).collect(Collectors.toSet()));
+    }
+
+    private String getGossipStateOfNode(String rawOutput, String nodeInterested)
+    {
+        String temp = rawOutput.substring(rawOutput.indexOf(nodeInterested));
+        int nextSlashIndex = temp.indexOf('/', 1);
+        if (nextSlashIndex != -1)
+            return temp.substring(0, nextSlashIndex);
+        else
+            return temp;
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 39957e9..46c8d24 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.upgrade;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
@@ -32,7 +33,9 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.impl.AbstractCluster.AbstractBuilder;
 import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.distributed.shared.Versions;
@@ -93,7 +96,7 @@ public class UpgradeTestBase extends DistributedTestBase
         private RunOnCluster setup;
         private RunOnClusterAndNode runAfterNodeUpgrade;
         private RunOnCluster runAfterClusterUpgrade;
-        private final Set<Integer> nodesToUpgrade = new HashSet<>();
+        private final Set<Integer> nodesToUpgrade = new LinkedHashSet<>();
         private Consumer<IInstanceConfig> configConsumer;
 
         public TestCase()
@@ -191,13 +194,24 @@ public class UpgradeTestBase extends DistributedTestBase
         }
         public TestCase nodesToUpgrade(int ... nodes)
         {
+            Set<Integer> set = new HashSet<>(nodes.length);
+            for (int n : nodes)
+            {
+                set.add(n);
+            }
+            nodesToUpgrade.addAll(set);
+            return this;
+        }
+
+        public TestCase nodesToUpgradeOrdered(int ... nodes)
+        {
             for (int n : nodes)
             {
                 nodesToUpgrade.add(n);
             }
             return this;
         }
-    }
+     }
 
     protected TestCase allUpgrades(int nodes, int... toUpgrade)
     {
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index 59770b8..4db1ef5 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -72,7 +73,7 @@ public class InsertUpdateIfConditionTest extends CQLTester
     @BeforeClass
     public static void beforeClass()
     {
-        Gossiper.instance.maybeInitializeLocalState(0);
+        Gossiper.instance.start(0);
     }
 
     @Before
@@ -82,6 +83,12 @@ public class InsertUpdateIfConditionTest extends CQLTester
         assertion.run();
     }
 
+    @AfterClass
+    public static void afterClass()
+    {
+        Gossiper.instance.stop();
+    }
+
     /**
      * Migrated from cql_tests.py:TestCQL.cas_simple_test()
      */
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 194a92f..fd7ee25 100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -31,6 +31,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.marshal.Int32Type;
@@ -41,6 +42,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
@@ -89,7 +91,9 @@ public class ColumnFilterTest
     @BeforeClass
     public static void beforeClass()
     {
-        Gossiper.instance.maybeInitializeLocalState(0);
+        DatabaseDescriptor.setSeedProvider(Arrays::asList);
+        DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch());
+        Gossiper.instance.start(0);
     }
 
     @Before
@@ -533,4 +537,4 @@ public class ColumnFilterTest
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index c7abb44..ab28572 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -24,10 +24,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.net.InetAddresses;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,6 +51,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class GossiperTest
 {
@@ -81,6 +85,12 @@ public class GossiperTest
         DatabaseDescriptor.setSeedProvider(originalSeedProvider);
     }
 
+    @AfterClass
+    public static void afterClass()
+    {
+        Gossiper.instance.stop();
+    }
+
     @Test
     public void testPaddingIntact() throws Exception
     {
@@ -100,6 +110,7 @@ public class GossiperTest
     @Test
     public void testHasVersion3Nodes() throws Exception
     {
+        Gossiper.instance.start(0);
         Gossiper.instance.expireUpgradeFromVersion();
 
         VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null);
@@ -186,6 +197,7 @@ public class GossiperTest
         VersionedValue.VersionedValueFactory valueFactory =
             new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
 
+        SimpleStateChangeListener stateChangeListener = null;
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
         try
         {
@@ -204,28 +216,12 @@ public class GossiperTest
             VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token));
             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
 
-            Gossiper.instance.register(
-            new IEndpointStateChangeSubscriber()
-            {
-                public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { }
-
-                public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { }
-
-                public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
-                {
-                    assertEquals(ApplicationState.TOKENS, state);
-                    stateChangedNum++;
-                }
-
-                public void onAlive(InetAddressAndPort endpoint, EndpointState state) { }
-
-                public void onDead(InetAddressAndPort endpoint, EndpointState state) { }
-
-                public void onRemove(InetAddressAndPort endpoint) { }
-
-                public void onRestart(InetAddressAndPort endpoint, EndpointState state) { }
-            }
-            );
+            stateChangeListener = new SimpleStateChangeListener();
+            stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+                assertEquals(ApplicationState.TOKENS, onChangeParams.state);
+                stateChangedNum++;
+            });
+            Gossiper.instance.register(stateChangeListener);
 
             stateChangedNum = 0;
             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
@@ -253,6 +249,8 @@ public class GossiperTest
         {
             // clean up the gossip states
             Gossiper.instance.endpointStateMap.clear();
+            if (stateChangeListener != null)
+                Gossiper.instance.unregister(stateChangeListener);
         }
     }
 
@@ -352,6 +350,107 @@ public class GossiperTest
             assertTrue(gossiper.getSeeds().contains(a.toString()));
     }
 
+    @Test
+    public void testNotFireDuplicatedNotificationsWithUpdateContainsOldAndNewState() throws UnknownHostException
+    {
+        VersionedValue.VersionedValueFactory valueFactory =
+            new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+        SimpleStateChangeListener stateChangeListener = null;
+        try
+        {
+            InetAddressAndPort remoteHostAddress = hosts.get(1);
+            EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+            HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+            //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+            assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+
+            // Test begins
+            AtomicInteger notificationCount = new AtomicInteger(0);
+            HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
+            EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+            final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+            proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token)));
+
+            stateChangeListener = new SimpleStateChangeListener();
+            Gossiper.instance.register(stateChangeListener);
+
+            // STEP 1. register verifier and apply state with just STATUS
+            // simulate applying gossip state from a v3 peer
+            stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+                notificationCount.getAndIncrement();
+                assertEquals("It should fire notification for STATUS when gossiper local state not yet has STATUS_WITH_PORT",
+                             ApplicationState.STATUS, onChangeParams.state);
+            });
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+
+            // STEP 2. includes both STATUS and STATUS_WITH_PORT. The gossiper knows that the remote peer is now in v4
+            // update verifier and apply state again
+            proposedRemoteState.addApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(Collections.singletonList(token)));
+            stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+                notificationCount.getAndIncrement();
+                assertEquals("It should only fire notification for STATUS_WITH_PORT",
+                             ApplicationState.STATUS_WITH_PORT, onChangeParams.state);
+            });
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+
+            // STEP 3. somehow, the peer send only the STATUS in the update.
+            proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+            proposedRemoteState.addApplicationState(ApplicationState.STATUS, valueFactory.normal(Collections.singletonList(token)));
+            stateChangeListener.setOnChangeVerifier(onChangeParams -> {
+                notificationCount.getAndIncrement();
+                fail("It should not fire notification for STATUS");
+            });
+
+            assertEquals("Expect exact 2 notifications with the test setup",
+                         2, notificationCount.get());
+        }
+        finally
+        {
+            // clean up the gossip states
+            Gossiper.instance.endpointStateMap.clear();
+            if (stateChangeListener != null)
+                Gossiper.instance.unregister(stateChangeListener);
+        }
+    }
+
+    static class SimpleStateChangeListener implements IEndpointStateChangeSubscriber
+    {
+        static class OnChangeParams
+        {
+            InetAddressAndPort endpoint;
+            ApplicationState state;
+            VersionedValue value;
+
+            OnChangeParams(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+            {
+                this.endpoint = endpoint;
+                this.state = state;
+                this.value = value;
+            }
+        }
+
+        private volatile Consumer<OnChangeParams> onChangeVerifier;
+
+        public void setOnChangeVerifier(Consumer<OnChangeParams> verifier)
+        {
+            onChangeVerifier = verifier;
+        }
+
+        public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
+        public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+        public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
+        public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
+        public void onRemove(InetAddressAndPort endpoint) {}
+        public void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
+
+        public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+        {
+            onChangeVerifier.accept(new OnChangeParams(endpoint, state, value));
+        }
+    }
+
     static class TestSeedProvider implements SeedProvider
     {
         private List<InetAddressAndPort> seeds;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org