You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jz...@apache.org on 2019/07/19 04:45:35 UTC

[cassandra] branch cassandra-3.11 updated (670dde9 -> 71cb061)

This is an automated email from the ASF dual-hosted git repository.

jzhuang pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 670dde9  Merge branch 'cassandra-3.0' into cassandra-3.11
     add 3f70e7c  Avoid updating unchanged gossip states
     new 71cb061  Merge branch 'cassandra-3.0' into cassandra-3.11

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revision
listed as "add" was already present in the repository and has only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  21 +++-
 .../org/apache/cassandra/gms/GossiperTest.java     | 123 ++++++++++++++++++---
 3 files changed, 124 insertions(+), 21 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by jz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jzhuang pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 71cb0616b7710366a8cd364348c864d656dc5542
Merge: 670dde9 3f70e7c
Author: Jay Zhuang <jz...@apache.org>
AuthorDate: Thu Jul 18 21:23:17 2019 -0700

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  21 +++-
 .../org/apache/cassandra/gms/GossiperTest.java     | 123 ++++++++++++++++++---
 3 files changed, 124 insertions(+), 21 deletions(-)

diff --cc CHANGES.txt
index f055068,f04b489..0233c0f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 -3.0.19
 +3.11.5
 + * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305)
 + * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
 + * Add flag to disable SASI indexes, and warnings on creation (CASSANDRA-14866)
 +Merged from 3.0:
+  * Avoid updating unchanged gossip states (CASSANDRA-15097)
   * Prevent recreation of previously dropped columns with a different kind (CASSANDRA-14948)
   * Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
   * Toughen up column drop/recreate type validations (CASSANDRA-15204)
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 5d2e997,c39f45a..6a862e5
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -24,7 -23,7 +24,8 @@@ import java.util.*
  import java.util.Map.Entry;
  import java.util.concurrent.*;
  import java.util.concurrent.locks.ReentrantLock;
 +import javax.annotation.Nullable;
+ import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
@@@ -1288,12 -1253,26 +1289,26 @@@ public class Gossiper implements IFailu
  
          Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
          assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
-         localState.addApplicationStates(remoteStates);
  
-         for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
-             doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
+         // filter out the states that are already up to date (has the same or higher version)
+         Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> {
+             VersionedValue local = localState.getApplicationState(entry.getKey());
+             return (local == null || local.version < entry.getValue().version);
+             }).collect(Collectors.toSet());
+ 
+         if (logger.isTraceEnabled() && updatedStates.size() > 0)
+         {
+             for (Entry<ApplicationState, VersionedValue> entry : updatedStates)
+             {
+                 logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr);
+             }
+         }
+         localState.addApplicationStates(updatedStates);
+ 
+         for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
+             doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
      }
 -    
 +
      // notify that a local application state is going to change (doesn't get triggered for remote changes)
      private void doBeforeChangeNotifications(InetAddress addr, EndpointState epState, ApplicationState apState, VersionedValue newValue)
      {
diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java
index 448620a,42e4483..b6b3ffb
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@@ -70,88 -58,124 +71,174 @@@ public class GossiperTes
      public void setup()
      {
          tmd.clearUnsafe();
 -    };
 +    }
  
      @Test
 -    public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
 +    public void testLargeGenerationJump() throws UnknownHostException
      {
          Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
-         InetAddress remoteHostAddress = hosts.get(1);
+         try
+         {
+             InetAddress remoteHostAddress = hosts.get(1);
  
-         EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
-         HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+             EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
  
-         //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
-         assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+             //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
  
-         HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
-         EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+             HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
+             EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
  
-         Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
  
-         //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+             //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
  
-         //Propose a generation 10 years in the future - this should be rejected.
-         HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
-         EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
+             //Propose a generation 10 years in the future - this should be rejected.
+             HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis() / 1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
+             EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
  
-         Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
  
-         actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
  
-         //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+             //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
+     }
+ 
+     int stateChangedNum = 0;
+ 
+     @Test
+     public void testDuplicatedStateUpdate() throws Exception
+     {
+         VersionedValue.VersionedValueFactory valueFactory =
+             new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+ 
+         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+         try
+         {
+             InetAddress remoteHostAddress = hosts.get(1);
+ 
+             EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+ 
+             //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+ 
+             HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
+             EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+ 
+             final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+             VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token));
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
+ 
+             Gossiper.instance.register(
+             new IEndpointStateChangeSubscriber()
+             {
+                 public void onJoin(InetAddress endpoint, EndpointState epState) { }
+ 
+                 public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { }
+ 
+                 public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+                 {
+                     assertEquals(ApplicationState.TOKENS, state);
+                     stateChangedNum++;
+                 }
+ 
+                 public void onAlive(InetAddress endpoint, EndpointState state) { }
+ 
+                 public void onDead(InetAddress endpoint, EndpointState state) { }
+ 
+                 public void onRemove(InetAddress endpoint) { }
+ 
+                 public void onRestart(InetAddress endpoint, EndpointState state) { }
+             }
+             );
+ 
+             stateChangedNum = 0;
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+ 
+             // Clone a new HeartBeatState
+             proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration(), proposedRemoteHeartBeat.getHeartBeatVersion());
+             proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+ 
+             // Bump the heartbeat version and use the same TOKENS state
+             proposedRemoteHeartBeat.updateHeartBeat();
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
+ 
+             // The following state change should only update heartbeat without updating the TOKENS state
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
      }
 +
 +    @Test
 +    public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
 +    {
 +        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
 +        MessagingService.instance().listen();
 +        Gossiper.instance.start(1);
 +        InetAddress remoteHostAddress = hosts.get(1);
 +
 +        EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
 +        // Set to any 3.0 version
 +        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
 +
 +        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
 +
 +        // wait until the schema is set
 +        VersionedValue schema = null;
 +        for (int i = 0; i < 10; i++)
 +        {
 +            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 +            schema = localState.getApplicationState(ApplicationState.SCHEMA);
 +            if (schema != null)
 +                break;
 +            Thread.sleep(1000);
 +        }
 +
 +        // schema is set and equals to "alternative" version
 +        assertTrue(schema != null);
 +        assertEquals(schema.value, Schema.instance.getAltVersion().toString());
 +
 +        // Upgrade remote host version to the latest one (3.11)
 +        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
 +
 +        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
 +
 +        // wait until the schema change
 +        VersionedValue newSchema = null;
 +        for (int i = 0; i < 10; i++)
 +        {
 +            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 +            newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
 +            if (!schema.value.equals(newSchema.value))
 +                break;
 +            Thread.sleep(1000);
 +        }
 +
 +        // schema is changed and equals to real version
 +        assertFalse(schema.value.equals(newSchema.value));
 +        assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
 +    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org