You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jz...@apache.org on 2019/07/19 04:45:54 UTC

[cassandra] branch trunk updated (26a134a -> ba6821b)

This is an automated email from the ASF dual-hosted git repository.

jzhuang pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 26a134a  Merge branch 'cassandra-3.11' into trunk
     add 3f70e7c  Avoid updating unchanged gossip states
     add 71cb061  Merge branch 'cassandra-3.0' into cassandra-3.11
     new ba6821b  Merge branch 'cassandra-3.11' into trunk

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  51 ++++++---
 .../org/apache/cassandra/gms/GossiperTest.java     | 123 ++++++++++++++++++---
 3 files changed, 140 insertions(+), 35 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by jz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jzhuang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ba6821be4f952781d573cd6ed705250de6aeb5f7
Merge: 26a134a 71cb061
Author: Jay Zhuang <jz...@apache.org>
AuthorDate: Thu Jul 18 21:25:55 2019 -0700

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  51 ++++++---
 .../org/apache/cassandra/gms/GossiperTest.java     | 123 ++++++++++++++++++---
 3 files changed, 140 insertions(+), 35 deletions(-)

diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index a6c9be7,6a862e5..062abe0
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -22,14 -24,10 +22,15 @@@ import java.util.*
  import java.util.Map.Entry;
  import java.util.concurrent.*;
  import java.util.concurrent.locks.ReentrantLock;
 +import java.util.function.BooleanSupplier;
 +import java.util.function.Supplier;
 +import java.util.stream.Collectors;
 +
  import javax.annotation.Nullable;
+ import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Suppliers;
  import com.google.common.base.Throwables;
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
@@@ -1377,25 -1289,24 +1378,41 @@@ public class Gossiper implements IFailu
  
          Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
          assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
-         localState.addApplicationStates(remoteStates);
- 
-         //Filter out pre-4.0 versions of data for more complete 4.0 versions
-         Set<Entry<ApplicationState, VersionedValue>> filtered = remoteStates.stream().filter(entry -> {
-            switch (entry.getKey())
-            {
-                case INTERNAL_IP:
-                     return remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) == null;
-                case STATUS:
-                    return remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) == null;
-                case RPC_ADDRESS:
-                    return remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) == null;
-                default:
-                    return true;
-            }
+ 
 -        // filter out the states that are already up to date (has the same or higher version)
++
+         Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> {
++            // Filter out pre-4.0 versions of data for more complete 4.0 versions
++            switch (entry.getKey())
++            {
++                case INTERNAL_IP:
++                    if (remoteState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT) != null) return false;
++                    break;
++                case STATUS:
++                    if (remoteState.getApplicationState(ApplicationState.STATUS_WITH_PORT) != null) return false;
++                    break;
++                case RPC_ADDRESS:
++                    if (remoteState.getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) != null) return false;
++                    break;
++                default:
++                    break;
++            }
++
++            // filter out the states that are already up to date (has the same or higher version)
+             VersionedValue local = localState.getApplicationState(entry.getKey());
+             return (local == null || local.version < entry.getValue().version);
 -            }).collect(Collectors.toSet());
 +        }).collect(Collectors.toSet());
  
-         for (Entry<ApplicationState, VersionedValue> remoteEntry : filtered)
-             doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
+         if (logger.isTraceEnabled() && updatedStates.size() > 0)
+         {
+             for (Entry<ApplicationState, VersionedValue> entry : updatedStates)
+             {
+                 logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr);
+             }
+         }
+         localState.addApplicationStates(updatedStates);
+ 
+         for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
+             doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
      }
  
      // notify that a local application state is going to change (doesn't get triggered for remote changes)
diff --cc test/unit/org/apache/cassandra/gms/GossiperTest.java
index 9c25b86,b6b3ffb..97c577c
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@@ -114,158 -74,171 +115,244 @@@ public class GossiperTes
      }
  
      @Test
 -    public void testLargeGenerationJump() throws UnknownHostException
 +    public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
      {
          Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
-         InetAddressAndPort remoteHostAddress = hosts.get(1);
+         try
+         {
 -            InetAddress remoteHostAddress = hosts.get(1);
++            InetAddressAndPort remoteHostAddress = hosts.get(1);
+ 
+             EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+ 
+             //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+ 
+             HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
+             EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+ 
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+ 
+             //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+ 
+             //Propose a generation 10 years in the future - this should be rejected.
+             HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis() / 1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
+             EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
+ 
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
+ 
+             actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+ 
+             //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
+     }
+ 
+     int stateChangedNum = 0;
+ 
+     @Test
+     public void testDuplicatedStateUpdate() throws Exception
+     {
+         VersionedValue.VersionedValueFactory valueFactory =
+             new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+ 
+         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+         try
+         {
 -            InetAddress remoteHostAddress = hosts.get(1);
++            InetAddressAndPort remoteHostAddress = hosts.get(1);
+ 
+             EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+             HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+ 
+             //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+             assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+ 
+             HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
+             EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
  
-         EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
-         HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+             final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+             VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token));
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
  
-         //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
-         assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+             Gossiper.instance.register(
+             new IEndpointStateChangeSubscriber()
+             {
 -                public void onJoin(InetAddress endpoint, EndpointState epState) { }
++                public void onJoin(InetAddressAndPort endpoint, EndpointState epState) { }
  
-         HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
-         EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
 -                public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { }
++                public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { }
  
-         Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
 -                public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
++                public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+                 {
+                     assertEquals(ApplicationState.TOKENS, state);
+                     stateChangedNum++;
+                 }
  
-         //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
 -                public void onAlive(InetAddress endpoint, EndpointState state) { }
++                public void onAlive(InetAddressAndPort endpoint, EndpointState state) { }
  
-         //Propose a generation 10 years in the future - this should be rejected.
-         HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
-         EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
 -                public void onDead(InetAddress endpoint, EndpointState state) { }
++                public void onDead(InetAddressAndPort endpoint, EndpointState state) { }
  
-         Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
 -                public void onRemove(InetAddress endpoint) { }
++                public void onRemove(InetAddressAndPort endpoint) { }
  
-         actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
 -                public void onRestart(InetAddress endpoint, EndpointState state) { }
++                public void onRestart(InetAddressAndPort endpoint, EndpointState state) { }
+             }
+             );
  
-         //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
-         assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+             stateChangedNum = 0;
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+ 
+             // Clone a new HeartBeatState
+             proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration(), proposedRemoteHeartBeat.getHeartBeatVersion());
+             proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+ 
+             // Bump the heartbeat version and use the same TOKENS state
+             proposedRemoteHeartBeat.updateHeartBeat();
+             proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
+ 
+             // The following state change should only update heartbeat without updating the TOKENS state
+             Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+             assertEquals(1, stateChangedNum);
+ 
+             actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+             assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+         }
+         finally
+         {
+             // clean up the gossip states
+             Gossiper.instance.endpointStateMap.clear();
+         }
      }
  
 +    // Note: This test might fail if for some reason the node broadcast address is in 127.99.0.0/16
      @Test
 -    public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
 +    public void testReloadSeeds() throws UnknownHostException
      {
 -        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
 -        MessagingService.instance().listen();
 -        Gossiper.instance.start(1);
 -        InetAddress remoteHostAddress = hosts.get(1);
 -
 -        EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
 -        // Set to any 3.0 version
 -        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
 +        Gossiper gossiper = new Gossiper(false);
 +        List<String> loadedList;
 +
 +        // Initialize the seed list directly to a known set to start with
 +        gossiper.seeds.clear();
 +        InetAddressAndPort addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.1.1"));
 +        int nextSize = 4;
 +        List<InetAddressAndPort> nextSeeds = new ArrayList<>(nextSize);
 +        for (int i = 0; i < nextSize; i ++)
 +        {
 +            gossiper.seeds.add(addr);
 +            nextSeeds.add(addr);
 +            addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        }
 +        Assert.assertEquals(nextSize, gossiper.seeds.size());
 +
 +        // Add another unique address to the list
 +        addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        nextSeeds.add(addr);
 +        nextSize++;
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the new entry was added
 +        Assert.assertEquals(nextSize, loadedList.size());
 +        for (InetAddressAndPort a : nextSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Check that the return value of the reloadSeeds matches the content of the getSeeds call
 +        // and that they both match the internal contents of the Gossiper seeds list
 +        Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size());
 +        for (InetAddressAndPort a : gossiper.seeds)
 +        {
 +            assertTrue(loadedList.contains(a.toString()));
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +        }
  
 -        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
 +        // Add a duplicate of the last address to the seed provider list
 +        int uniqueSize = nextSize;
 +        nextSeeds.add(addr);
 +        nextSize++;
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(nextSeeds));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the number of seed nodes reported hasn't increased
 +        Assert.assertEquals(uniqueSize, loadedList.size());
 +        for (InetAddressAndPort a : nextSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Create a new list that has no overlaps with the previous list
 +        addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1"));
 +        int disjointSize = 3;
 +        List<InetAddressAndPort> disjointSeeds = new ArrayList<>(disjointSize);
 +        for (int i = 0; i < disjointSize; i ++)
 +        {
 +            disjointSeeds.add(addr);
 +            addr = InetAddressAndPort.getByAddress(InetAddresses.increment(addr.address));
 +        }
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(disjointSeeds));
 +        loadedList = gossiper.reloadSeeds();
  
 -        // wait until the schema is set
 -        VersionedValue schema = null;
 -        for (int i = 0; i < 10; i++)
 +        // Check that the list now contains exactly the new other list.
 +        Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
 +        Assert.assertEquals(disjointSize, loadedList.size());
 +        for (InetAddressAndPort a : disjointSeeds)
          {
 -            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 -            schema = localState.getApplicationState(ApplicationState.SCHEMA);
 -            if (schema != null)
 -                break;
 -            Thread.sleep(1000);
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +            assertTrue(loadedList.contains(a.toString()));
          }
  
 -        // schema is set and equals to "alternative" version
 -        assertTrue(schema != null);
 -        assertEquals(schema.value, Schema.instance.getAltVersion().toString());
 +        // Set the seed node provider to return an empty list
 +        DatabaseDescriptor.setSeedProvider(new TestSeedProvider(new ArrayList<InetAddressAndPort>()));
 +        loadedList = gossiper.reloadSeeds();
 +
 +        // Check that the in memory seed node list was not modified
 +        Assert.assertEquals(disjointSize, loadedList.size());
 +        for (InetAddressAndPort a : disjointSeeds)
 +            assertTrue(loadedList.contains(a.toString()));
 +
 +        // Change the seed provider to one that throws an unchecked exception
 +        DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider());
 +        loadedList = gossiper.reloadSeeds();
  
 -        // Upgrade remote host version to the latest one (3.11)
 -        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
 +        // Check for the expected null response from a reload error
 +        Assert.assertNull(loadedList);
  
 -        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
 +        // Check that the in memory seed node list was not modified and the exception was caught
 +        Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
 +        for (InetAddressAndPort a : disjointSeeds)
 +            assertTrue(gossiper.getSeeds().contains(a.toString()));
 +    }
 +
 +    static class TestSeedProvider implements SeedProvider
 +    {
 +        private List<InetAddressAndPort> seeds;
 +
 +        TestSeedProvider(List<InetAddressAndPort> seeds)
 +        {
 +            this.seeds = seeds;
 +        }
  
 -        // wait until the schema change
 -        VersionedValue newSchema = null;
 -        for (int i = 0; i < 10; i++)
 +        @Override
 +        public List<InetAddressAndPort> getSeeds()
          {
 -            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
 -            newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
 -            if (!schema.value.equals(newSchema.value))
 -                break;
 -            Thread.sleep(1000);
 +            return seeds;
          }
 +    }
  
 -        // schema is changed and equals to real version
 -        assertFalse(schema.value.equals(newSchema.value));
 -        assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
 +    // A seed provider for testing which throws assertion errors when queried
 +    static class ErrorSeedProvider implements SeedProvider
 +    {
 +        @Override
 +        public List<InetAddressAndPort> getSeeds()
 +        {
 +            assert(false);
 +            return new ArrayList<>();
 +        }
      }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org