You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2015/01/21 03:37:16 UTC

svn commit: r1653423 - in /tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis: AbstractReplicatedMap.java LazyReplicatedMap.java ReplicatedMap.java

Author: kfujino
Date: Wed Jan 21 02:37:15 2015
New Revision: 1653423

URL: http://svn.apache.org/r1653423
Log:
Backport.
Clarify the handling of Copy message and Copy nodes.

Modified:
    tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java

Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff
==============================================================================
--- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Wed Jan 21 02:37:15 2015
@@ -71,10 +71,12 @@ public abstract class AbstractReplicated
 //------------------------------------------------------------------------------
 //              INSTANCE VARIABLES
 //------------------------------------------------------------------------------
-    private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
+    protected final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
 
     protected abstract int getStateMessageType();
 
+    protected abstract int getReplicateMessageType();
+
 
     /**
      * Timeout for RPC messages, how long we will wait for a reply
@@ -426,7 +428,7 @@ public abstract class AbstractReplicated
                 rentry.lock();
                 try {
                     //construct a diff message
-                    msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+                    msg = new MapMessage(mapContextName, getReplicateMessageType(),
                                          true, (Serializable) entry.getKey(), null,
                                          rentry.getDiff(),
                                          entry.getPrimary(),
@@ -440,7 +442,7 @@ public abstract class AbstractReplicated
             }
             if (msg == null && complete) {
                 //construct a complete
-                msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+                msg = new MapMessage(mapContextName, getReplicateMessageType(),
                                      false, (Serializable) entry.getKey(),
                                      (Serializable) entry.getValue(),
                                      null, entry.getPrimary(),entry.getBackupNodes());
@@ -632,6 +634,7 @@ public abstract class AbstractReplicated
             }
             entry.setProxy(true);
             entry.setBackup(false);
+            entry.setCopy(false);
             entry.setBackupNodes(mapmsg.getBackupNodes());
             entry.setPrimary(mapmsg.getPrimary());
         }
@@ -646,6 +649,7 @@ public abstract class AbstractReplicated
                 entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue());
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
+                entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
                 entry.setPrimary(mapmsg.getPrimary());
                 if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
@@ -654,6 +658,7 @@ public abstract class AbstractReplicated
             } else {
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
+                entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
                 entry.setPrimary(mapmsg.getPrimary());
                 if (entry.getValue() instanceof ReplicatedMapEntry) {
@@ -692,6 +697,14 @@ public abstract class AbstractReplicated
                 }
             }
         }
+
+        if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) {
+            MapEntry<K, V> entry = innerMap.get(mapmsg.getKey());
+            if (entry != null) {
+                entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
+            }
+        }
     }
 
     @Override
@@ -810,6 +823,7 @@ public abstract class AbstractReplicated
                     entry.setPrimary(channel.getLocalMember(false));
                     entry.setBackup(false);
                     entry.setProxy(false);
+                    entry.setCopy(false);
                     Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                     entry.setBackupNodes(backup);
                     if ( mapOwner!=null ) mapOwner.objectMadePrimary(entry.getKey(),entry.getValue());
@@ -896,7 +910,10 @@ public abstract class AbstractReplicated
             try {
                 Member[] backup = null;
                 MapMessage msg = null;
-                if ( !entry.isBackup() ) {
+                if (entry.isBackup()) {
+                    //select a new backup node
+                    backup = publishEntryInfo(key, entry.getValue());
+                } else if ( entry.isProxy() ) {
                     //make sure we don't retrieve from ourselves
                     msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
                                          (Serializable) key, null, null, null,null);
@@ -909,31 +926,31 @@ public abstract class AbstractReplicated
                     msg = (MapMessage) resp[0].getMessage();
                     msg.deserialize(getExternalLoaders());
                     backup = entry.getBackupNodes();
-                    if ( entry.getValue() instanceof ReplicatedMapEntry ) {
-                        ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
-                        val.setOwner(getMapOwner());
-                    }
                     if ( msg.getValue()!=null ) entry.setValue((V) msg.getValue());
-                }
-                if (entry.isBackup()) {
-                    //select a new backup node
-                    backup = publishEntryInfo(key, entry.getValue());
-                } else if ( entry.isProxy() ) {
+
                     //invalidate the previous primary
                     msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
                     Member[] dest = getMapMembersExcl(backup);
                     if ( dest!=null && dest.length >0) {
                         getChannel().send(dest, msg, getChannelSendOptions());
                     }
-                    if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
+                    if (entry.getValue() instanceof ReplicatedMapEntry) {
                         ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
                         val.setOwner(getMapOwner());
                     }
+                } else if ( entry.isCopy() ) {
+                    backup = getMapMembers();
+                    if (backup.length > 0) {
+                        msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                (Serializable)key,null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, getChannelSendOptions());
+                    }
                 }
                 entry.setPrimary(channel.getLocalMember(false));
                 entry.setBackupNodes(backup);
                 entry.setBackup(false);
                 entry.setProxy(false);
+                entry.setCopy(false);
                 if ( getMapOwner()!=null ) getMapOwner().objectMadePrimary(key, entry.getValue());
 
             } catch (Exception x) {
@@ -990,6 +1007,7 @@ public abstract class AbstractReplicated
         MapEntry<K,V> entry = new MapEntry<>(key, value);
         entry.setBackup(false);
         entry.setProxy(false);
+        entry.setCopy(false);
         entry.setPrimary(channel.getLocalMember(false));
 
         V old = null;
@@ -1148,6 +1166,7 @@ public abstract class AbstractReplicated
     public static class MapEntry<K,V> implements Map.Entry<K,V> {
         private boolean backup;
         private boolean proxy;
+        private boolean copy;
         private Member[] backupNodes;
         private Member primary;
         private K key;
@@ -1184,7 +1203,7 @@ public abstract class AbstractReplicated
         }
 
         public boolean isPrimary() {
-            return (!proxy && !backup);
+            return (!proxy && !backup && !copy);
         }
 
         public boolean isActive() {
@@ -1195,6 +1214,14 @@ public abstract class AbstractReplicated
             this.proxy = proxy;
         }
 
+        public boolean isCopy() {
+            return copy;
+        }
+
+        public void setCopy(boolean copy) {
+            this.copy = copy;
+        }
+
         public boolean isDiffable() {
             return (value instanceof ReplicatedMapEntry) &&
                    ((ReplicatedMapEntry)value).isDiffable();
@@ -1306,6 +1333,7 @@ public abstract class AbstractReplicated
         public static final int MSG_COPY = 9;
         public static final int MSG_STATE_COPY = 10;
         public static final int MSG_ACCESS = 11;
+        public static final int MSG_NOTIFY_MAPMEMBER = 12;
 
         private final byte[] mapId;
         private final int msgtype;
@@ -1344,6 +1372,7 @@ public abstract class AbstractReplicated
                 case MSG_STATE_COPY: return "MSG_STATE_COPY";
                 case MSG_COPY: return "MSG_COPY";
                 case MSG_ACCESS: return "MSG_ACCESS";
+                case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER";
                 default : return "UNKNOWN";
             }
         }

Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff
==============================================================================
--- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Wed Jan 21 02:37:15 2015
@@ -124,6 +124,11 @@ public class LazyReplicatedMap<K,V> exte
         return AbstractReplicatedMap.MapMessage.MSG_STATE;
     }
 
+    @Override
+    protected int getReplicateMessageType() {
+        return AbstractReplicatedMap.MapMessage.MSG_BACKUP;
+    }
+
     /**
      * publish info about a map pair (key/value) to other nodes in the cluster
      * @param key Object

Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff
==============================================================================
--- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Wed Jan 21 02:37:15 2015
@@ -17,10 +17,14 @@
 package org.apache.catalina.tribes.tipis;
 
 import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.Member;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
 
 /**
  * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical
@@ -49,6 +53,8 @@ public class ReplicatedMap<K,V> extends
 
     private static final long serialVersionUID = 1L;
 
+    private final Log log = LogFactory.getLog(ReplicatedMap.class);
+
     //--------------------------------------------------------------------------
     //              CONSTRUCTORS / DESTRUCTORS
     //--------------------------------------------------------------------------
@@ -105,6 +111,11 @@ public class ReplicatedMap<K,V> extends
         return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
     }
 
+    @Override
+    protected int getReplicateMessageType() {
+        return AbstractReplicatedMap.MapMessage.MSG_COPY;
+    }
+
     /**
      * publish info about a map pair (key/value) to other nodes in the cluster
      * @param key Object
@@ -129,4 +140,67 @@ public class ReplicatedMap<K,V> extends
         return backup;
     }
 
+    @Override
+    public void memberDisappeared(Member member) {
+        boolean removed = false;
+        synchronized (mapMembers) {
+            removed = (mapMembers.remove(member) != null );
+            if (!removed) {
+                if (log.isDebugEnabled()) log.debug("Member["+member+"] disappeared, but was not present in the map.");
+                return; //the member was not part of our map.
+            }
+        }
+        if (log.isInfoEnabled())
+            log.info("Member["+member+"] disappeared. Related map entries will be relocated to the new node.");
+        long start = System.currentTimeMillis();
+        Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry<K,MapEntry<K,V>> e = i.next();
+            MapEntry<K,V> entry = innerMap.get(e.getKey());
+            if (entry==null) continue;
+            if (entry.isPrimary()) {
+                try {
+                    Member[] backup = getMapMembers();
+                    if (backup.length > 0) {
+                        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, getChannelSendOptions());
+                    }
+                    entry.setBackupNodes(backup);
+                    entry.setPrimary(channel.getLocalMember(false));
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+                }
+            } else if (member.equals(entry.getPrimary())) {
+                entry.setPrimary(null);
+            }
+
+            if ( entry.getPrimary() == null &&
+                        entry.isCopy() &&
+                        entry.getBackupNodes()!=null &&
+                        entry.getBackupNodes().length > 0 &&
+                        entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+                try {
+                    entry.setPrimary(channel.getLocalMember(false));
+                    entry.setBackup(false);
+                    entry.setProxy(false);
+                    entry.setCopy(false);
+                    Member[] backup = getMapMembers();
+                    if (backup.length > 0) {
+                        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, getChannelSendOptions());
+                    }
+                    entry.setBackupNodes(backup);
+                    if ( mapOwner!=null ) mapOwner.objectMadePrimary(entry.getKey(),entry.getValue());
+
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+                }
+            }
+
+        } //while
+        long complete = System.currentTimeMillis() - start;
+        if (log.isInfoEnabled()) log.info("Relocation of map entries was complete in " + complete + " ms.");
+    }
 }
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org