You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/21 20:33:39 UTC
[3/7] git commit: Gossiper.handleMajorStateChange can lose existing
node ApplicationState patch by jasobrown;
reviewed by jbellis for CASSANDRA-5665
Gossiper.handleMajorStateChange can lose existing node ApplicationState
patch by jasobrown; reviewed by jbellis for CASSANDRA-5665
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b7e13b89
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b7e13b89
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b7e13b89
Branch: refs/heads/trunk
Commit: b7e13b89c265c28acfb624a984b97a06a837c3ea
Parents: 110d283
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 21 09:23:17 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jun 21 09:23:52 2013 -0700
----------------------------------------------------------------------
src/java/org/apache/cassandra/gms/Gossiper.java | 47 +++++++++++---------
1 file changed, 26 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7e13b89/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index efa9865..6b0bbe9 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -871,6 +871,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
if (logger.isTraceEnabled())
logger.trace("Updating heartbeat state generation to " + remoteGeneration + " from " + localGeneration + " for " + ep);
// major state change will handle the update by inserting the remote state directly
+ copyNewerApplicationStates(remoteState, localEpStatePtr);
handleMajorStateChange(ep, remoteState);
}
else if ( remoteGeneration == localGeneration ) // generation has not changed, apply new states
@@ -880,11 +881,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
int remoteMaxVersion = getMaxEndpointStateVersion(remoteState);
if ( remoteMaxVersion > localMaxVersion )
{
- // apply states, but do not notify since there is no major change
- applyNewStates(ep, localEpStatePtr, remoteState);
+ if (logger.isTraceEnabled())
+ {
+ logger.trace("Updating heartbeat state version to " + remoteState.getHeartBeatState().getHeartBeatVersion() +
+ " from " + localEpStatePtr.getHeartBeatState().getHeartBeatVersion() + " for " + ep);
+ }
+ localEpStatePtr.setHeartBeatState(remoteState.getHeartBeatState());
+ Map<ApplicationState, VersionedValue> merged = copyNewerApplicationStates(localEpStatePtr, remoteState);
+ for (Entry<ApplicationState, VersionedValue> appState : merged.entrySet())
+ doNotifications(ep, appState.getKey(), appState.getValue());
}
else if (logger.isTraceEnabled())
- logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
+ logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead
markAlive(ep, localEpStatePtr);
}
@@ -903,28 +911,25 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
}
- private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
+ private Map<ApplicationState, VersionedValue> copyNewerApplicationStates(EndpointState toState, EndpointState fromState)
{
- // don't assert here, since if the node restarts the version will go back to zero
- int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
-
- localState.setHeartBeatState(remoteState.getHeartBeatState());
- if (logger.isTraceEnabled())
- logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ...");
-
- // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received
- for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
+ Map<ApplicationState, VersionedValue> merged = new HashMap<ApplicationState, VersionedValue>();
+ for (Entry<ApplicationState, VersionedValue> fromEntry : fromState.getApplicationStateMap().entrySet())
{
- ApplicationState remoteKey = remoteEntry.getKey();
- VersionedValue remoteValue = remoteEntry.getValue();
+ ApplicationState key = fromEntry.getKey();
+ VersionedValue value = fromEntry.getValue();
+ assert fromState.getHeartBeatState().getGeneration() == toState.getHeartBeatState().getGeneration();
- assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
- localState.addApplicationState(remoteKey, remoteValue);
- }
- for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
- {
- doNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
+ if ( (toState.applicationState.containsKey(key) && toState.applicationState.get(key).compareTo(value) < 0)
+ || !toState.applicationState.containsKey(key) )
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("merging {}:{} into ApplicationState", key, value);
+ toState.addApplicationState(key, value);
+ merged.put(key, value);
+ }
}
+ return merged;
}
// notify that an application state has changed