You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2015/01/21 03:37:16 UTC
svn commit: r1653423 - in
/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis:
AbstractReplicatedMap.java LazyReplicatedMap.java ReplicatedMap.java
Author: kfujino
Date: Wed Jan 21 02:37:15 2015
New Revision: 1653423
URL: http://svn.apache.org/r1653423
Log:
Backport.
Clarify the handling of Copy message and Copy nodes.
Modified:
tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff
==============================================================================
--- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Wed Jan 21 02:37:15 2015
@@ -71,10 +71,12 @@ public abstract class AbstractReplicated
//------------------------------------------------------------------------------
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
- private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
+ protected final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
protected abstract int getStateMessageType();
+ protected abstract int getReplicateMessageType();
+
/**
* Timeout for RPC messages, how long we will wait for a reply
@@ -426,7 +428,7 @@ public abstract class AbstractReplicated
rentry.lock();
try {
//construct a diff message
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ msg = new MapMessage(mapContextName, getReplicateMessageType(),
true, (Serializable) entry.getKey(), null,
rentry.getDiff(),
entry.getPrimary(),
@@ -440,7 +442,7 @@ public abstract class AbstractReplicated
}
if (msg == null && complete) {
//construct a complete
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ msg = new MapMessage(mapContextName, getReplicateMessageType(),
false, (Serializable) entry.getKey(),
(Serializable) entry.getValue(),
null, entry.getPrimary(),entry.getBackupNodes());
@@ -632,6 +634,7 @@ public abstract class AbstractReplicated
}
entry.setProxy(true);
entry.setBackup(false);
+ entry.setCopy(false);
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
}
@@ -646,6 +649,7 @@ public abstract class AbstractReplicated
entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue());
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
+ entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
@@ -654,6 +658,7 @@ public abstract class AbstractReplicated
} else {
entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
entry.setProxy(false);
+ entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
entry.setBackupNodes(mapmsg.getBackupNodes());
entry.setPrimary(mapmsg.getPrimary());
if (entry.getValue() instanceof ReplicatedMapEntry) {
@@ -692,6 +697,14 @@ public abstract class AbstractReplicated
}
}
}
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) {
+ MapEntry<K, V> entry = innerMap.get(mapmsg.getKey());
+ if (entry != null) {
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
+ }
+ }
}
@Override
@@ -810,6 +823,7 @@ public abstract class AbstractReplicated
entry.setPrimary(channel.getLocalMember(false));
entry.setBackup(false);
entry.setProxy(false);
+ entry.setCopy(false);
Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
if ( mapOwner!=null ) mapOwner.objectMadePrimary(entry.getKey(),entry.getValue());
@@ -896,7 +910,10 @@ public abstract class AbstractReplicated
try {
Member[] backup = null;
MapMessage msg = null;
- if ( !entry.isBackup() ) {
+ if (entry.isBackup()) {
+ //select a new backup node
+ backup = publishEntryInfo(key, entry.getValue());
+ } else if ( entry.isProxy() ) {
//make sure we don't retrieve from ourselves
msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
(Serializable) key, null, null, null,null);
@@ -909,31 +926,31 @@ public abstract class AbstractReplicated
msg = (MapMessage) resp[0].getMessage();
msg.deserialize(getExternalLoaders());
backup = entry.getBackupNodes();
- if ( entry.getValue() instanceof ReplicatedMapEntry ) {
- ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
- val.setOwner(getMapOwner());
- }
if ( msg.getValue()!=null ) entry.setValue((V) msg.getValue());
- }
- if (entry.isBackup()) {
- //select a new backup node
- backup = publishEntryInfo(key, entry.getValue());
- } else if ( entry.isProxy() ) {
+
//invalidate the previous primary
msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
Member[] dest = getMapMembersExcl(backup);
if ( dest!=null && dest.length >0) {
getChannel().send(dest, msg, getChannelSendOptions());
}
- if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
+ if (entry.getValue() instanceof ReplicatedMapEntry) {
ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
val.setOwner(getMapOwner());
}
+ } else if ( entry.isCopy() ) {
+ backup = getMapMembers();
+ if (backup.length > 0) {
+ msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+ (Serializable)key,null,null,channel.getLocalMember(false),backup);
+ getChannel().send(backup, msg, getChannelSendOptions());
+ }
}
entry.setPrimary(channel.getLocalMember(false));
entry.setBackupNodes(backup);
entry.setBackup(false);
entry.setProxy(false);
+ entry.setCopy(false);
if ( getMapOwner()!=null ) getMapOwner().objectMadePrimary(key, entry.getValue());
} catch (Exception x) {
@@ -990,6 +1007,7 @@ public abstract class AbstractReplicated
MapEntry<K,V> entry = new MapEntry<>(key, value);
entry.setBackup(false);
entry.setProxy(false);
+ entry.setCopy(false);
entry.setPrimary(channel.getLocalMember(false));
V old = null;
@@ -1148,6 +1166,7 @@ public abstract class AbstractReplicated
public static class MapEntry<K,V> implements Map.Entry<K,V> {
private boolean backup;
private boolean proxy;
+ private boolean copy;
private Member[] backupNodes;
private Member primary;
private K key;
@@ -1184,7 +1203,7 @@ public abstract class AbstractReplicated
}
public boolean isPrimary() {
- return (!proxy && !backup);
+ return (!proxy && !backup && !copy);
}
public boolean isActive() {
@@ -1195,6 +1214,14 @@ public abstract class AbstractReplicated
this.proxy = proxy;
}
+ public boolean isCopy() {
+ return copy;
+ }
+
+ public void setCopy(boolean copy) {
+ this.copy = copy;
+ }
+
public boolean isDiffable() {
return (value instanceof ReplicatedMapEntry) &&
((ReplicatedMapEntry)value).isDiffable();
@@ -1306,6 +1333,7 @@ public abstract class AbstractReplicated
public static final int MSG_COPY = 9;
public static final int MSG_STATE_COPY = 10;
public static final int MSG_ACCESS = 11;
+ public static final int MSG_NOTIFY_MAPMEMBER = 12;
private final byte[] mapId;
private final int msgtype;
@@ -1344,6 +1372,7 @@ public abstract class AbstractReplicated
case MSG_STATE_COPY: return "MSG_STATE_COPY";
case MSG_COPY: return "MSG_COPY";
case MSG_ACCESS: return "MSG_ACCESS";
+ case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER";
default : return "UNKNOWN";
}
}
Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff
==============================================================================
--- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Wed Jan 21 02:37:15 2015
@@ -124,6 +124,11 @@ public class LazyReplicatedMap<K,V> exte
return AbstractReplicatedMap.MapMessage.MSG_STATE;
}
+ @Override
+ protected int getReplicateMessageType() {
+ return AbstractReplicatedMap.MapMessage.MSG_BACKUP;
+ }
+
/**
* publish info about a map pair (key/value) to other nodes in the cluster
* @param key Object
Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff
==============================================================================
--- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original)
+++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Wed Jan 21 02:37:15 2015
@@ -17,10 +17,14 @@
package org.apache.catalina.tribes.tipis;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
/**
* All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical
@@ -49,6 +53,8 @@ public class ReplicatedMap<K,V> extends
private static final long serialVersionUID = 1L;
+ private final Log log = LogFactory.getLog(ReplicatedMap.class);
+
//--------------------------------------------------------------------------
// CONSTRUCTORS / DESTRUCTORS
//--------------------------------------------------------------------------
@@ -105,6 +111,11 @@ public class ReplicatedMap<K,V> extends
return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
}
+ @Override
+ protected int getReplicateMessageType() {
+ return AbstractReplicatedMap.MapMessage.MSG_COPY;
+ }
+
/**
* publish info about a map pair (key/value) to other nodes in the cluster
* @param key Object
@@ -129,4 +140,67 @@ public class ReplicatedMap<K,V> extends
return backup;
}
+ @Override
+ public void memberDisappeared(Member member) {
+ boolean removed = false;
+ synchronized (mapMembers) {
+ removed = (mapMembers.remove(member) != null );
+ if (!removed) {
+ if (log.isDebugEnabled()) log.debug("Member["+member+"] disappeared, but was not present in the map.");
+ return; //the member was not part of our map.
+ }
+ }
+ if (log.isInfoEnabled())
+ log.info("Member["+member+"] disappeared. Related map entries will be relocated to the new node.");
+ long start = System.currentTimeMillis();
+ Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<K,MapEntry<K,V>> e = i.next();
+ MapEntry<K,V> entry = innerMap.get(e.getKey());
+ if (entry==null) continue;
+ if (entry.isPrimary()) {
+ try {
+ Member[] backup = getMapMembers();
+ if (backup.length > 0) {
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+ (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+ getChannel().send(backup, msg, getChannelSendOptions());
+ }
+ entry.setBackupNodes(backup);
+ entry.setPrimary(channel.getLocalMember(false));
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+ }
+ } else if (member.equals(entry.getPrimary())) {
+ entry.setPrimary(null);
+ }
+
+ if ( entry.getPrimary() == null &&
+ entry.isCopy() &&
+ entry.getBackupNodes()!=null &&
+ entry.getBackupNodes().length > 0 &&
+ entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+ try {
+ entry.setPrimary(channel.getLocalMember(false));
+ entry.setBackup(false);
+ entry.setProxy(false);
+ entry.setCopy(false);
+ Member[] backup = getMapMembers();
+ if (backup.length > 0) {
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+ (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+ getChannel().send(backup, msg, getChannelSendOptions());
+ }
+ entry.setBackupNodes(backup);
+ if ( mapOwner!=null ) mapOwner.objectMadePrimary(entry.getKey(),entry.getValue());
+
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+ }
+ }
+
+ } //while
+ long complete = System.currentTimeMillis() - start;
+ if (log.isInfoEnabled()) log.info("Relocation of map entries was complete in " + complete + " ms.");
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org