You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2015/01/20 05:54:06 UTC

svn commit: r1653193 - in /tomcat/trunk/java/org/apache/catalina/tribes/tipis: AbstractReplicatedMap.java ReplicatedMap.java

Author: kfujino
Date: Tue Jan 20 04:54:05 2015
New Revision: 1653193

URL: http://svn.apache.org/r1653193
Log:
Fix behavior of ReplicatedMap when member has disappeared.
If map entry is primary, rebuild the backup members.
If primary node of map entry has disappeared, backup node is promoted to primary.

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java

Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=1653193&r1=1653192&r2=1653193&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Tue Jan 20 04:54:05 2015
@@ -71,7 +71,7 @@ public abstract class AbstractReplicated
 //------------------------------------------------------------------------------
 //              INSTANCE VARIABLES
 //------------------------------------------------------------------------------
-    private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
+    protected final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
 
     protected abstract int getStateMessageType();
 

Modified: tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=1653193&r1=1653192&r2=1653193&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Tue Jan 20 04:54:05 2015
@@ -17,10 +17,14 @@
 package org.apache.catalina.tribes.tipis;
 
 import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.Member;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
 
 /**
  * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical
@@ -49,6 +53,8 @@ public class ReplicatedMap<K,V> extends
 
     private static final long serialVersionUID = 1L;
 
+    private final Log log = LogFactory.getLog(ReplicatedMap.class);
+
     //--------------------------------------------------------------------------
     //              CONSTRUCTORS / DESTRUCTORS
     //--------------------------------------------------------------------------
@@ -134,4 +140,67 @@ public class ReplicatedMap<K,V> extends
         return backup;
     }
 
+    @Override
+    public void memberDisappeared(Member member) {
+        boolean removed = false;
+        synchronized (mapMembers) {
+            removed = (mapMembers.remove(member) != null );
+            if (!removed) {
+                if (log.isDebugEnabled()) log.debug("Member["+member+"] disappeared, but was not present in the map.");
+                return; //the member was not part of our map.
+            }
+        }
+        if (log.isInfoEnabled())
+            log.info("Member["+member+"] disappeared. Related map entries will be relocated to the new node.");
+        long start = System.currentTimeMillis();
+        Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry<K,MapEntry<K,V>> e = i.next();
+            MapEntry<K,V> entry = innerMap.get(e.getKey());
+            if (entry==null) continue;
+            if (entry.isPrimary()) {
+                try {
+                    Member[] backup = getMapMembers();
+                    if (backup.length > 0) {
+                        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, getChannelSendOptions());
+                    }
+                    entry.setBackupNodes(backup);
+                    entry.setPrimary(channel.getLocalMember(false));
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+                }
+            } else if (member.equals(entry.getPrimary())) {
+                entry.setPrimary(null);
+            }
+
+            if ( entry.getPrimary() == null &&
+                        entry.isCopy() &&
+                        entry.getBackupNodes()!=null &&
+                        entry.getBackupNodes().length > 0 &&
+                        entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+                try {
+                    entry.setPrimary(channel.getLocalMember(false));
+                    entry.setBackup(false);
+                    entry.setProxy(false);
+                    entry.setCopy(false);
+                    Member[] backup = getMapMembers();
+                    if (backup.length > 0) {
+                        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, getChannelSendOptions());
+                    }
+                    entry.setBackupNodes(backup);
+                    if ( mapOwner!=null ) mapOwner.objectMadePrimary(entry.getKey(),entry.getValue());
+
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+                }
+            }
+
+        } //while
+        long complete = System.currentTimeMillis() - start;
+        if (log.isInfoEnabled()) log.info("Relocation of map entries was complete in " + complete + " ms.");
+    }
 }
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org