You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/21 20:33:39 UTC

[3/7] git commit: Gossiper.handleMajorStateChange can lose existing node ApplicationState patch by jasobrown; reviewed by jbellis for CASSANDRA-5665

Gossiper.handleMajorStateChange can lose existing node ApplicationState
patch by jasobrown; reviewed by jbellis for CASSANDRA-5665


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b7e13b89
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b7e13b89
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b7e13b89

Branch: refs/heads/trunk
Commit: b7e13b89c265c28acfb624a984b97a06a837c3ea
Parents: 110d283
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jun 21 09:23:17 2013 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jun 21 09:23:52 2013 -0700

----------------------------------------------------------------------
 src/java/org/apache/cassandra/gms/Gossiper.java | 47 +++++++++++---------
 1 file changed, 26 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b7e13b89/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index efa9865..6b0bbe9 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -871,6 +871,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                     if (logger.isTraceEnabled())
                         logger.trace("Updating heartbeat state generation to " + remoteGeneration + " from " + localGeneration + " for " + ep);
                     // major state change will handle the update by inserting the remote state directly
+                    copyNewerApplicationStates(remoteState, localEpStatePtr);
                     handleMajorStateChange(ep, remoteState);
                 }
                 else if ( remoteGeneration == localGeneration ) // generation has not changed, apply new states
@@ -880,11 +881,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                     int remoteMaxVersion = getMaxEndpointStateVersion(remoteState);
                     if ( remoteMaxVersion > localMaxVersion )
                     {
-                        // apply states, but do not notify since there is no major change
-                        applyNewStates(ep, localEpStatePtr, remoteState);
+                        if (logger.isTraceEnabled())
+                        {
+                            logger.trace("Updating heartbeat state version to " + remoteState.getHeartBeatState().getHeartBeatVersion() +
+                                    " from " + localEpStatePtr.getHeartBeatState().getHeartBeatVersion() + " for " + ep);
+                        }
+                        localEpStatePtr.setHeartBeatState(remoteState.getHeartBeatState());
+                        Map<ApplicationState, VersionedValue> merged = copyNewerApplicationStates(localEpStatePtr, remoteState);
+                        for (Entry<ApplicationState, VersionedValue> appState : merged.entrySet())
+                            doNotifications(ep, appState.getKey(), appState.getValue());
                     }
                     else if (logger.isTraceEnabled())
-                            logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
+                        logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
                     if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead
                         markAlive(ep, localEpStatePtr);
                 }
@@ -903,28 +911,25 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
+    private Map<ApplicationState, VersionedValue> copyNewerApplicationStates(EndpointState toState, EndpointState fromState)
     {
-        // don't assert here, since if the node restarts the version will go back to zero
-        int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
-
-        localState.setHeartBeatState(remoteState.getHeartBeatState());
-        if (logger.isTraceEnabled())
-            logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ...");
-
-        // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
+        Map<ApplicationState, VersionedValue> merged = new HashMap<ApplicationState, VersionedValue>();
+        for (Entry<ApplicationState, VersionedValue> fromEntry : fromState.getApplicationStateMap().entrySet())
         {
-            ApplicationState remoteKey = remoteEntry.getKey();
-            VersionedValue remoteValue = remoteEntry.getValue();
+            ApplicationState key = fromEntry.getKey();
+            VersionedValue value = fromEntry.getValue();
+            assert fromState.getHeartBeatState().getGeneration() == toState.getHeartBeatState().getGeneration();
 
-            assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
-            localState.addApplicationState(remoteKey, remoteValue);
-        }
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
-        {
-            doNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
+            if ( (toState.applicationState.containsKey(key) && toState.applicationState.get(key).compareTo(value) < 0)
+                || !toState.applicationState.containsKey(key) )
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("merging {}:{} into ApplicationState", key, value);
+                toState.addApplicationState(key, value);
+                merged.put(key, value);
+            }
         }
+        return merged;
     }
 
     // notify that an application state has changed