You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jz...@apache.org on 2019/07/19 04:45:18 UTC

[cassandra] branch cassandra-3.0 updated: Avoid updating unchanged gossip states

This is an automated email from the ASF dual-hosted git repository.

jzhuang pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 3f70e7c  Avoid updating unchanged gossip states
3f70e7c is described below

commit 3f70e7c72c703bc323b169a28e8754ce67d4e479
Author: Jay Zhuang <jz...@apache.org>
AuthorDate: Thu Apr 18 14:59:39 2019 -0700

    Avoid updating unchanged gossip states
    
    patch by Jay Zhuang; reviewed by Sam Tunnicliffe for CASSANDRA-15097
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  21 +++-
 .../org/apache/cassandra/gms/GossiperTest.java     | 128 +++++++++++++++++----
 3 files changed, 124 insertions(+), 26 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e0494f5..f04b489 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.19
+ * Avoid updating unchanged gossip states (CASSANDRA-15097)
  * Prevent recreation of previously dropped columns with a different kind (CASSANDRA-14948)
  * Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
  * Toughen up column drop/recreate type validations (CASSANDRA-15204)
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 4ea0a4a..c39f45a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
@@ -1252,10 +1253,24 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
         Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
         assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
-        localState.addApplicationStates(remoteStates);
 
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
-            doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
+        // filter out the remote states that are already up to date locally (the local copy has the same or a higher version)
+        Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> {
+            VersionedValue local = localState.getApplicationState(entry.getKey());
+            return (local == null || local.version < entry.getValue().version);
+            }).collect(Collectors.toSet());
+
+        if (logger.isTraceEnabled() && updatedStates.size() > 0)
+        {
+            for (Entry<ApplicationState, VersionedValue> entry : updatedStates)
+            {
+                logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr);
+            }
+        }
+        localState.addApplicationStates(updatedStates);
+
+        for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
+            doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
     }
     
     // notify that a local application state is going to change (doesn't get triggered for remote changes)
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index f23c016..42e4483 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -21,23 +21,19 @@ package org.apache.cassandra.gms;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
-import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 
@@ -68,32 +64,118 @@ public class GossiperTest
     public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
     {
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
-        InetAddress remoteHostAddress = hosts.get(1);
+        try
+        {
+            InetAddress remoteHostAddress = hosts.get(1);
 
-        EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
-        HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+            EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+            HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
 
-        //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
-        assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+            //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+            assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
 
-        HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
-        EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+            HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
+            EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
 
-        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
 
-        //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
-        HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
-        assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+            //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+            HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+            assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
 
-        //Propose a generation 10 years in the future - this should be rejected.
-        HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
-        EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
+            //Propose a generation 10 years in the future - this should be rejected.
+            HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis() / 1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
+            EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
 
-        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
 
-        actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+            actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
 
-        //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
-        assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+            //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+            assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+        }
+        finally
+        {
+            // clean up the gossip states
+            Gossiper.instance.endpointStateMap.clear();
+        }
+    }
+
+    int stateChangedNum = 0;
+
+    @Test
+    public void testDuplicatedStateUpdate() throws Exception
+    {
+        VersionedValue.VersionedValueFactory valueFactory =
+            new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+        try
+        {
+            InetAddress remoteHostAddress = hosts.get(1);
+
+            EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+            HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+
+            //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+            assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+
+            HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
+            EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+
+            final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+            VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token));
+            proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
+
+            Gossiper.instance.register(
+            new IEndpointStateChangeSubscriber()
+            {
+                public void onJoin(InetAddress endpoint, EndpointState epState) { }
+
+                public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { }
+
+                public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+                {
+                    assertEquals(ApplicationState.TOKENS, state);
+                    stateChangedNum++;
+                }
+
+                public void onAlive(InetAddress endpoint, EndpointState state) { }
+
+                public void onDead(InetAddress endpoint, EndpointState state) { }
+
+                public void onRemove(InetAddress endpoint) { }
+
+                public void onRestart(InetAddress endpoint, EndpointState state) { }
+            }
+            );
+
+            stateChangedNum = 0;
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+            assertEquals(1, stateChangedNum);
+
+            HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+            assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+
+            // Clone a new HeartBeatState
+            proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration(), proposedRemoteHeartBeat.getHeartBeatVersion());
+            proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+
+            // Bump the heartbeat version and use the same TOKENS state
+            proposedRemoteHeartBeat.updateHeartBeat();
+            proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
+
+            // The following state change should only update heartbeat without updating the TOKENS state
+            Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+            assertEquals(1, stateChangedNum);
+
+            actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+            assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+        }
+        finally
+        {
+            // clean up the gossip states
+            Gossiper.instance.endpointStateMap.clear();
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org