You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jz...@apache.org on 2019/07/19 04:45:18 UTC
[cassandra] branch cassandra-3.0 updated: Avoid updating unchanged
gossip states
This is an automated email from the ASF dual-hosted git repository.
jzhuang pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 3f70e7c Avoid updating unchanged gossip states
3f70e7c is described below
commit 3f70e7c72c703bc323b169a28e8754ce67d4e479
Author: Jay Zhuang <jz...@apache.org>
AuthorDate: Thu Apr 18 14:59:39 2019 -0700
Avoid updating unchanged gossip states
patch by Jay Zhuang; reviewed by Sam Tunnicliffe for CASSANDRA-15097
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 21 +++-
.../org/apache/cassandra/gms/GossiperTest.java | 128 +++++++++++++++++----
3 files changed, 124 insertions(+), 26 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e0494f5..f04b489 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.19
+ * Avoid updating unchanged gossip states (CASSANDRA-15097)
* Prevent recreation of previously dropped columns with a different kind (CASSANDRA-14948)
* Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
* Toughen up column drop/recreate type validations (CASSANDRA-15204)
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 4ea0a4a..c39f45a 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -23,6 +23,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
@@ -1252,10 +1253,24 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
- localState.addApplicationStates(remoteStates);
- for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
- doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
+ // filter out the states that are already up to date (has the same or higher version)
+ Set<Entry<ApplicationState, VersionedValue>> updatedStates = remoteStates.stream().filter(entry -> {
+ VersionedValue local = localState.getApplicationState(entry.getKey());
+ return (local == null || local.version < entry.getValue().version);
+ }).collect(Collectors.toSet());
+
+ if (logger.isTraceEnabled() && updatedStates.size() > 0)
+ {
+ for (Entry<ApplicationState, VersionedValue> entry : updatedStates)
+ {
+ logger.trace("Updating {} state version to {} for {}", entry.getKey().toString(), entry.getValue().version, addr);
+ }
+ }
+ localState.addApplicationStates(updatedStates);
+
+ for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
+ doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
}
// notify that a local application state is going to change (doesn't get triggered for remote changes)
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index f23c016..42e4483 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -21,23 +21,19 @@ package org.apache.cassandra.gms;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import com.google.common.collect.ImmutableMap;
-import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
@@ -68,32 +64,118 @@ public class GossiperTest
public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
{
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
- InetAddress remoteHostAddress = hosts.get(1);
+ try
+ {
+ InetAddress remoteHostAddress = hosts.get(1);
- EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
- HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+ EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+ HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
- //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
- assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+ //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+ assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
- HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
- EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+ HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration() + Gossiper.MAX_GENERATION_DIFFERENCE + 1);
+ EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
- Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
- //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
- HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
- assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+ //The generation should have been updated because it isn't over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+ HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+ assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
- //Propose a generation 10 years in the future - this should be rejected.
- HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis()/1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
- EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
+ //Propose a generation 10 years in the future - this should be rejected.
+ HeartBeatState badProposedRemoteHeartBeat = new HeartBeatState((int) (System.currentTimeMillis() / 1000) + Gossiper.MAX_GENERATION_DIFFERENCE * 10);
+ EndpointState badProposedRemoteState = new EndpointState(badProposedRemoteHeartBeat);
- Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, badProposedRemoteState));
- actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+ actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
- //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
- assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+ //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
+ assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+ }
+ finally
+ {
+ // clean up the gossip states
+ Gossiper.instance.endpointStateMap.clear();
+ }
+ }
+
+ int stateChangedNum = 0;
+
+ @Test
+ public void testDuplicatedStateUpdate() throws Exception
+ {
+ VersionedValue.VersionedValueFactory valueFactory =
+ new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+ try
+ {
+ InetAddress remoteHostAddress = hosts.get(1);
+
+ EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+ HeartBeatState initialRemoteHeartBeat = initialRemoteState.getHeartBeatState();
+
+ //Util.createInitialRing should have initialized remoteHost's HeartBeatState's generation to 1
+ assertEquals(initialRemoteHeartBeat.getGeneration(), 1);
+
+ HeartBeatState proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration());
+ EndpointState proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+
+ final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+ VersionedValue tokensValue = valueFactory.tokens(Collections.singletonList(token));
+ proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
+
+ Gossiper.instance.register(
+ new IEndpointStateChangeSubscriber()
+ {
+ public void onJoin(InetAddress endpoint, EndpointState epState) { }
+
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) { }
+
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+ {
+ assertEquals(ApplicationState.TOKENS, state);
+ stateChangedNum++;
+ }
+
+ public void onAlive(InetAddress endpoint, EndpointState state) { }
+
+ public void onDead(InetAddress endpoint, EndpointState state) { }
+
+ public void onRemove(InetAddress endpoint) { }
+
+ public void onRestart(InetAddress endpoint, EndpointState state) { }
+ }
+ );
+
+ stateChangedNum = 0;
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+ assertEquals(1, stateChangedNum);
+
+ HeartBeatState actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+ assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+
+ // Clone a new HeartBeatState
+ proposedRemoteHeartBeat = new HeartBeatState(initialRemoteHeartBeat.getGeneration(), proposedRemoteHeartBeat.getHeartBeatVersion());
+ proposedRemoteState = new EndpointState(proposedRemoteHeartBeat);
+
+ // Bump the heartbeat version and use the same TOKENS state
+ proposedRemoteHeartBeat.updateHeartBeat();
+ proposedRemoteState.addApplicationState(ApplicationState.TOKENS, tokensValue);
+
+ // The following state change should only update heartbeat without updating the TOKENS state
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, proposedRemoteState));
+ assertEquals(1, stateChangedNum);
+
+ actualRemoteHeartBeat = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress).getHeartBeatState();
+ assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
+ }
+ finally
+ {
+ // clean up the gossip states
+ Gossiper.instance.endpointStateMap.clear();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org