You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 05:18:02 UTC
svn commit: r467222 [23/31] - in /tomcat/tc6.0.x/trunk/java:
javax/annotation/ javax/annotation/security/ javax/ejb/ javax/el/
javax/mail/ javax/mail/internet/ javax/persistence/ javax/servlet/
javax/servlet/http/ javax/servlet/jsp/ javax/servlet/jsp/e...
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Mon Oct 23 20:17:11 2006
@@ -1,1368 +1,1368 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.tipis;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.Heartbeat;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.Response;
-import org.apache.catalina.tribes.group.RpcCallback;
-import org.apache.catalina.tribes.group.RpcChannel;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import java.util.ConcurrentModificationException;
-
-/**
- *
- * @author Filip Hanik
- * @version 1.0
- */
-public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
- protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
-
- /**
- * The default initial capacity - MUST be a power of two.
- */
- public static final int DEFAULT_INITIAL_CAPACITY = 16;
-
- /**
- * The load factor used when none specified in constructor.
- **/
- public static final float DEFAULT_LOAD_FACTOR = 0.75f;
-
- /**
- * Used to identify the map
- */
- final String chset = "ISO-8859-1";
-
-//------------------------------------------------------------------------------
-// INSTANCE VARIABLES
-//------------------------------------------------------------------------------
-
- /**
- * Timeout for RPC messages, how long we will wait for a reply
- */
- protected transient long rpcTimeout = 5000;
- /**
- * Reference to the channel for sending messages
- */
- protected transient Channel channel;
- /**
- * The RpcChannel to send RPC messages through
- */
- protected transient RpcChannel rpcChannel;
- /**
- * The Map context name makes this map unique, this
- * allows us to have more than one map shared
- * through one channel
- */
- protected transient byte[] mapContextName;
- /**
- * Has the state been transferred
- */
- protected transient boolean stateTransferred = false;
- /**
- * Simple lock object for transfers
- */
- protected transient Object stateMutex = new Object();
- /**
- * A list of members in our map
- */
- protected transient HashMap mapMembers = new HashMap();
- /**
- * Our default send options
- */
- protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
- /**
- * The owner of this map, ala a SessionManager for example
- */
- protected transient Object mapOwner;
- /**
- * External class loaders if serialization and deserialization is to be performed successfully.
- */
- protected transient ClassLoader[] externalLoaders;
-
- /**
- * The node we are currently backing up data to, this index will rotate
- * on a round robin basis
- */
- protected transient int currentNode = 0;
-
- /**
- * Since the map keeps internal membership
- * this is the timeout for a ping message to be responded to
- * If a remote map doesn't respond within this timeframe,
- * its considered dead.
- */
- protected transient long accessTimeout = 5000;
-
- /**
- * Readable string of the mapContextName value
- */
- protected transient String mapname = "";
-
-
-//------------------------------------------------------------------------------
-// CONSTRUCTORS
-//------------------------------------------------------------------------------
-
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- * @param loadFactor float - load factor, see HashMap
- * @param cls - a list of classloaders to be used for deserialization of objects.
- */
- public AbstractReplicatedMap(Object owner,
- Channel channel,
- long timeout,
- String mapContextName,
- int initialCapacity,
- float loadFactor,
- int channelSendOptions,
- ClassLoader[] cls) {
- super(initialCapacity, loadFactor);
- init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
-
- }
-
- /**
- * Helper methods, wraps a single member in an array
- * @param m Member
- * @return Member[]
- */
- protected Member[] wrap(Member m) {
- if ( m == null ) return new Member[0];
- else return new Member[] {m};
- }
-
- /**
- * Initializes the map by creating the RPC channel, registering itself as a channel listener
- * This method is also responsible for initiating the state transfer
- * @param owner Object
- * @param channel Channel
- * @param mapContextName String
- * @param timeout long
- * @param channelSendOptions int
- * @param cls ClassLoader[]
- */
- protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
- log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
- this.mapOwner = owner;
- this.externalLoaders = cls;
- this.channelSendOptions = channelSendOptions;
- this.channel = channel;
- this.rpcTimeout = timeout;
-
- try {
- this.mapname = mapContextName;
- //unique context is more efficient if it is stored as bytes
- this.mapContextName = mapContextName.getBytes(chset);
- } catch (UnsupportedEncodingException x) {
- log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
- this.mapContextName = mapContextName.getBytes();
- }
- if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
-
- //create an rpc channel and add the map as a listener
- this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
- //add this map as a message listener
- this.channel.addChannelListener(this);
- //listen for membership notifications
- this.channel.addMembershipListener(this);
-
-
- try {
- //broadcast our map, this just notifies other members of our existence
- broadcast(MapMessage.MSG_INIT, true);
- //transfer state from another map
- transferState();
- //state is transferred, we are ready for messaging
- broadcast(MapMessage.MSG_START, true);
- } catch (ChannelException x) {
- log.warn("Unable to send map start message.");
- throw new RuntimeException("Unable to start replicated map.",x);
- }
- }
-
-
- /**
- * Sends a ping out to all the members in the cluster, not just map members
- * that this map is alive.
- * @param timeout long
- * @throws ChannelException
- */
- protected void ping(long timeout) throws ChannelException {
- //send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName,
- MapMessage.MSG_INIT,
- false,
- null,
- null,
- null,
- wrap(channel.getLocalMember(false)));
- if ( channel.getMembers().length > 0 ) {
- //send a ping, wait for all nodes to reply
- Response[] resp = rpcChannel.send(channel.getMembers(),
- msg, rpcChannel.ALL_REPLY,
- (channelSendOptions),
- (int) accessTimeout);
- for (int i = 0; i < resp.length; i++) {
- memberAlive(resp[i].getSource());
- } //for
- }
- //update our map of members, expire some if we didn't receive a ping back
- synchronized (mapMembers) {
- Iterator it = mapMembers.entrySet().iterator();
- long now = System.currentTimeMillis();
- while ( it.hasNext() ) {
- Map.Entry entry = (Map.Entry)it.next();
- long access = ((Long)entry.getValue()).longValue();
- if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey());
- }
- }//synch
- }
-
- /**
- * We have received a member alive notification
- * @param member Member
- */
- protected void memberAlive(Member member) {
- synchronized (mapMembers) {
- if (!mapMembers.containsKey(member)) {
- mapMemberAdded(member);
- } //end if
- mapMembers.put(member, new Long(System.currentTimeMillis()));
- }
- }
-
- /**
- * Helper method to broadcast a message to all members in a channel
- * @param msgtype int
- * @param rpc boolean
- * @throws ChannelException
- */
- protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
- //send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName, msgtype,
- false, null, null, null, wrap(channel.getLocalMember(false)));
- if ( rpc) {
- Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
- for (int i = 0; i < resp.length; i++) {
- mapMemberAdded(resp[i].getSource());
- messageReceived(resp[i].getMessage(), resp[i].getSource());
- }
- } else {
- channel.send(channel.getMembers(),msg,channelSendOptions);
- }
- }
-
- public void breakdown() {
- finalize();
- }
-
- public void finalize() {
- try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
- //cleanup
- if (this.rpcChannel != null) {
- this.rpcChannel.breakdown();
- }
- if (this.channel != null) {
- this.channel.removeChannelListener(this);
- this.channel.removeMembershipListener(this);
- }
- this.rpcChannel = null;
- this.channel = null;
- this.mapMembers.clear();
- super.clear();
- this.stateTransferred = false;
- this.externalLoaders = null;
- }
-
- public int hashCode() {
- return Arrays.hashCode(this.mapContextName);
- }
-
- public boolean equals(Object o) {
- if ( o == null ) return false;
- if ( !(o instanceof AbstractReplicatedMap)) return false;
- if ( !(o.getClass().equals(this.getClass())) ) return false;
- AbstractReplicatedMap other = (AbstractReplicatedMap)o;
- return Arrays.equals(mapContextName,other.mapContextName);
- }
-
-//------------------------------------------------------------------------------
-// GROUP COM INTERFACES
-//------------------------------------------------------------------------------
- public Member[] getMapMembers(HashMap members) {
- synchronized (members) {
- Member[] result = new Member[members.size()];
- members.keySet().toArray(result);
- return result;
- }
- }
- public Member[] getMapMembers() {
- return getMapMembers(this.mapMembers);
- }
-
- public Member[] getMapMembersExcl(Member[] exclude) {
- synchronized (mapMembers) {
- HashMap list = (HashMap)mapMembers.clone();
- for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
- return getMapMembers(list);
- }
- }
-
-
- /**
- * Replicates any changes to the object since the last time
- * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
- * @param complete - if set to true, the object is replicated to its backup
- * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
- * be replicated
- */
- public void replicate(Object key, boolean complete) {
- if ( log.isTraceEnabled() )
- log.trace("Replicate invoked on key:"+key);
- MapEntry entry = (MapEntry)super.get(key);
- if ( entry == null ) return;
- if ( !entry.isSerializable() ) return;
- if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
- Object value = entry.getValue();
- //check to see if we need to replicate this object isDirty()||complete
- boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
-
- if (!repl) {
- if ( log.isTraceEnabled() )
- log.trace("Not replicating:"+key+", no change made");
-
- return;
- }
- //check to see if the message is diffable
- boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
- MapMessage msg = null;
- if (diff) {
- ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
- try {
- rentry.lock();
- //construct a diff message
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
- true, (Serializable) entry.getKey(), null,
- rentry.getDiff(),
- entry.getBackupNodes());
- } catch (IOException x) {
- log.error("Unable to diff object. Will replicate the entire object instead.", x);
- } finally {
- rentry.unlock();
- }
-
- }
- if (msg == null) {
- //construct a complete
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
- false, (Serializable) entry.getKey(),
- (Serializable) entry.getValue(),
- null, entry.getBackupNodes());
-
- }
- try {
- if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
- channel.send(entry.getBackupNodes(), msg, channelSendOptions);
- }
- } catch (ChannelException x) {
- log.error("Unable to replicate data.", x);
- }
- } //end if
-
- }
-
- /**
- * This can be invoked by a periodic thread to replicate out any changes.
- * For maps that don't store objects that implement ReplicatedMapEntry, this
- * method should be used infrequently to avoid large amounts of data transfer
- * @param complete boolean
- */
- public void replicate(boolean complete) {
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- replicate(e.getKey(), complete);
- } //while
-
- }
-
- public void transferState() {
- try {
- Member[] members = getMapMembers();
- Member backup = members.length > 0 ? (Member) members[0] : null;
- if (backup != null) {
- MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
- null, null, null, null);
- Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
- if (resp.length > 0) {
- synchronized (stateMutex) {
- msg = (MapMessage) resp[0].getMessage();
- msg.deserialize(getExternalLoaders());
- ArrayList list = (ArrayList) msg.getValue();
- for (int i = 0; i < list.size(); i++) {
- messageReceived( (Serializable) list.get(i), resp[0].getSource());
- } //for
- }
- } else {
- log.warn("Transfer state, 0 replies, probably a timeout.");
- }
- }
- } catch (ChannelException x) {
- log.error("Unable to transfer LazyReplicatedMap state.", x);
- } catch (IOException x) {
- log.error("Unable to transfer LazyReplicatedMap state.", x);
- } catch (ClassNotFoundException x) {
- log.error("Unable to transfer LazyReplicatedMap state.", x);
- }
- stateTransferred = true;
- }
-
- /**
- * @todo implement state transfer
- * @param msg Serializable
- * @return Serializable - null if no reply should be sent
- */
- public Serializable replyRequest(Serializable msg, final Member sender) {
- if (! (msg instanceof MapMessage))return null;
- MapMessage mapmsg = (MapMessage) msg;
-
- //map init request
- if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
- mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
- return mapmsg;
- }
-
- //map start request
- if (mapmsg.getMsgType() == mapmsg.MSG_START) {
- mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
- mapMemberAdded(sender);
- return mapmsg;
- }
-
- //backup request
- if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if (entry == null || (!entry.isSerializable()) )return null;
- mapmsg.setValue( (Serializable) entry.getValue());
- return mapmsg;
- }
-
- //state transfer request
- if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
- synchronized (stateMutex) { //make sure we dont do two things at the same time
- ArrayList list = new ArrayList();
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- if ( entry.isSerializable() ) {
- MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
- false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
- list.add(me);
- }
- }
- mapmsg.setValue(list);
- return mapmsg;
-
- } //synchronized
- }
-
- return null;
-
- }
-
- /**
- * If the reply has already been sent to the requesting thread,
- * the rpc callback can handle any data that comes in after the fact.
- * @param msg Serializable
- * @param sender Member
- */
- public void leftOver(Serializable msg, Member sender) {
- //left over membership messages
- if (! (msg instanceof MapMessage))return;
-
- MapMessage mapmsg = (MapMessage) msg;
- try {
- mapmsg.deserialize(getExternalLoaders());
- if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getBackupNodes()[0]);
- } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
- memberAlive(mapmsg.getBackupNodes()[0]);
- }
- } catch (IOException x ) {
- log.error("Unable to deserialize MapMessage.",x);
- } catch (ClassNotFoundException x ) {
- log.error("Unable to deserialize MapMessage.",x);
- }
- }
-
- public void messageReceived(Serializable msg, Member sender) {
- if (! (msg instanceof MapMessage)) return;
-
- MapMessage mapmsg = (MapMessage) msg;
- if ( log.isTraceEnabled() ) {
- log.trace("Map["+mapname+"] received message:"+mapmsg);
- }
-
- try {
- mapmsg.deserialize(getExternalLoaders());
- } catch (IOException x) {
- log.error("Unable to deserialize MapMessage.", x);
- return;
- } catch (ClassNotFoundException x) {
- log.error("Unable to deserialize MapMessage.", x);
- return;
- }
- if ( log.isTraceEnabled() )
- log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
- if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getBackupNodes()[0]);
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
- memberDisappeared(mapmsg.getBackupNodes()[0]);
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if ( entry==null ) {
- entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
- entry.setBackup(false);
- entry.setProxy(true);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- super.put(entry.getKey(), entry);
- } else {
- entry.setProxy(true);
- entry.setBackup(false);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- }
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
- super.remove(mapmsg.getKey());
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if (entry == null) {
- entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
- entry.setBackup(true);
- entry.setProxy(false);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
- ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
- }
- } else {
- entry.setBackup(true);
- entry.setProxy(false);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- if (entry.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
- if (mapmsg.isDiff()) {
- try {
- diff.lock();
- diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
- } catch (Exception x) {
- log.error("Unable to apply diff to key:" + entry.getKey(), x);
- } finally {
- diff.unlock();
- }
- } else {
- if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
- ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
- } //end if
- } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
- re.setOwner(getMapOwner());
- entry.setValue(re);
- } else {
- if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
- } //end if
- } //end if
- super.put(entry.getKey(), entry);
- } //end if
- }
-
- public boolean accept(Serializable msg, Member sender) {
- boolean result = false;
- if (msg instanceof MapMessage) {
- if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
- result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
- if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
- }
- return result;
- }
-
- public void mapMemberAdded(Member member) {
- if ( member.equals(getChannel().getLocalMember(false)) ) return;
- boolean memberAdded = false;
- //select a backup node if we don't have one
- synchronized (mapMembers) {
- if (!mapMembers.containsKey(member) ) {
- mapMembers.put(member, new Long(System.currentTimeMillis()));
- memberAdded = true;
- }
- }
- if ( memberAdded ) {
- synchronized (stateMutex) {
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- if ( entry == null ) continue;
- if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
- try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNodes(backup);
- } catch (ChannelException x) {
- log.error("Unable to select backup node.", x);
- } //catch
- } //end if
- } //while
- } //synchronized
- }//end if
- }
-
- public boolean inSet(Member m, Member[] set) {
- if ( set == null ) return false;
- boolean result = false;
- for (int i=0; i<set.length && (!result); i++ )
- if ( m.equals(set[i]) ) result = true;
- return result;
- }
-
- public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
- ArrayList result = new ArrayList();
- for (int i=0; i<set.length; i++ ) {
- boolean include = true;
- for (int j=0; j<mbrs.length; j++ )
- if ( mbrs[j].equals(set[i]) ) include = false;
- if ( include ) result.add(set[i]);
- }
- return (Member[])result.toArray(new Member[result.size()]);
- }
-
- public void memberAdded(Member member) {
- //do nothing
- }
-
- public void memberDisappeared(Member member) {
- boolean removed = false;
- synchronized (mapMembers) {
- removed = (mapMembers.remove(member) != null );
- }
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
- try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNodes(backup);
- } catch (ChannelException x) {
- log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
- }
- } //end if
- } //while
- }
-
- public int getNextBackupIndex() {
- int size = mapMembers.size();
- if (mapMembers.size() == 0)return -1;
- int node = currentNode++;
- if (node >= size) {
- node = 0;
- currentNode = 0;
- }
- return node;
- }
- public Member getNextBackupNode() {
- Member[] members = getMapMembers();
- int node = getNextBackupIndex();
- if ( members.length == 0 || node==-1) return null;
- if ( node >= members.length ) node = 0;
- return members[node];
- }
-
- protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
-
- public void heartbeat() {
- try {
- ping(accessTimeout);
- }catch ( Exception x ) {
- log.error("Unable to send AbstractReplicatedMap.ping message",x);
- }
- }
-
-//------------------------------------------------------------------------------
-// METHODS TO OVERRIDE
-//------------------------------------------------------------------------------
-
- /**
- * Removes an object from this map, it will also remove it from
- *
- * @param key Object
- * @return Object
- */
- public Object remove(Object key) {
- MapEntry entry = (MapEntry)super.remove(key);
-
- try {
- if (getMapMembers().length > 0 ) {
- MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
- getChannel().send(getMapMembers(), msg, getChannelSendOptions());
- }
- } catch ( ChannelException x ) {
- log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
- }
- return entry!=null?entry.getValue():null;
- }
-
- public Object get(Object key) {
- MapEntry entry = (MapEntry)super.get(key);
- if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
- if ( entry == null ) return null;
- if ( !entry.isPrimary() ) {
- //if the message is not primary, we need to retrieve the latest value
- try {
- Member[] backup = null;
- MapMessage msg = null;
- if ( !entry.isBackup() ) {
- //make sure we don't retrieve from ourselves
- msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
- (Serializable) key, null, null, null);
- Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
- if (resp == null || resp.length == 0) {
- //no responses
- log.warn("Unable to retrieve remote object for key:" + key);
- return null;
- }
- msg = (MapMessage) resp[0].getMessage();
- msg.deserialize(getExternalLoaders());
- backup = entry.getBackupNodes();
- if ( entry.getValue() instanceof ReplicatedMapEntry ) {
- ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
- val.setOwner(getMapOwner());
- }
- if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
- }
- if (entry.isBackup()) {
- //select a new backup node
- backup = publishEntryInfo(key, entry.getValue());
- } else if ( entry.isProxy() ) {
- //invalidate the previous primary
- msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
- Member[] dest = getMapMembersExcl(backup);
- if ( dest!=null && dest.length >0) {
- getChannel().send(dest, msg, getChannelSendOptions());
- }
- }
-
- entry.setBackupNodes(backup);
- entry.setBackup(false);
- entry.setProxy(false);
-
-
- } catch (Exception x) {
- log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
- return null;
- }
- }
- if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
- if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
- ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
- //hack, somehow this is not being set above
- val.setOwner(getMapOwner());
-
- }
- return entry.getValue();
- }
-
-
- protected void printMap(String header) {
- try {
- System.out.println("\nDEBUG MAP:"+header);
- System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
- Member[] mbrs = getMapMembers();
- for ( int i=0; i<mbrs.length;i++ ) {
- System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
- }
- Iterator i = super.entrySet().iterator();
- int cnt = 0;
-
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- System.out.println( (++cnt) + ". " + e.getValue());
- }
- System.out.println("EndMap]\n\n");
- }catch ( Exception ignore) {
- ignore.printStackTrace();
- }
- }
-
- /**
- * Returns true if the key has an entry in the map.
- * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
- * will make this entry primary for the group
- * @param key Object
- * @return boolean
- */
- public boolean containsKey(Object key) {
- return super.containsKey(key);
- }
-
-
- public Object put(Object key, Object value) {
- MapEntry entry = new MapEntry(key,value);
- entry.setBackup(false);
- entry.setProxy(false);
-
- Object old = null;
-
- //make sure that any old values get removed
- if ( containsKey(key) ) old = remove(key);
- try {
- Member[] backup = publishEntryInfo(key, value);
- entry.setBackupNodes(backup);
- } catch (ChannelException x) {
- log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
- }
- super.put(key,entry);
- return old;
- }
-
-
- /**
- * Copies all values from one map to this instance
- * @param m Map
- */
- public void putAll(Map m) {
- Iterator i = m.entrySet().iterator();
- while ( i.hasNext() ) {
- Map.Entry entry = (Map.Entry)i.next();
- put(entry.getKey(),entry.getValue());
- }
- }
-
- public void clear() {
- //only delete active keys
- Iterator keys = keySet().iterator();
- while ( keys.hasNext() ) remove(keys.next());
- }
-
- public boolean containsValue(Object value) {
- if ( value == null ) {
- return super.containsValue(value);
- } else {
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- if (entry.isPrimary() && value.equals(entry.getValue())) return true;
- }//while
- return false;
- }//end if
- }
-
- public Object clone() {
- throw new UnsupportedOperationException("This operation is not valid on a replicated map");
- }
-
- /**
- * Returns the entire contents of the map
- * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information
- * about the object.
- * @return Set
- */
- public Set entrySetFull() {
- return super.entrySet();
- }
-
- public Set keySetFull() {
- return super.keySet();
- }
-
- public int sizeFull() {
- return super.size();
- }
-
- public Set entrySet() {
- LinkedHashSet set = new LinkedHashSet(super.size());
- Iterator i = super.entrySet().iterator();
- while ( i.hasNext() ) {
- Map.Entry e = (Map.Entry)i.next();
- MapEntry entry = (MapEntry)e.getValue();
- if ( entry.isPrimary() ) set.add(entry);
- }
- return Collections.unmodifiableSet(set);
- }
-
- public Set keySet() {
- //todo implement
- //should only return keys where this is active.
- LinkedHashSet set = new LinkedHashSet(super.size());
- Iterator i = super.entrySet().iterator();
- while ( i.hasNext() ) {
- Map.Entry e = (Map.Entry)i.next();
- MapEntry entry = (MapEntry)e.getValue();
- if ( entry.isPrimary() ) set.add(entry.getKey());
- }
- return Collections.unmodifiableSet(set);
- }
-
-
- public int size() {
- //todo, implement a counter variable instead
- //only count active members in this node
- int counter = 0;
- Iterator it = Collections.unmodifiableSet(super.entrySet()).iterator();
- while (it.hasNext() ) {
- Map.Entry e = (Map.Entry) it.next();
- if ( e != null ) {
- MapEntry entry = (MapEntry) e.getValue();
- if (entry.isPrimary() && entry.getValue() != null) counter++;
- }
- }
- return counter;
- }
-
- protected boolean removeEldestEntry(Map.Entry eldest) {
- return false;
- }
-
- public boolean isEmpty() {
- return size()==0;
- }
-
- public Collection values() {
- ArrayList values = new ArrayList();
- Iterator i = super.entrySet().iterator();
- while ( i.hasNext() ) {
- Map.Entry e = (Map.Entry)i.next();
- MapEntry entry = (MapEntry)e.getValue();
- if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue());
- }
- return Collections.unmodifiableCollection(values);
- }
-
-
-//------------------------------------------------------------------------------
-// Map Entry class
-//------------------------------------------------------------------------------
- public static class MapEntry implements Map.Entry {
- private boolean backup;
- private boolean proxy;
- private Member[] backupNodes;
-
- private Object key;
- private Object value;
-
- public MapEntry(Object key, Object value) {
- setKey(key);
- setValue(value);
-
- }
-
- public boolean isKeySerializable() {
- return (key == null) || (key instanceof Serializable);
- }
-
- public boolean isValueSerializable() {
- return (value==null) || (value instanceof Serializable);
- }
-
- public boolean isSerializable() {
- return isKeySerializable() && isValueSerializable();
- }
-
- public boolean isBackup() {
- return backup;
- }
-
- public void setBackup(boolean backup) {
- this.backup = backup;
- }
-
- public boolean isProxy() {
- return proxy;
- }
-
- public boolean isPrimary() {
- return ( (!proxy) && (!backup));
- }
-
- public void setProxy(boolean proxy) {
- this.proxy = proxy;
- }
-
- public boolean isDiffable() {
- return (value instanceof ReplicatedMapEntry) &&
- ((ReplicatedMapEntry)value).isDiffable();
- }
-
- public void setBackupNodes(Member[] nodes) {
- this.backupNodes = nodes;
- }
-
- public Member[] getBackupNodes() {
- return backupNodes;
- }
-
- public Object getValue() {
- return value;
- }
-
- public Object setValue(Object value) {
- Object old = this.value;
- this.value = (Serializable) value;
- return old;
- }
-
- public Object getKey() {
- return key;
- }
-
- public Object setKey(Object key) {
- Object old = this.key;
- this.key = (Serializable)key;
- return old;
- }
-
- public int hashCode() {
- return key.hashCode();
- }
-
- public boolean equals(Object o) {
- return key.equals(o);
- }
-
- /**
- * apply a diff, or an entire object
- * @param data byte[]
- * @param offset int
- * @param length int
- * @param diff boolean
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException {
- if (isDiffable() && diff) {
- ReplicatedMapEntry rentry = (ReplicatedMapEntry) value;
- try {
- rentry.lock();
- rentry.applyDiff(data, offset, length);
- } finally {
- rentry.unlock();
- }
- } else if (length == 0) {
- value = null;
- proxy = true;
- } else {
- value = XByteBuffer.deserialize(data, offset, length);
- }
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer("MapEntry[key:");
- buf.append(getKey()).append("; ");
- buf.append("value:").append(getValue()).append("; ");
- buf.append("primary:").append(isPrimary()).append("; ");
- buf.append("backup:").append(isBackup()).append("; ");
- buf.append("proxy:").append(isProxy()).append(";]");
- return buf.toString();
- }
-
- }
-
-//------------------------------------------------------------------------------
-// map message to send to and from other maps
-//------------------------------------------------------------------------------
-
- public static class MapMessage implements Serializable {
- public static final int MSG_BACKUP = 1;
- public static final int MSG_RETRIEVE_BACKUP = 2;
- public static final int MSG_PROXY = 3;
- public static final int MSG_REMOVE = 4;
- public static final int MSG_STATE = 5;
- public static final int MSG_START = 6;
- public static final int MSG_STOP = 7;
- public static final int MSG_INIT = 8;
-
- private byte[] mapId;
- private int msgtype;
- private boolean diff;
- private transient Serializable key;
- private transient Serializable value;
- private byte[] valuedata;
- private byte[] keydata;
- private byte[] diffvalue;
- private Member[] nodes;
-
- public String toString() {
- StringBuffer buf = new StringBuffer("MapMessage[context=");
- buf.append(new String(mapId));
- buf.append("; type=");
- buf.append(getTypeDesc());
- buf.append("; key=");
- buf.append(key);
- buf.append("; value=");
- buf.append(value);
- return buf.toString();
- }
-
- public String getTypeDesc() {
- switch (msgtype) {
- case MSG_BACKUP: return "MSG_BACKUP";
- case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP";
- case MSG_PROXY: return "MSG_PROXY";
- case MSG_REMOVE: return "MSG_REMOVE";
- case MSG_STATE: return "MSG_STATE";
- case MSG_START: return "MSG_START";
- case MSG_STOP: return "MSG_STOP";
- case MSG_INIT: return "MSG_INIT";
- default : return "UNKNOWN";
- }
- }
-
- public MapMessage() {}
-
- public MapMessage(byte[] mapId,int msgtype, boolean diff,
- Serializable key, Serializable value,
- byte[] diffvalue, Member[] nodes) {
- this.mapId = mapId;
- this.msgtype = msgtype;
- this.diff = diff;
- this.key = key;
- this.value = value;
- this.diffvalue = diffvalue;
- this.nodes = nodes;
- setValue(value);
- setKey(key);
- }
-
- public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException {
- key(cls);
- value(cls);
- }
-
- public int getMsgType() {
- return msgtype;
- }
-
- public boolean isDiff() {
- return diff;
- }
-
- public Serializable getKey() {
- try {
- return key(null);
- } catch ( Exception x ) {
- log.error("Deserialization error of the MapMessage.key",x);
- return null;
- }
- }
-
- public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException {
- if ( key!=null ) return key;
- if ( keydata == null || keydata.length == 0 ) return null;
- key = XByteBuffer.deserialize(keydata,0,keydata.length,cls);
- keydata = null;
- return key;
- }
-
- public byte[] getKeyData() {
- return keydata;
- }
-
- public Serializable getValue() {
- try {
- return value(null);
- } catch ( Exception x ) {
- log.error("Deserialization error of the MapMessage.value",x);
- return null;
- }
- }
-
- public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException {
- if ( value!=null ) return value;
- if ( valuedata == null || valuedata.length == 0 ) return null;
- value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls);
- valuedata = null;;
- return value;
- }
-
- public byte[] getValueData() {
- return valuedata;
- }
-
- public byte[] getDiffValue() {
- return diffvalue;
- }
-
- public Member[] getBackupNodes() {
- return nodes;
- }
-
- private void setBackUpNodes(Member[] nodes) {
- this.nodes = nodes;
- }
-
- public byte[] getMapId() {
- return mapId;
- }
-
- public void setValue(Serializable value) {
- try {
- if ( value != null ) valuedata = XByteBuffer.serialize(value);
- this.value = value;
- }catch ( IOException x ) {
- throw new RuntimeException(x);
- }
- }
-
- public void setKey(Serializable key) {
- try {
- if (key != null) keydata = XByteBuffer.serialize(key);
- this.key = key;
- } catch (IOException x) {
- throw new RuntimeException(x);
- }
- }
-
- protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException {
- int nodecount = in.readInt();
- Member[] members = new Member[nodecount];
- for ( int i=0; i<members.length; i++ ) {
- byte[] d = new byte[in.readInt()];
- in.read(d);
- if (d.length > 0) members[i] = MemberImpl.getMember(d);
- }
- return members;
- }
-
- protected void writeMembers(ObjectOutput out,Member[] members) throws IOException {
- if ( members == null ) members = new Member[0];
- out.writeInt(members.length);
- for (int i=0; i<members.length; i++ ) {
- if ( members[i] != null ) {
- byte[] d = members[i] != null ? ( (MemberImpl)members[i]).getData(false) : new byte[0];
- out.writeInt(d.length);
- out.write(d);
- }
- }
- }
-
-
- /**
- * shallow clone
- * @return Object
- */
- public Object clone() {
- MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes);
- msg.keydata = this.keydata;
- msg.valuedata = this.valuedata;
- return msg;
- }
- } //MapMessage
-
-
- public Channel getChannel() {
- return channel;
- }
-
- public byte[] getMapContextName() {
- return mapContextName;
- }
-
- public RpcChannel getRpcChannel() {
- return rpcChannel;
- }
-
- public long getRpcTimeout() {
- return rpcTimeout;
- }
-
- public Object getStateMutex() {
- return stateMutex;
- }
-
- public boolean isStateTransferred() {
- return stateTransferred;
- }
-
- public Object getMapOwner() {
- return mapOwner;
- }
-
- public ClassLoader[] getExternalLoaders() {
- return externalLoaders;
- }
-
- public int getChannelSendOptions() {
- return channelSendOptions;
- }
-
- public long getAccessTimeout() {
- return accessTimeout;
- }
-
- public void setMapOwner(Object mapOwner) {
- this.mapOwner = mapOwner;
- }
-
- public void setExternalLoaders(ClassLoader[] externalLoaders) {
- this.externalLoaders = externalLoaders;
- }
-
- public void setChannelSendOptions(int channelSendOptions) {
- this.channelSendOptions = channelSendOptions;
- }
-
- public void setAccessTimeout(long accessTimeout) {
- this.accessTimeout = accessTimeout;
- }
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.tipis;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import java.util.ConcurrentModificationException;
+
+/**
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
+ protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
+
+ /**
+ * The default initial capacity - MUST be a power of two.
+ */
+ public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+ /**
+ * The load factor used when none specified in constructor.
+ **/
+ public static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+ /**
+ * Character set used to encode the map context name into bytes
+ */
+ final String chset = "ISO-8859-1";
+
+//------------------------------------------------------------------------------
+// INSTANCE VARIABLES
+//------------------------------------------------------------------------------
+
+ /**
+ * Timeout for RPC messages, how long we will wait for a reply
+ */
+ protected transient long rpcTimeout = 5000;
+ /**
+ * Reference to the channel for sending messages
+ */
+ protected transient Channel channel;
+ /**
+ * The RpcChannel to send RPC messages through
+ */
+ protected transient RpcChannel rpcChannel;
+ /**
+ * The Map context name makes this map unique, this
+ * allows us to have more than one map shared
+ * through one channel
+ */
+ protected transient byte[] mapContextName;
+ /**
+ * Has the state been transferred
+ */
+ protected transient boolean stateTransferred = false;
+ /**
+ * Simple lock object for transfers
+ */
+ protected transient Object stateMutex = new Object();
+ /**
+ * A list of members in our map
+ */
+ protected transient HashMap mapMembers = new HashMap();
+ /**
+ * Our default send options
+ */
+ protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+ /**
+ * The owner of this map, ala a SessionManager for example
+ */
+ protected transient Object mapOwner;
+ /**
+ * External class loaders, needed if serialization and deserialization are to be performed successfully.
+ */
+ protected transient ClassLoader[] externalLoaders;
+
+ /**
+ * The node we are currently backing up data to, this index will rotate
+ * on a round robin basis
+ */
+ protected transient int currentNode = 0;
+
+ /**
+ * Since the map keeps its own internal membership,
+ * this is the timeout for a ping message to be responded to.
+ * If a remote map doesn't respond within this timeframe,
+ * it is considered dead.
+ */
+ protected transient long accessTimeout = 5000;
+
+ /**
+ * Readable string of the mapContextName value
+ */
+ protected transient String mapname = "";
+
+
+//------------------------------------------------------------------------------
+// CONSTRUCTORS
+//------------------------------------------------------------------------------
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ * @param loadFactor float - load factor, see HashMap
+ * @param cls - a list of classloaders to be used for deserialization of objects.
+ */
+ public AbstractReplicatedMap(Object owner,
+ Channel channel,
+ long timeout,
+ String mapContextName,
+ int initialCapacity,
+ float loadFactor,
+ int channelSendOptions,
+ ClassLoader[] cls) {
+ super(initialCapacity, loadFactor);
+ init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
+
+ }
+
+ /**
+ * Helper method, wraps a single member in an array
+ * @param m Member
+ * @return Member[]
+ */
+ protected Member[] wrap(Member m) {
+ if ( m == null ) return new Member[0];
+ else return new Member[] {m};
+ }
+
+ /**
+ * Initializes the map by creating the RPC channel, registering itself as a channel listener
+ * This method is also responsible for initiating the state transfer
+ * @param owner Object
+ * @param channel Channel
+ * @param mapContextName String
+ * @param timeout long
+ * @param channelSendOptions int
+ * @param cls ClassLoader[]
+ */
+ protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
+ log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
+ this.mapOwner = owner;
+ this.externalLoaders = cls;
+ this.channelSendOptions = channelSendOptions;
+ this.channel = channel;
+ this.rpcTimeout = timeout;
+
+ try {
+ this.mapname = mapContextName;
+ //unique context is more efficient if it is stored as bytes
+ this.mapContextName = mapContextName.getBytes(chset);
+ } catch (UnsupportedEncodingException x) {
+ log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
+ this.mapContextName = mapContextName.getBytes();
+ }
+ if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
+
+ //create an rpc channel and add the map as a listener
+ this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
+ //add this map as a message listener
+ this.channel.addChannelListener(this);
+ //listen for membership notifications
+ this.channel.addMembershipListener(this);
+
+
+ try {
+ //broadcast our map, this just notifies other members of our existence
+ broadcast(MapMessage.MSG_INIT, true);
+ //transfer state from another map
+ transferState();
+ //state is transferred, we are ready for messaging
+ broadcast(MapMessage.MSG_START, true);
+ } catch (ChannelException x) {
+ log.warn("Unable to send map start message.");
+ throw new RuntimeException("Unable to start replicated map.",x);
+ }
+ }
+
+
+ /**
+ * Sends a ping out to all the members in the cluster, not just map members,
+ * to announce that this map is alive.
+ * @param timeout long
+ * @throws ChannelException
+ */
+ protected void ping(long timeout) throws ChannelException {
+ //send out a map membership message to all members of the channel
+ MapMessage msg = new MapMessage(this.mapContextName,
+ MapMessage.MSG_INIT,
+ false,
+ null,
+ null,
+ null,
+ wrap(channel.getLocalMember(false)));
+ if ( channel.getMembers().length > 0 ) {
+ //send a ping, wait for all nodes to reply
+ Response[] resp = rpcChannel.send(channel.getMembers(),
+ msg, rpcChannel.ALL_REPLY,
+ (channelSendOptions),
+ (int) accessTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ memberAlive(resp[i].getSource());
+ } //for
+ }
+ //update our map of members, expire some if we didn't receive a ping back
+ synchronized (mapMembers) {
+ Iterator it = mapMembers.entrySet().iterator();
+ long now = System.currentTimeMillis();
+ while ( it.hasNext() ) {
+ Map.Entry entry = (Map.Entry)it.next();
+ long access = ((Long)entry.getValue()).longValue();
+ if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey());
+ }
+ }//synch
+ }
+
+ /**
+ * We have received a member alive notification
+ * @param member Member
+ */
+ protected void memberAlive(Member member) {
+ synchronized (mapMembers) {
+ if (!mapMembers.containsKey(member)) {
+ mapMemberAdded(member);
+ } //end if
+ mapMembers.put(member, new Long(System.currentTimeMillis()));
+ }
+ }
+
+ /**
+ * Helper method to broadcast a message to all members in a channel
+ * @param msgtype int
+ * @param rpc boolean
+ * @throws ChannelException
+ */
+ protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
+ //send out a map membership message, only wait for the first reply
+ MapMessage msg = new MapMessage(this.mapContextName, msgtype,
+ false, null, null, null, wrap(channel.getLocalMember(false)));
+ if ( rpc) {
+ Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ mapMemberAdded(resp[i].getSource());
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
+ }
+ } else {
+ channel.send(channel.getMembers(),msg,channelSendOptions);
+ }
+ }
+
+ public void breakdown() {
+ finalize();
+ }
+
+ public void finalize() {
+ try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
+ //cleanup
+ if (this.rpcChannel != null) {
+ this.rpcChannel.breakdown();
+ }
+ if (this.channel != null) {
+ this.channel.removeChannelListener(this);
+ this.channel.removeMembershipListener(this);
+ }
+ this.rpcChannel = null;
+ this.channel = null;
+ this.mapMembers.clear();
+ super.clear();
+ this.stateTransferred = false;
+ this.externalLoaders = null;
+ }
+
+ public int hashCode() {
+ return Arrays.hashCode(this.mapContextName);
+ }
+
+ public boolean equals(Object o) {
+ if ( o == null ) return false;
+ if ( !(o instanceof AbstractReplicatedMap)) return false;
+ if ( !(o.getClass().equals(this.getClass())) ) return false;
+ AbstractReplicatedMap other = (AbstractReplicatedMap)o;
+ return Arrays.equals(mapContextName,other.mapContextName);
+ }
+
+//------------------------------------------------------------------------------
+// GROUP COM INTERFACES
+//------------------------------------------------------------------------------
+ public Member[] getMapMembers(HashMap members) {
+ synchronized (members) {
+ Member[] result = new Member[members.size()];
+ members.keySet().toArray(result);
+ return result;
+ }
+ }
+ public Member[] getMapMembers() {
+ return getMapMembers(this.mapMembers);
+ }
+
+ public Member[] getMapMembersExcl(Member[] exclude) {
+ synchronized (mapMembers) {
+ HashMap list = (HashMap)mapMembers.clone();
+ for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
+ return getMapMembers(list);
+ }
+ }
+
+
+ /**
+ * Replicates any changes to the object since the last time it was replicated.
+ * The object has to be primary, i.e., if the object is a proxy or a backup, it will not be replicated<br>
+ * @param complete - if set to true, the object is replicated to its backup
+ * if set to false, only objects that implement ReplicatedMapEntry and whose isDirty() returns true will
+ * be replicated
+ */
+ public void replicate(Object key, boolean complete) {
+ if ( log.isTraceEnabled() )
+ log.trace("Replicate invoked on key:"+key);
+ MapEntry entry = (MapEntry)super.get(key);
+ if ( entry == null ) return;
+ if ( !entry.isSerializable() ) return;
+ if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
+ Object value = entry.getValue();
+ //check to see if we need to replicate this object isDirty()||complete
+ boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
+
+ if (!repl) {
+ if ( log.isTraceEnabled() )
+ log.trace("Not replicating:"+key+", no change made");
+
+ return;
+ }
+ //check to see if the message is diffable
+ boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
+ MapMessage msg = null;
+ if (diff) {
+ ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
+ try {
+ rentry.lock();
+ //construct a diff message
+ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ true, (Serializable) entry.getKey(), null,
+ rentry.getDiff(),
+ entry.getBackupNodes());
+ } catch (IOException x) {
+ log.error("Unable to diff object. Will replicate the entire object instead.", x);
+ } finally {
+ rentry.unlock();
+ }
+
+ }
+ if (msg == null) {
+ //construct a complete
+ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ false, (Serializable) entry.getKey(),
+ (Serializable) entry.getValue(),
+ null, entry.getBackupNodes());
+
+ }
+ try {
+ if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
+ channel.send(entry.getBackupNodes(), msg, channelSendOptions);
+ }
+ } catch (ChannelException x) {
+ log.error("Unable to replicate data.", x);
+ }
+ } //end if
+
+ }
+
+ /**
+ * This can be invoked by a periodic thread to replicate out any changes.
+ * For maps that don't store objects that implement ReplicatedMapEntry, this
+ * method should be used infrequently to avoid large amounts of data transfer
+ * @param complete boolean
+ */
+ public void replicate(boolean complete) {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ replicate(e.getKey(), complete);
+ } //while
+
+ }
+
+ public void transferState() {
+ try {
+ Member[] members = getMapMembers();
+ Member backup = members.length > 0 ? (Member) members[0] : null;
+ if (backup != null) {
+ MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
+ null, null, null, null);
+ Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
+ if (resp.length > 0) {
+ synchronized (stateMutex) {
+ msg = (MapMessage) resp[0].getMessage();
+ msg.deserialize(getExternalLoaders());
+ ArrayList list = (ArrayList) msg.getValue();
+ for (int i = 0; i < list.size(); i++) {
+ messageReceived( (Serializable) list.get(i), resp[0].getSource());
+ } //for
+ }
+ } else {
+ log.warn("Transfer state, 0 replies, probably a timeout.");
+ }
+ }
+ } catch (ChannelException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ } catch (IOException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ } catch (ClassNotFoundException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ }
+ stateTransferred = true;
+ }
+
+ /**
+ * @todo implement state transfer
+ * @param msg Serializable
+ * @return Serializable - null if no reply should be sent
+ */
+ public Serializable replyRequest(Serializable msg, final Member sender) {
+ if (! (msg instanceof MapMessage))return null;
+ MapMessage mapmsg = (MapMessage) msg;
+
+ //map init request
+ if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
+ mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+ return mapmsg;
+ }
+
+ //map start request
+ if (mapmsg.getMsgType() == mapmsg.MSG_START) {
+ mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+ mapMemberAdded(sender);
+ return mapmsg;
+ }
+
+ //backup request
+ if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null || (!entry.isSerializable()) )return null;
+ mapmsg.setValue( (Serializable) entry.getValue());
+ return mapmsg;
+ }
+
+ //state transfer request
+ if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
+ synchronized (stateMutex) { //make sure we dont do two things at the same time
+ ArrayList list = new ArrayList();
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if ( entry.isSerializable() ) {
+ MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
+ false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
+ list.add(me);
+ }
+ }
+ mapmsg.setValue(list);
+ return mapmsg;
+
+ } //synchronized
+ }
+
+ return null;
+
+ }
+
+ /**
+ * If the reply has already been sent to the requesting thread,
+ * the rpc callback can handle any data that comes in after the fact.
+ * @param msg Serializable
+ * @param sender Member
+ */
+ public void leftOver(Serializable msg, Member sender) {
+ //left over membership messages
+ if (! (msg instanceof MapMessage))return;
+
+ MapMessage mapmsg = (MapMessage) msg;
+ try {
+ mapmsg.deserialize(getExternalLoaders());
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
+ memberAlive(mapmsg.getBackupNodes()[0]);
+ }
+ } catch (IOException x ) {
+ log.error("Unable to deserialize MapMessage.",x);
+ } catch (ClassNotFoundException x ) {
+ log.error("Unable to deserialize MapMessage.",x);
+ }
+ }
+
+ public void messageReceived(Serializable msg, Member sender) {
+ if (! (msg instanceof MapMessage)) return;
+
+ MapMessage mapmsg = (MapMessage) msg;
+ if ( log.isTraceEnabled() ) {
+ log.trace("Map["+mapname+"] received message:"+mapmsg);
+ }
+
+ try {
+ mapmsg.deserialize(getExternalLoaders());
+ } catch (IOException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ return;
+ } catch (ClassNotFoundException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ return;
+ }
+ if ( log.isTraceEnabled() )
+ log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
+ memberDisappeared(mapmsg.getBackupNodes()[0]);
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if ( entry==null ) {
+ entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+ entry.setBackup(false);
+ entry.setProxy(true);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ super.put(entry.getKey(), entry);
+ } else {
+ entry.setProxy(true);
+ entry.setBackup(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ }
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
+ super.remove(mapmsg.getKey());
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null) {
+ entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+ entry.setBackup(true);
+ entry.setProxy(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
+ ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
+ }
+ } else {
+ entry.setBackup(true);
+ entry.setProxy(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ if (entry.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
+ if (mapmsg.isDiff()) {
+ try {
+ diff.lock();
+ diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
+ } catch (Exception x) {
+ log.error("Unable to apply diff to key:" + entry.getKey(), x);
+ } finally {
+ diff.unlock();
+ }
+ } else {
+ if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
+ ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
+ } //end if
+ } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
+ re.setOwner(getMapOwner());
+ entry.setValue(re);
+ } else {
+ if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
+ } //end if
+ } //end if
+ super.put(entry.getKey(), entry);
+ } //end if
+ }
+
+ public boolean accept(Serializable msg, Member sender) {
+ boolean result = false;
+ if (msg instanceof MapMessage) {
+ if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
+ result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
+ if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
+ }
+ return result;
+ }
+
+ public void mapMemberAdded(Member member) {
+ if ( member.equals(getChannel().getLocalMember(false)) ) return;
+ boolean memberAdded = false;
+ //select a backup node if we don't have one
+ synchronized (mapMembers) {
+ if (!mapMembers.containsKey(member) ) {
+ mapMembers.put(member, new Long(System.currentTimeMillis()));
+ memberAdded = true;
+ }
+ }
+ if ( memberAdded ) {
+ synchronized (stateMutex) {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if ( entry == null ) continue;
+ if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
+ try {
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ entry.setBackupNodes(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to select backup node.", x);
+ } //catch
+ } //end if
+ } //while
+ } //synchronized
+ }//end if
+ }
+
+ public boolean inSet(Member m, Member[] set) {
+ if ( set == null ) return false;
+ boolean result = false;
+ for (int i=0; i<set.length && (!result); i++ )
+ if ( m.equals(set[i]) ) result = true;
+ return result;
+ }
+
+ public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
+ ArrayList result = new ArrayList();
+ for (int i=0; i<set.length; i++ ) {
+ boolean include = true;
+ for (int j=0; j<mbrs.length; j++ )
+ if ( mbrs[j].equals(set[i]) ) include = false;
+ if ( include ) result.add(set[i]);
+ }
+ return (Member[])result.toArray(new Member[result.size()]);
+ }
+
+ public void memberAdded(Member member) {
+ //do nothing
+ }
+
+ /**
+  * Handles the loss of a member: the member is dropped from the map
+  * membership, and every primary entry that listed it as a backup node is
+  * published again so that new backup nodes are assigned.
+  *
+  * @param member the member that disappeared
+  */
+ public void memberDisappeared(Member member) {
+     synchronized (mapMembers) {
+         mapMembers.remove(member);
+     }
+     Iterator i = super.entrySet().iterator();
+     while (i.hasNext()) {
+         Map.Entry e = (Map.Entry) i.next();
+         MapEntry entry = (MapEntry) e.getValue();
+         // same guard as mapMemberAdded(): skip mappings without an entry
+         if (entry == null) {
+             continue;
+         }
+         if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) {
+             try {
+                 Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+                 entry.setBackupNodes(backup);
+             } catch (ChannelException x) {
+                 // failure for one entry must not stop relocation of the others
+                 log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+             }
+         } //end if
+     } //while
+ }
+
+ /**
+  * Returns the index of the next backup member, cycling through the
+  * current map members in round-robin order.
+  *
+  * @return an index in <code>[0, mapMembers.size())</code>, or <code>-1</code>
+  *         when there are no map members
+  */
+ public int getNextBackupIndex() {
+     int size = mapMembers.size();
+     if (size == 0) {
+         return -1;
+     }
+     int node = currentNode++;
+     if (node >= size) {
+         // wrap around: index 0 is handed out by this call, so the next call
+         // must continue at 1; resetting to 0 would return index 0 twice in a row
+         node = 0;
+         currentNode = 1;
+     }
+     return node;
+ }
+ /**
+  * Picks the member at the index supplied by {@link #getNextBackupIndex()}.
+  *
+  * @return the selected member, or <code>null</code> when there are no map
+  *         members or no index is available
+  */
+ public Member getNextBackupNode() {
+     Member[] candidates = getMapMembers();
+     int index = getNextBackupIndex();
+     if (candidates.length == 0 || index == -1) {
+         return null;
+     }
+     // the index may be past the end of this array; fall back to the first member
+     return candidates[index < candidates.length ? index : 0];
+ }
+
+ /**
+  * Implemented by subclasses. Callers in this class invoke it with an
+  * entry's key and value and store the returned array as that entry's
+  * backup nodes.
+  * NOTE(review): presumably implementations announce the entry to other
+  * map members and choose the backup node(s) -- confirm against subclasses.
+  *
+  * @param key   the key of the entry
+  * @param value the value of the entry
+  * @return the members that callers record as the entry's backup nodes
+  * @throws ChannelException declared for implementations; callers in this class log it
+  */
+ protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
+
+ public void heartbeat() {
+ try {
+ ping(accessTimeout);
+ }catch ( Exception x ) {
+ log.error("Unable to send AbstractReplicatedMap.ping message",x);
+ }
+ }
+
+//------------------------------------------------------------------------------
+// METHODS TO OVERRIDE
+//------------------------------------------------------------------------------
+
+ /**
+ * Removes an object from this map, it will also remove it from
+ *
+ * @param key Object
+ * @return Object
+ */
+ public Object remove(Object key) {
+ MapEntry entry = (MapEntry)super.remove(key);
+
+ try {
+ if (getMapMembers().length > 0 ) {
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+ getChannel().send(getMapMembers(), msg, getChannelSendOptions());
+ }
+ } catch ( ChannelException x ) {
+ log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
+ }
+ return entry!=null?entry.getValue():null;
+ }
+
+ /**
+  * Returns the value stored for a key. When the local entry is not primary,
+  * the entry is first brought up to date and its backup/proxy flags are
+  * cleared:
+  * <ul>
+  * <li>if the entry is not a backup, the value is requested from the entry's
+  *     backup nodes through the RPC channel (first reply wins);</li>
+  * <li>if the entry is a backup, <code>publishEntryInfo</code> is called to
+  *     obtain new backup nodes;</li>
+  * <li>otherwise, if the entry is a proxy, a <code>MSG_PROXY</code> message is
+  *     sent to the map members excluding the backup nodes.</li>
+  * </ul>
+  *
+  * @param key the key to look up
+  * @return the entry's value; <code>null</code> if there is no entry, if no
+  *         reply was received for a remote retrieve, or if any exception
+  *         occurred while updating the entry (the exception is logged)
+  */
+ public Object get(Object key) {
+ MapEntry entry = (MapEntry)super.get(key);
+ if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
+ if ( entry == null ) return null;
+ if ( !entry.isPrimary() ) {
+ //if the entry is not primary, we need to retrieve the latest value
+ try {
+ Member[] backup = null;
+ MapMessage msg = null;
+ if ( !entry.isBackup() ) {
+ //make sure we don't retrieve from ourselves
+ //NOTE(review): entry.getBackupNodes() is passed on without a null check -- confirm it cannot be null here
+ msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
+ (Serializable) key, null, null, null);
+ Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
+ if (resp == null || resp.length == 0) {
+ //no responses
+ log.warn("Unable to retrieve remote object for key:" + key);
+ return null;
+ }
+ //only the first response is used
+ msg = (MapMessage) resp[0].getMessage();
+ msg.deserialize(getExternalLoaders());
+ backup = entry.getBackupNodes();
+ if ( entry.getValue() instanceof ReplicatedMapEntry ) {
+ ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+ val.setOwner(getMapOwner());
+ }
+ //the local value is kept when the reply carries no value
+ if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
+ }
+ if (entry.isBackup()) {
+ //select a new backup node
+ backup = publishEntryInfo(key, entry.getValue());
+ } else if ( entry.isProxy() ) {
+ //invalidate the previous primary
+ msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+ Member[] dest = getMapMembersExcl(backup);
+ if ( dest!=null && dest.length >0) {
+ getChannel().send(dest, msg, getChannelSendOptions());
+ }
+ }
+
+ //record the backup nodes and clear the backup and proxy flags
+ entry.setBackupNodes(backup);
+ entry.setBackup(false);
+ entry.setProxy(false);
+
+
+ } catch (Exception x) {
+ //any failure above is logged and reported to the caller as a null value
+ log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
+ return null;
+ }
+ }
+ if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
+ if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
+ ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+ //hack, somehow this is not being set above
+ val.setOwner(getMapOwner());
+
+ }
+ return entry.getValue();
+ }
+
+
+ /**
+  * Debugging aid: writes the given header, the map context name, the map
+  * size, the names of the map members and every stored entry to
+  * <code>System.out</code>. An exception raised while printing has its
+  * stack trace printed and is not propagated.
+  *
+  * @param header text printed after the "DEBUG MAP:" label
+  */
+ protected void printMap(String header) {
+     try {
+         System.out.println("\nDEBUG MAP:"+header);
+         System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
+         Member[] members = getMapMembers();
+         int position = 0;
+         while (position < members.length) {
+             System.out.println("Mbr["+(position+1)+"="+members[position].getName());
+             position++;
+         }
+         int lineNo = 1;
+         for (Iterator entries = super.entrySet().iterator(); entries.hasNext(); lineNo++) {
+             Map.Entry mapping = (Map.Entry) entries.next();
+             System.out.println( lineNo + ". " + mapping.getValue());
+         }
+         System.out.println("EndMap]\n\n");
+     } catch (Exception ignore) {
+         ignore.printStackTrace();
+     }
+ }
+
+ /**
+ * Returns true if the key has an entry in the map.
+ * The check is delegated to the superclass map and is purely local.
+ * The entry can be a proxy or a backup entry; invoking <code>get(key)</code>
+ * will make this entry primary for the group.
+ * @param key Object - the key to test
+ * @return boolean - true if an entry exists for the key
+ */
+ public boolean containsKey(Object key) {
+ return super.containsKey(key);
+ }
+
+
+ public Object put(Object key, Object value) {
+ MapEntry entry = new MapEntry(key,value);
+ entry.setBackup(false);
+ entry.setProxy(false);
+
+ Object old = null;
+
+ //make sure that any old values get removed
+ if ( containsKey(key) ) old = remove(key);
+ try {
+ Member[] backup = publishEntryInfo(key, value);
+ entry.setBackupNodes(backup);
+ } catch (ChannelException x) {
[... 499 lines stripped ...]
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org