You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2006/10/24 05:18:02 UTC

svn commit: r467222 [23/31] - in /tomcat/tc6.0.x/trunk/java: javax/annotation/ javax/annotation/security/ javax/ejb/ javax/el/ javax/mail/ javax/mail/internet/ javax/persistence/ javax/servlet/ javax/servlet/http/ javax/servlet/jsp/ javax/servlet/jsp/e...

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?view=diff&rev=467222&r1=467221&r2=467222
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Mon Oct 23 20:17:11 2006
@@ -1,1368 +1,1368 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.catalina.tribes.tipis;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.Heartbeat;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.Response;
-import org.apache.catalina.tribes.group.RpcCallback;
-import org.apache.catalina.tribes.group.RpcChannel;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import java.util.ConcurrentModificationException;
-
-/**
- *
- * @author Filip Hanik
- * @version 1.0
- */
-public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
-    protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
-
-    /**
-     * The default initial capacity - MUST be a power of two.
-     */
-    public static final int DEFAULT_INITIAL_CAPACITY = 16;
-
-    /**
-     * The load factor used when none specified in constructor.
-     **/
-    public static final float DEFAULT_LOAD_FACTOR = 0.75f;
-    
-    /**
-     * Used to identify the map
-     */
-    final String chset = "ISO-8859-1";
-
-//------------------------------------------------------------------------------
-//              INSTANCE VARIABLES
-//------------------------------------------------------------------------------
-    
-    /**
-     * Timeout for RPC messages, how long we will wait for a reply
-     */
-    protected transient long rpcTimeout = 5000;
-    /**
-     * Reference to the channel for sending messages
-     */
-    protected transient Channel channel;
-    /**
-     * The RpcChannel to send RPC messages through
-     */
-    protected transient RpcChannel rpcChannel;
-    /**
-     * The Map context name makes this map unique, this
-     * allows us to have more than one map shared
-     * through one channel
-     */
-    protected transient byte[] mapContextName;
-    /**
-     * Has the state been transferred
-     */
-    protected transient boolean stateTransferred = false;
-    /**
-     * Simple lock object for transfers
-     */
-    protected transient Object stateMutex = new Object();
-    /**
-     * A list of members in our map
-     */
-    protected transient HashMap mapMembers = new HashMap();
-    /**
-     * Our default send options
-     */
-    protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
-    /**
-     * The owner of this map, ala a SessionManager for example
-     */
-    protected transient Object mapOwner;
-    /**
-     * External class loaders if serialization and deserialization is to be performed successfully.
-     */
-    protected transient ClassLoader[] externalLoaders;
-    
-    /**
-     * The node we are currently backing up data to, this index will rotate
-     * on a round robin basis
-     */
-    protected transient int currentNode = 0;
-    
-    /**
-     * Since the map keeps internal membership
-     * this is the timeout for a ping message to be responded to
-     * If a remote map doesn't respond within this timeframe, 
-     * its considered dead.
-     */
-    protected transient long accessTimeout = 5000;
-    
-    /**
-     * Readable string of the mapContextName value
-     */
-    protected transient String mapname = "";
-    
-
-//------------------------------------------------------------------------------
-//              CONSTRUCTORS
-//------------------------------------------------------------------------------
-
-    /**
-     * Creates a new map
-     * @param channel The channel to use for communication
-     * @param timeout long - timeout for RPC messags
-     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
-     * @param initialCapacity int - the size of this map, see HashMap
-     * @param loadFactor float - load factor, see HashMap
-     * @param cls - a list of classloaders to be used for deserialization of objects.
-     */
-    public AbstractReplicatedMap(Object owner,
-                                 Channel channel, 
-                                 long timeout, 
-                                 String mapContextName, 
-                                 int initialCapacity,
-                                 float loadFactor,
-                                 int channelSendOptions,
-                                 ClassLoader[] cls) {
-        super(initialCapacity, loadFactor);
-        init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
-        
-    }
-
-    /**
-     * Helper methods, wraps a single member in an array
-     * @param m Member
-     * @return Member[]
-     */
-    protected Member[] wrap(Member m) {
-        if ( m == null ) return new Member[0];
-        else return new Member[] {m};
-    }
-
-    /**
-     * Initializes the map by creating the RPC channel, registering itself as a channel listener
-     * This method is also responsible for initiating the state transfer
-     * @param owner Object
-     * @param channel Channel
-     * @param mapContextName String
-     * @param timeout long
-     * @param channelSendOptions int
-     * @param cls ClassLoader[]
-     */
-    protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
-        log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
-        this.mapOwner = owner;
-        this.externalLoaders = cls;
-        this.channelSendOptions = channelSendOptions;
-        this.channel = channel;
-        this.rpcTimeout = timeout;
-
-        try {
-            this.mapname = mapContextName;
-            //unique context is more efficient if it is stored as bytes
-            this.mapContextName = mapContextName.getBytes(chset);
-        } catch (UnsupportedEncodingException x) {
-            log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
-            this.mapContextName = mapContextName.getBytes();
-        }
-        if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
-
-        //create an rpc channel and add the map as a listener
-        this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
-        //add this map as a message listener
-        this.channel.addChannelListener(this);
-        //listen for membership notifications
-        this.channel.addMembershipListener(this);
-        
-        
-        try {
-            //broadcast our map, this just notifies other members of our existence
-            broadcast(MapMessage.MSG_INIT, true);
-            //transfer state from another map
-            transferState();
-            //state is transferred, we are ready for messaging
-            broadcast(MapMessage.MSG_START, true);
-        } catch (ChannelException x) {
-            log.warn("Unable to send map start message.");
-            throw new RuntimeException("Unable to start replicated map.",x);
-        }
-    }
-    
-    
-    /**
-     * Sends a ping out to all the members in the cluster, not just map members
-     * that this map is alive.
-     * @param timeout long
-     * @throws ChannelException
-     */
-    protected void ping(long timeout) throws ChannelException {
-        //send out a map membership message, only wait for the first reply
-        MapMessage msg = new MapMessage(this.mapContextName, 
-                                        MapMessage.MSG_INIT,
-                                        false, 
-                                        null, 
-                                        null, 
-                                        null, 
-                                        wrap(channel.getLocalMember(false)));
-        if ( channel.getMembers().length > 0 ) {
-            //send a ping, wait for all nodes to reply
-            Response[] resp = rpcChannel.send(channel.getMembers(), 
-                                              msg, rpcChannel.ALL_REPLY, 
-                                              (channelSendOptions),
-                                              (int) accessTimeout);
-            for (int i = 0; i < resp.length; i++) {
-                memberAlive(resp[i].getSource());
-            } //for
-        }
-        //update our map of members, expire some if we didn't receive a ping back
-        synchronized (mapMembers) {
-            Iterator it = mapMembers.entrySet().iterator();
-            long now = System.currentTimeMillis();
-            while ( it.hasNext() ) {
-                Map.Entry entry = (Map.Entry)it.next();
-                long access = ((Long)entry.getValue()).longValue(); 
-                if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey());
-            }
-        }//synch
-    }
-
-    /**
-     * We have received a member alive notification
-     * @param member Member
-     */
-    protected void memberAlive(Member member) {
-        synchronized (mapMembers) {
-            if (!mapMembers.containsKey(member)) {
-                mapMemberAdded(member);
-            } //end if
-            mapMembers.put(member, new Long(System.currentTimeMillis()));
-        }
-    }
-    
-    /**
-     * Helper method to broadcast a message to all members in a channel
-     * @param msgtype int
-     * @param rpc boolean
-     * @throws ChannelException
-     */
-    protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
-        //send out a map membership message, only wait for the first reply
-        MapMessage msg = new MapMessage(this.mapContextName, msgtype,
-                                        false, null, null, null, wrap(channel.getLocalMember(false)));
-        if ( rpc) {
-            Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
-            for (int i = 0; i < resp.length; i++) {
-                mapMemberAdded(resp[i].getSource());
-                messageReceived(resp[i].getMessage(), resp[i].getSource());
-            }
-        } else {
-            channel.send(channel.getMembers(),msg,channelSendOptions);
-        }
-    }
-
-    public void breakdown() {
-        finalize();
-    }
-
-    public void finalize() {
-        try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
-        //cleanup
-        if (this.rpcChannel != null) {
-            this.rpcChannel.breakdown();
-        }
-        if (this.channel != null) {
-            this.channel.removeChannelListener(this);
-            this.channel.removeMembershipListener(this);
-        }
-        this.rpcChannel = null;
-        this.channel = null;
-        this.mapMembers.clear();
-        super.clear();
-        this.stateTransferred = false;
-        this.externalLoaders = null;
-    }
-    
-    public int hashCode() {
-        return Arrays.hashCode(this.mapContextName);
-    }
-    
-    public boolean equals(Object o) {
-        if ( o == null ) return false;
-        if ( !(o instanceof AbstractReplicatedMap)) return false;
-        if ( !(o.getClass().equals(this.getClass())) ) return false;
-        AbstractReplicatedMap other = (AbstractReplicatedMap)o;
-        return Arrays.equals(mapContextName,other.mapContextName);
-    }
-
-//------------------------------------------------------------------------------
-//              GROUP COM INTERFACES
-//------------------------------------------------------------------------------
-    public Member[] getMapMembers(HashMap members) {
-        synchronized (members) {
-            Member[] result = new Member[members.size()];
-            members.keySet().toArray(result);
-            return result;
-        }
-    }
-    public Member[] getMapMembers() {
-        return getMapMembers(this.mapMembers);
-    }
-    
-    public Member[] getMapMembersExcl(Member[] exclude) {
-        synchronized (mapMembers) {
-            HashMap list = (HashMap)mapMembers.clone();
-            for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
-            return getMapMembers(list);
-        }
-    }
-
-
-    /**
-     * Replicates any changes to the object since the last time
-     * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
-     * @param complete - if set to true, the object is replicated to its backup
-     * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
-     * be replicated
-     */
-    public void replicate(Object key, boolean complete) {
-        if ( log.isTraceEnabled() )
-            log.trace("Replicate invoked on key:"+key);
-        MapEntry entry = (MapEntry)super.get(key);
-        if ( entry == null ) return;
-        if ( !entry.isSerializable() ) return;
-        if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
-            Object value = entry.getValue();
-            //check to see if we need to replicate this object isDirty()||complete
-            boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
-            
-            if (!repl) {
-                if ( log.isTraceEnabled() )
-                    log.trace("Not replicating:"+key+", no change made");
-                
-                return;
-            }
-            //check to see if the message is diffable
-            boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
-            MapMessage msg = null;
-            if (diff) {
-                ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
-                try {
-                    rentry.lock();
-                    //construct a diff message
-                    msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
-                                         true, (Serializable) entry.getKey(), null,
-                                         rentry.getDiff(),
-                                         entry.getBackupNodes());
-                } catch (IOException x) {
-                    log.error("Unable to diff object. Will replicate the entire object instead.", x);
-                } finally {
-                    rentry.unlock();
-                }
-                
-            }
-            if (msg == null) {
-                //construct a complete
-                msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
-                                     false, (Serializable) entry.getKey(),
-                                     (Serializable) entry.getValue(),
-                                     null, entry.getBackupNodes());
-
-            }
-            try {
-                if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
-                    channel.send(entry.getBackupNodes(), msg, channelSendOptions);
-                }
-            } catch (ChannelException x) {
-                log.error("Unable to replicate data.", x);
-            }
-        } //end if
-
-    }
-
-    /**
-     * This can be invoked by a periodic thread to replicate out any changes.
-     * For maps that don't store objects that implement ReplicatedMapEntry, this
-     * method should be used infrequently to avoid large amounts of data transfer
-     * @param complete boolean
-     */
-    public void replicate(boolean complete) {
-        Iterator i = super.entrySet().iterator();
-        while (i.hasNext()) {
-            Map.Entry e = (Map.Entry) i.next();
-            replicate(e.getKey(), complete);
-        } //while
-
-    }
-
-    public void transferState() {
-        try {
-            Member[] members = getMapMembers();
-            Member backup = members.length > 0 ? (Member) members[0] : null;
-            if (backup != null) {
-                MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
-                                                null, null, null, null);
-                Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
-                if (resp.length > 0) {
-                    synchronized (stateMutex) {
-                        msg = (MapMessage) resp[0].getMessage();
-                        msg.deserialize(getExternalLoaders());
-                        ArrayList list = (ArrayList) msg.getValue();
-                        for (int i = 0; i < list.size(); i++) {
-                            messageReceived( (Serializable) list.get(i), resp[0].getSource());
-                        } //for
-                    }
-                } else {
-                    log.warn("Transfer state, 0 replies, probably a timeout.");
-                }
-            }
-        } catch (ChannelException x) {
-            log.error("Unable to transfer LazyReplicatedMap state.", x);
-        } catch (IOException x) {
-            log.error("Unable to transfer LazyReplicatedMap state.", x);
-        } catch (ClassNotFoundException x) {
-            log.error("Unable to transfer LazyReplicatedMap state.", x);
-        }
-        stateTransferred = true;
-    }
-
-    /**
-     * @todo implement state transfer
-     * @param msg Serializable
-     * @return Serializable - null if no reply should be sent
-     */
-    public Serializable replyRequest(Serializable msg, final Member sender) {
-        if (! (msg instanceof MapMessage))return null;
-        MapMessage mapmsg = (MapMessage) msg;
-
-        //map init request
-        if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
-            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
-            return mapmsg;
-        }
-        
-        //map start request
-        if (mapmsg.getMsgType() == mapmsg.MSG_START) {
-            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
-            mapMemberAdded(sender);
-            return mapmsg;
-        }
-
-        //backup request
-        if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
-            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
-            if (entry == null || (!entry.isSerializable()) )return null;
-            mapmsg.setValue( (Serializable) entry.getValue());
-            return mapmsg;
-        }
-
-        //state transfer request
-        if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
-            synchronized (stateMutex) { //make sure we dont do two things at the same time
-                ArrayList list = new ArrayList();
-                Iterator i = super.entrySet().iterator();
-                while (i.hasNext()) {
-                    Map.Entry e = (Map.Entry) i.next();
-                    MapEntry entry = (MapEntry) e.getValue();
-                    if ( entry.isSerializable() ) {
-                        MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
-                            false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
-                        list.add(me);
-                    }
-                }
-                mapmsg.setValue(list);
-                return mapmsg;
-                
-            } //synchronized
-        }
-
-        return null;
-
-    }
-
-    /**
-     * If the reply has already been sent to the requesting thread,
-     * the rpc callback can handle any data that comes in after the fact.
-     * @param msg Serializable
-     * @param sender Member
-     */
-    public void leftOver(Serializable msg, Member sender) {
-        //left over membership messages
-        if (! (msg instanceof MapMessage))return;
-
-        MapMessage mapmsg = (MapMessage) msg;
-        try {
-            mapmsg.deserialize(getExternalLoaders());
-            if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-                mapMemberAdded(mapmsg.getBackupNodes()[0]);
-            } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
-                memberAlive(mapmsg.getBackupNodes()[0]);
-            }
-        } catch (IOException x ) {
-            log.error("Unable to deserialize MapMessage.",x);
-        } catch (ClassNotFoundException x ) {
-            log.error("Unable to deserialize MapMessage.",x);
-        }
-    }
-
-    public void messageReceived(Serializable msg, Member sender) {
-        if (! (msg instanceof MapMessage)) return;
-
-        MapMessage mapmsg = (MapMessage) msg;
-        if ( log.isTraceEnabled() ) {
-            log.trace("Map["+mapname+"] received message:"+mapmsg);
-        }
-        
-        try {
-            mapmsg.deserialize(getExternalLoaders());
-        } catch (IOException x) {
-            log.error("Unable to deserialize MapMessage.", x);
-            return;
-        } catch (ClassNotFoundException x) {
-            log.error("Unable to deserialize MapMessage.", x);
-            return;
-        }
-        if ( log.isTraceEnabled() ) 
-            log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
-        if (mapmsg.getMsgType() == MapMessage.MSG_START) {
-            mapMemberAdded(mapmsg.getBackupNodes()[0]);
-        }
-
-        if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
-            memberDisappeared(mapmsg.getBackupNodes()[0]);
-        }
-
-        if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
-            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
-            if ( entry==null ) {
-                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
-                entry.setBackup(false);
-                entry.setProxy(true);
-                entry.setBackupNodes(mapmsg.getBackupNodes());
-                super.put(entry.getKey(), entry);
-            } else {
-                entry.setProxy(true);
-                entry.setBackup(false);
-                entry.setBackupNodes(mapmsg.getBackupNodes());
-            }
-        }
-
-        if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
-            super.remove(mapmsg.getKey());
-        }
-
-        if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
-            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
-            if (entry == null) {
-                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
-                entry.setBackup(true);
-                entry.setProxy(false);
-                entry.setBackupNodes(mapmsg.getBackupNodes());
-                if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
-                    ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
-                }
-            } else {
-                entry.setBackup(true);
-                entry.setProxy(false);
-                entry.setBackupNodes(mapmsg.getBackupNodes());
-                if (entry.getValue() instanceof ReplicatedMapEntry) {
-                    ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
-                    if (mapmsg.isDiff()) {
-                        try {
-                            diff.lock();
-                            diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
-                        } catch (Exception x) {
-                            log.error("Unable to apply diff to key:" + entry.getKey(), x);
-                        } finally {
-                            diff.unlock();
-                        }
-                    } else {
-                        if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
-                        ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
-                    } //end if
-                } else if  (mapmsg.getValue() instanceof ReplicatedMapEntry) {
-                    ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
-                    re.setOwner(getMapOwner());
-                    entry.setValue(re);
-                } else {
-                    if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
-                } //end if
-            } //end if
-            super.put(entry.getKey(), entry);
-        } //end if
-    }
-
-    public boolean accept(Serializable msg, Member sender) {
-        boolean result = false;
-        if (msg instanceof MapMessage) {
-            if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
-            result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
-            if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
-        }
-        return result;
-    }
-
-    public void mapMemberAdded(Member member) {
-        if ( member.equals(getChannel().getLocalMember(false)) ) return;
-        boolean memberAdded = false;
-        //select a backup node if we don't have one
-        synchronized (mapMembers) {
-            if (!mapMembers.containsKey(member) ) {
-                mapMembers.put(member, new Long(System.currentTimeMillis()));
-                memberAdded = true;
-            }
-        }
-        if ( memberAdded ) {
-            synchronized (stateMutex) {
-                Iterator i = super.entrySet().iterator();
-                while (i.hasNext()) {
-                    Map.Entry e = (Map.Entry) i.next();
-                    MapEntry entry = (MapEntry) e.getValue();
-                    if ( entry == null ) continue;
-                    if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
-                        try {
-                            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
-                            entry.setBackupNodes(backup);
-                        } catch (ChannelException x) {
-                            log.error("Unable to select backup node.", x);
-                        } //catch
-                    } //end if
-                } //while
-            } //synchronized
-        }//end if
-    }
-    
-    public boolean inSet(Member m, Member[] set) {
-        if ( set == null ) return false;
-        boolean result = false;
-        for (int i=0; i<set.length && (!result); i++ )
-            if ( m.equals(set[i]) ) result = true;
-        return result;
-    }
-
-    public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
-        ArrayList result = new ArrayList();
-        for (int i=0; i<set.length; i++ ) {
-            boolean include = true;
-            for (int j=0; j<mbrs.length; j++ ) 
-                if ( mbrs[j].equals(set[i]) ) include = false;
-            if ( include ) result.add(set[i]);
-        }
-        return (Member[])result.toArray(new Member[result.size()]);
-    }
-
-    public void memberAdded(Member member) {
-        //do nothing
-    }
-
-    public void memberDisappeared(Member member) {
-        boolean removed = false;
-        synchronized (mapMembers) {
-            removed = (mapMembers.remove(member) != null );
-        }
-        Iterator i = super.entrySet().iterator();
-        while (i.hasNext()) {
-            Map.Entry e = (Map.Entry) i.next();
-            MapEntry entry = (MapEntry) e.getValue();
-            if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
-                try {
-                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
-                    entry.setBackupNodes(backup);
-                } catch (ChannelException x) {
-                    log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
-                }
-            } //end if
-        } //while
-    }
-
-    public int getNextBackupIndex() {
-        int size = mapMembers.size();
-        if (mapMembers.size() == 0)return -1;
-        int node = currentNode++;
-        if (node >= size) {
-            node = 0;
-            currentNode = 0;
-        }
-        return node;
-    }
-    public Member getNextBackupNode() {
-        Member[] members = getMapMembers();
-        int node = getNextBackupIndex();
-        if ( members.length == 0 || node==-1) return null;
-        if ( node >= members.length ) node = 0;
-        return members[node];
-    }
-
-    protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
-    
-    public void heartbeat() {
-        try {
-            ping(accessTimeout);
-        }catch ( Exception x ) {
-            log.error("Unable to send AbstractReplicatedMap.ping message",x);
-        }
-    }
-
-//------------------------------------------------------------------------------    
-//              METHODS TO OVERRIDE    
-//------------------------------------------------------------------------------
-  
-    /**
-     * Removes an object from this map, it will also remove it from 
-     * 
-     * @param key Object
-     * @return Object
-     */
-    public Object remove(Object key) {
-        MapEntry entry = (MapEntry)super.remove(key);
-
-        try {
-            if (getMapMembers().length > 0 ) {
-                MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
-                getChannel().send(getMapMembers(), msg, getChannelSendOptions());
-            }
-        } catch ( ChannelException x ) {
-            log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
-        }
-        return entry!=null?entry.getValue():null;
-    }
-    
-    public Object get(Object key) {
-        MapEntry entry = (MapEntry)super.get(key);
-        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
-        if ( entry == null ) return null;
-        if ( !entry.isPrimary() ) {
-            //if the message is not primary, we need to retrieve the latest value
-            try {
-                Member[] backup = null;
-                MapMessage msg = null;
-                if ( !entry.isBackup() ) {
-                    //make sure we don't retrieve from ourselves
-                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
-                                         (Serializable) key, null, null, null);
-                    Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
-                    if (resp == null || resp.length == 0) {
-                        //no responses
-                        log.warn("Unable to retrieve remote object for key:" + key);
-                        return null;
-                    }
-                    msg = (MapMessage) resp[0].getMessage();
-                    msg.deserialize(getExternalLoaders());
-                    backup = entry.getBackupNodes();
-                    if ( entry.getValue() instanceof ReplicatedMapEntry ) {
-                        ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
-                        val.setOwner(getMapOwner());
-                    }
-                    if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
-                }
-                if (entry.isBackup()) {
-                    //select a new backup node
-                    backup = publishEntryInfo(key, entry.getValue());
-                } else if ( entry.isProxy() ) {
-                    //invalidate the previous primary
-                    msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
-                    Member[] dest = getMapMembersExcl(backup);
-                    if ( dest!=null && dest.length >0) {
-                        getChannel().send(dest, msg, getChannelSendOptions());
-                    }
-                }
-
-                entry.setBackupNodes(backup);
-                entry.setBackup(false);
-                entry.setProxy(false);
-
-
-            } catch (Exception x) {
-                log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
-                return null;
-            }
-        }
-        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
-        if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
-            ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
-            //hack, somehow this is not being set above
-            val.setOwner(getMapOwner());
-            
-        }
-        return entry.getValue();
-    }    
-
-    
-    protected void printMap(String header) {
-        try {
-            System.out.println("\nDEBUG MAP:"+header);
-            System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
-            Member[] mbrs = getMapMembers();
-            for ( int i=0; i<mbrs.length;i++ ) {
-                System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
-            }
-            Iterator i = super.entrySet().iterator();
-            int cnt = 0;
-
-            while (i.hasNext()) {
-                Map.Entry e = (Map.Entry) i.next();
-                System.out.println( (++cnt) + ". " + e.getValue());
-            }
-            System.out.println("EndMap]\n\n");
-        }catch ( Exception ignore) {
-            ignore.printStackTrace();
-        }
-    }
-    
-    /**
-         * Returns true if the key has an entry in the map.
-         * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
-         * will make this entry primary for the group
-         * @param key Object
-         * @return boolean
-         */
-        public boolean containsKey(Object key) {
-            return super.containsKey(key);
-        }
-    
-    
-        public Object put(Object key, Object value) {
-            MapEntry entry = new MapEntry(key,value);
-            entry.setBackup(false);
-            entry.setProxy(false);
-    
-            Object old = null;
-    
-            //make sure that any old values get removed
-            if ( containsKey(key) ) old = remove(key);
-            try {
-                Member[] backup = publishEntryInfo(key, value);
-                entry.setBackupNodes(backup);
-            } catch (ChannelException x) {
-                log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
-            }
-            super.put(key,entry);
-            return old;
-        }
-    
-    
-        /**
-         * Copies all values from one map to this instance
-         * @param m Map
-         */
-        public void putAll(Map m) {
-            Iterator i = m.entrySet().iterator();
-            while ( i.hasNext() ) {
-                Map.Entry entry = (Map.Entry)i.next();
-                put(entry.getKey(),entry.getValue());
-            }
-        }
-    
-        public void clear() {
-            //only delete active keys
-            Iterator keys = keySet().iterator();
-            while ( keys.hasNext() ) remove(keys.next());
-        }
-    
-        public boolean containsValue(Object value) {
-            if ( value == null ) {
-                return super.containsValue(value);
-            } else {
-                Iterator i = super.entrySet().iterator();
-                while (i.hasNext()) {
-                    Map.Entry e = (Map.Entry) i.next();
-                    MapEntry entry = (MapEntry) e.getValue();
-                    if (entry.isPrimary() && value.equals(entry.getValue())) return true;
-                }//while
-                return false;
-            }//end if
-        }
-    
-        public Object clone() {
-            throw new UnsupportedOperationException("This operation is not valid on a replicated map");
-        }
-    
-        /**
-         * Returns the entire contents of the map
-         * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information 
-         * about the object.
-         * @return Set
-         */
-        public Set entrySetFull() {
-            return super.entrySet();
-        }
-    
-        public Set keySetFull() {
-            return super.keySet();
-        }
-    
-        public int sizeFull() {
-            return super.size();
-        }
-    
-        public Set entrySet() {
-            LinkedHashSet set = new LinkedHashSet(super.size());
-            Iterator i = super.entrySet().iterator();
-            while ( i.hasNext() ) {
-                Map.Entry e = (Map.Entry)i.next();
-                MapEntry entry = (MapEntry)e.getValue();
-                if ( entry.isPrimary() ) set.add(entry);
-            }
-            return Collections.unmodifiableSet(set);
-        }
-    
-        public Set keySet() {
-            //todo implement
-            //should only return keys where this is active.
-            LinkedHashSet set = new LinkedHashSet(super.size());
-            Iterator i = super.entrySet().iterator();
-            while ( i.hasNext() ) {
-                Map.Entry e = (Map.Entry)i.next();
-                MapEntry entry = (MapEntry)e.getValue();
-                if ( entry.isPrimary() ) set.add(entry.getKey());
-            }
-            return Collections.unmodifiableSet(set);
-        }
-    
-    
-        public int size() {
-            //todo, implement a counter variable instead
-            //only count active members in this node
-            int counter = 0;
-            Iterator it = Collections.unmodifiableSet(super.entrySet()).iterator();
-            while (it.hasNext() ) {
-                Map.Entry e = (Map.Entry) it.next();
-                if ( e != null ) {
-                    MapEntry entry = (MapEntry) e.getValue();
-                    if (entry.isPrimary() && entry.getValue() != null) counter++;
-                }
-            }
-            return counter;
-        }
-    
-        protected boolean removeEldestEntry(Map.Entry eldest) {
-            return false;
-        }
-    
-        public boolean isEmpty() {
-            return size()==0;
-        }
-    
-        public Collection values() {
-            ArrayList values = new ArrayList();
-            Iterator i = super.entrySet().iterator();
-            while ( i.hasNext() ) {
-                Map.Entry e = (Map.Entry)i.next();
-                MapEntry entry = (MapEntry)e.getValue();
-                if ( entry.isPrimary() && entry.getValue()!=null) values.add(entry.getValue());
-            }
-            return Collections.unmodifiableCollection(values);
-        }
-        
-
-//------------------------------------------------------------------------------
-//                Map Entry class
-//------------------------------------------------------------------------------
-    public static class MapEntry implements Map.Entry {
-        private boolean backup;
-        private boolean proxy;
-        private Member[] backupNodes;
-
-        private Object key;
-        private Object value;
-
-        public MapEntry(Object key, Object value) {
-            setKey(key);
-            setValue(value);
-            
-        }
-        
-        public boolean isKeySerializable() {
-            return (key == null) || (key instanceof Serializable);
-        }
-        
-        public boolean isValueSerializable() {
-            return (value==null) || (value instanceof Serializable);
-        }
-        
-        public boolean isSerializable() {
-            return isKeySerializable() && isValueSerializable();
-        }
-        
-        public boolean isBackup() {
-            return backup;
-        }
-
-        public void setBackup(boolean backup) {
-            this.backup = backup;
-        }
-
-        public boolean isProxy() {
-            return proxy;
-        }
-
-        public boolean isPrimary() {
-            return ( (!proxy) && (!backup));
-        }
-
-        public void setProxy(boolean proxy) {
-            this.proxy = proxy;
-        }
-
-        public boolean isDiffable() {
-            return (value instanceof ReplicatedMapEntry) &&
-                   ((ReplicatedMapEntry)value).isDiffable();
-        }
-
-        public void setBackupNodes(Member[] nodes) {
-            this.backupNodes = nodes;
-        }
-
-        public Member[] getBackupNodes() {
-            return backupNodes;
-        }
-
-        public Object getValue() {
-            return value;
-        }
-
-        public Object setValue(Object value) {
-            Object old = this.value;
-            this.value = (Serializable) value;
-            return old;
-        }
-
-        public Object getKey() {
-            return key;
-        }
-        
-        public Object setKey(Object key) {
-            Object old = this.key;
-            this.key = (Serializable)key;
-            return old;
-        }
-
-        public int hashCode() {
-            return key.hashCode();
-        }
-
-        public boolean equals(Object o) {
-            return key.equals(o);
-        }
-
-        /**
-         * apply a diff, or an entire object
-         * @param data byte[]
-         * @param offset int
-         * @param length int
-         * @param diff boolean
-         * @throws IOException
-         * @throws ClassNotFoundException
-         */
-        public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException {
-            if (isDiffable() && diff) {
-                ReplicatedMapEntry rentry = (ReplicatedMapEntry) value;
-                try {
-                    rentry.lock();
-                    rentry.applyDiff(data, offset, length);
-                } finally {
-                    rentry.unlock();
-                }
-            } else if (length == 0) {
-                value = null;
-                proxy = true;
-            } else {
-                value = XByteBuffer.deserialize(data, offset, length);
-            }
-        }
-        
-        public String toString() {
-            StringBuffer buf = new StringBuffer("MapEntry[key:");
-            buf.append(getKey()).append("; ");
-            buf.append("value:").append(getValue()).append("; ");
-            buf.append("primary:").append(isPrimary()).append("; ");
-            buf.append("backup:").append(isBackup()).append("; ");
-            buf.append("proxy:").append(isProxy()).append(";]");
-            return buf.toString();
-        }
-
-    }
-
-//------------------------------------------------------------------------------
-//                map message to send to and from other maps
-//------------------------------------------------------------------------------
-
-    public static class MapMessage implements Serializable {
-        public static final int MSG_BACKUP = 1;
-        public static final int MSG_RETRIEVE_BACKUP = 2;
-        public static final int MSG_PROXY = 3;
-        public static final int MSG_REMOVE = 4;
-        public static final int MSG_STATE = 5;
-        public static final int MSG_START = 6;
-        public static final int MSG_STOP = 7;
-        public static final int MSG_INIT = 8;
-
-        private byte[] mapId;
-        private int msgtype;
-        private boolean diff;
-        private transient Serializable key;
-        private transient Serializable value;
-        private byte[] valuedata;
-        private byte[] keydata;
-        private byte[] diffvalue;
-        private Member[] nodes;
-        
-        public String toString() {
-            StringBuffer buf = new StringBuffer("MapMessage[context=");
-            buf.append(new String(mapId));
-            buf.append("; type=");
-            buf.append(getTypeDesc());
-            buf.append("; key=");
-            buf.append(key);
-            buf.append("; value=");
-            buf.append(value);
-            return buf.toString();
-        }
-        
-        public String getTypeDesc() {
-            switch (msgtype) {
-                case MSG_BACKUP: return "MSG_BACKUP";
-                case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP";
-                case MSG_PROXY: return "MSG_PROXY";
-                case MSG_REMOVE: return "MSG_REMOVE";
-                case MSG_STATE: return "MSG_STATE";
-                case MSG_START: return "MSG_START";
-                case MSG_STOP: return "MSG_STOP";
-                case MSG_INIT: return "MSG_INIT";
-                default : return "UNKNOWN";
-            }
-        }
-
-        public MapMessage() {}
-
-        public MapMessage(byte[] mapId,int msgtype, boolean diff,
-                          Serializable key, Serializable value,
-                          byte[] diffvalue, Member[] nodes)  {
-            this.mapId = mapId;
-            this.msgtype = msgtype;
-            this.diff = diff;
-            this.key = key;
-            this.value = value;
-            this.diffvalue = diffvalue;
-            this.nodes = nodes;
-            setValue(value);
-            setKey(key);
-        }
-        
-        public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException {
-            key(cls);
-            value(cls);
-        }
-
-        public int getMsgType() {
-            return msgtype;
-        }
-
-        public boolean isDiff() {
-            return diff;
-        }
-
-        public Serializable getKey() {
-            try {
-                return key(null);
-            } catch ( Exception x ) {
-                log.error("Deserialization error of the MapMessage.key",x);
-                return null;
-            }
-        }
-
-        public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException {
-            if ( key!=null ) return key;
-            if ( keydata == null || keydata.length == 0 ) return null;
-            key = XByteBuffer.deserialize(keydata,0,keydata.length,cls);
-            keydata = null;
-            return key;
-        }
-        
-        public byte[] getKeyData() {
-            return keydata;
-        }
-        
-        public Serializable getValue() {
-            try {
-                return value(null);
-            } catch ( Exception x ) {
-                log.error("Deserialization error of the MapMessage.value",x);
-                return null;
-            }
-        }
-
-        public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException  {
-            if ( value!=null ) return value;
-            if ( valuedata == null || valuedata.length == 0 ) return null;
-            value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls);
-            valuedata = null;;
-            return value;
-        }
-        
-        public byte[] getValueData() {
-            return valuedata;
-        }
-
-        public byte[] getDiffValue() {
-            return diffvalue;
-        }
-
-        public Member[] getBackupNodes() {
-            return nodes;
-        }
-
-        private void setBackUpNodes(Member[] nodes) {
-            this.nodes = nodes;
-        }
-
-        public byte[] getMapId() {
-            return mapId;
-        }
-
-        public void setValue(Serializable value) {
-            try {
-                if ( value != null ) valuedata = XByteBuffer.serialize(value);
-                this.value = value;
-            }catch ( IOException x ) {
-                throw new RuntimeException(x);
-            }
-        }
-        
-        public void setKey(Serializable key) {
-            try {
-                if (key != null) keydata = XByteBuffer.serialize(key);
-                this.key = key;
-            } catch (IOException x) {
-                throw new RuntimeException(x);
-            }
-        }
-        
-        protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException {
-            int nodecount = in.readInt();
-            Member[] members = new Member[nodecount];
-            for ( int i=0; i<members.length; i++ ) {
-                byte[] d = new byte[in.readInt()];
-                in.read(d);
-                if (d.length > 0) members[i] = MemberImpl.getMember(d);
-            }
-            return members;
-        }
-        
-        protected void writeMembers(ObjectOutput out,Member[] members) throws IOException {
-            if ( members == null ) members = new Member[0];
-            out.writeInt(members.length);
-            for (int i=0; i<members.length; i++ ) {
-                if ( members[i] != null ) {
-                    byte[] d = members[i] != null ? ( (MemberImpl)members[i]).getData(false) : new byte[0];
-                    out.writeInt(d.length);
-                    out.write(d);
-                }
-            }
-        }
-        
-        
-        /**
-         * shallow clone
-         * @return Object
-         */
-        public Object clone() {
-            MapMessage msg = new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes);
-            msg.keydata = this.keydata;
-            msg.valuedata = this.valuedata;
-            return msg;
-        }
-    } //MapMessage
-
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public byte[] getMapContextName() {
-        return mapContextName;
-    }
-
-    public RpcChannel getRpcChannel() {
-        return rpcChannel;
-    }
-
-    public long getRpcTimeout() {
-        return rpcTimeout;
-    }
-
-    public Object getStateMutex() {
-        return stateMutex;
-    }
-
-    public boolean isStateTransferred() {
-        return stateTransferred;
-    }
-
-    public Object getMapOwner() {
-        return mapOwner;
-    }
-
-    public ClassLoader[] getExternalLoaders() {
-        return externalLoaders;
-    }
-
-    public int getChannelSendOptions() {
-        return channelSendOptions;
-    }
-
-    public long getAccessTimeout() {
-        return accessTimeout;
-    }
-
-    public void setMapOwner(Object mapOwner) {
-        this.mapOwner = mapOwner;
-    }
-
-    public void setExternalLoaders(ClassLoader[] externalLoaders) {
-        this.externalLoaders = externalLoaders;
-    }
-
-    public void setChannelSendOptions(int channelSendOptions) {
-        this.channelSendOptions = channelSendOptions;
-    }
-
-    public void setAccessTimeout(long accessTimeout) {
-        this.accessTimeout = accessTimeout;
-    }
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.tipis;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import java.util.ConcurrentModificationException;
+
+/**
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener, Heartbeat {
+    protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
+
+    /**
+     * The default initial capacity - MUST be a power of two.
+     */
+    public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+    /**
+     * The load factor used when none specified in constructor.
+     **/
+    public static final float DEFAULT_LOAD_FACTOR = 0.75f;
+    
+    /**
+     * Used to identify the map
+     */
+    final String chset = "ISO-8859-1";
+
+//------------------------------------------------------------------------------
+//              INSTANCE VARIABLES
+//------------------------------------------------------------------------------
+    
+    /**
+     * Timeout for RPC messages, how long we will wait for a reply
+     */
+    protected transient long rpcTimeout = 5000;
+    /**
+     * Reference to the channel for sending messages
+     */
+    protected transient Channel channel;
+    /**
+     * The RpcChannel to send RPC messages through
+     */
+    protected transient RpcChannel rpcChannel;
+    /**
+     * The Map context name makes this map unique, this
+     * allows us to have more than one map shared
+     * through one channel
+     */
+    protected transient byte[] mapContextName;
+    /**
+     * Has the state been transferred
+     */
+    protected transient boolean stateTransferred = false;
+    /**
+     * Simple lock object for transfers
+     */
+    protected transient Object stateMutex = new Object();
+    /**
+     * A list of members in our map
+     */
+    protected transient HashMap mapMembers = new HashMap();
+    /**
+     * Our default send options
+     */
+    protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+    /**
+     * The owner of this map, ala a SessionManager for example
+     */
+    protected transient Object mapOwner;
+    /**
+     * External class loaders if serialization and deserialization is to be performed successfully.
+     */
+    protected transient ClassLoader[] externalLoaders;
+    
+    /**
+     * The node we are currently backing up data to, this index will rotate
+     * on a round robin basis
+     */
+    protected transient int currentNode = 0;
+    
+    /**
+     * Since the map keeps internal membership
+     * this is the timeout for a ping message to be responded to
+     * If a remote map doesn't respond within this timeframe, 
+     * its considered dead.
+     */
+    protected transient long accessTimeout = 5000;
+    
+    /**
+     * Readable string of the mapContextName value
+     */
+    protected transient String mapname = "";
+    
+
+//------------------------------------------------------------------------------
+//              CONSTRUCTORS
+//------------------------------------------------------------------------------
+
+    /**
+     * Creates a new map
+     * @param channel The channel to use for communication
+     * @param timeout long - timeout for RPC messags
+     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+     * @param initialCapacity int - the size of this map, see HashMap
+     * @param loadFactor float - load factor, see HashMap
+     * @param cls - a list of classloaders to be used for deserialization of objects.
+     */
+    public AbstractReplicatedMap(Object owner,
+                                 Channel channel, 
+                                 long timeout, 
+                                 String mapContextName, 
+                                 int initialCapacity,
+                                 float loadFactor,
+                                 int channelSendOptions,
+                                 ClassLoader[] cls) {
+        super(initialCapacity, loadFactor);
+        init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
+        
+    }
+
+    /**
+     * Helper methods, wraps a single member in an array
+     * @param m Member
+     * @return Member[]
+     */
+    protected Member[] wrap(Member m) {
+        if ( m == null ) return new Member[0];
+        else return new Member[] {m};
+    }
+
+    /**
+     * Initializes the map by creating the RPC channel, registering itself as a channel listener
+     * This method is also responsible for initiating the state transfer
+     * @param owner Object
+     * @param channel Channel
+     * @param mapContextName String
+     * @param timeout long
+     * @param channelSendOptions int
+     * @param cls ClassLoader[]
+     */
+    protected void init(Object owner, Channel channel, String mapContextName, long timeout, int channelSendOptions,ClassLoader[] cls) {
+        log.info("Initializing AbstractReplicatedMap with context name:"+mapContextName);
+        this.mapOwner = owner;
+        this.externalLoaders = cls;
+        this.channelSendOptions = channelSendOptions;
+        this.channel = channel;
+        this.rpcTimeout = timeout;
+
+        try {
+            this.mapname = mapContextName;
+            //unique context is more efficient if it is stored as bytes
+            this.mapContextName = mapContextName.getBytes(chset);
+        } catch (UnsupportedEncodingException x) {
+            log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +") using default getBytes()", x);
+            this.mapContextName = mapContextName.getBytes();
+        }
+        if ( log.isTraceEnabled() ) log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName));
+
+        //create an rpc channel and add the map as a listener
+        this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
+        //add this map as a message listener
+        this.channel.addChannelListener(this);
+        //listen for membership notifications
+        this.channel.addMembershipListener(this);
+        
+        
+        try {
+            //broadcast our map, this just notifies other members of our existence
+            broadcast(MapMessage.MSG_INIT, true);
+            //transfer state from another map
+            transferState();
+            //state is transferred, we are ready for messaging
+            broadcast(MapMessage.MSG_START, true);
+        } catch (ChannelException x) {
+            log.warn("Unable to send map start message.");
+            throw new RuntimeException("Unable to start replicated map.",x);
+        }
+    }
+    
+    
+    /**
+     * Sends a ping out to all the members in the cluster, not just map members
+     * that this map is alive.
+     * @param timeout long
+     * @throws ChannelException
+     */
+    protected void ping(long timeout) throws ChannelException {
+        //send out a map membership message, only wait for the first reply
+        MapMessage msg = new MapMessage(this.mapContextName, 
+                                        MapMessage.MSG_INIT,
+                                        false, 
+                                        null, 
+                                        null, 
+                                        null, 
+                                        wrap(channel.getLocalMember(false)));
+        if ( channel.getMembers().length > 0 ) {
+            //send a ping, wait for all nodes to reply
+            Response[] resp = rpcChannel.send(channel.getMembers(), 
+                                              msg, rpcChannel.ALL_REPLY, 
+                                              (channelSendOptions),
+                                              (int) accessTimeout);
+            for (int i = 0; i < resp.length; i++) {
+                memberAlive(resp[i].getSource());
+            } //for
+        }
+        //update our map of members, expire some if we didn't receive a ping back
+        synchronized (mapMembers) {
+            Iterator it = mapMembers.entrySet().iterator();
+            long now = System.currentTimeMillis();
+            while ( it.hasNext() ) {
+                Map.Entry entry = (Map.Entry)it.next();
+                long access = ((Long)entry.getValue()).longValue(); 
+                if ( (now - access) > timeout ) memberDisappeared((Member)entry.getKey());
+            }
+        }//synch
+    }
+
+    /**
+     * We have received a member alive notification
+     * @param member Member
+     */
+    protected void memberAlive(Member member) {
+        synchronized (mapMembers) {
+            if (!mapMembers.containsKey(member)) {
+                mapMemberAdded(member);
+            } //end if
+            mapMembers.put(member, new Long(System.currentTimeMillis()));
+        }
+    }
+    
+    /**
+     * Helper method to broadcast a message to all members in a channel
+     * @param msgtype int
+     * @param rpc boolean
+     * @throws ChannelException
+     */
+    protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
+        //send out a map membership message, only wait for the first reply
+        MapMessage msg = new MapMessage(this.mapContextName, msgtype,
+                                        false, null, null, null, wrap(channel.getLocalMember(false)));
+        if ( rpc) {
+            Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions),rpcTimeout);
+            for (int i = 0; i < resp.length; i++) {
+                mapMemberAdded(resp[i].getSource());
+                messageReceived(resp[i].getMessage(), resp[i].getSource());
+            }
+        } else {
+            channel.send(channel.getMembers(),msg,channelSendOptions);
+        }
+    }
+
+    public void breakdown() {
+        finalize();
+    }
+
+    public void finalize() {
+        try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){}
+        //cleanup
+        if (this.rpcChannel != null) {
+            this.rpcChannel.breakdown();
+        }
+        if (this.channel != null) {
+            this.channel.removeChannelListener(this);
+            this.channel.removeMembershipListener(this);
+        }
+        this.rpcChannel = null;
+        this.channel = null;
+        this.mapMembers.clear();
+        super.clear();
+        this.stateTransferred = false;
+        this.externalLoaders = null;
+    }
+    
+    public int hashCode() {
+        return Arrays.hashCode(this.mapContextName);
+    }
+    
+    public boolean equals(Object o) {
+        if ( o == null ) return false;
+        if ( !(o instanceof AbstractReplicatedMap)) return false;
+        if ( !(o.getClass().equals(this.getClass())) ) return false;
+        AbstractReplicatedMap other = (AbstractReplicatedMap)o;
+        return Arrays.equals(mapContextName,other.mapContextName);
+    }
+
+//------------------------------------------------------------------------------
+//              GROUP COM INTERFACES
+//------------------------------------------------------------------------------
+    public Member[] getMapMembers(HashMap members) {
+        synchronized (members) {
+            Member[] result = new Member[members.size()];
+            members.keySet().toArray(result);
+            return result;
+        }
+    }
+    public Member[] getMapMembers() {
+        return getMapMembers(this.mapMembers);
+    }
+    
+    public Member[] getMapMembersExcl(Member[] exclude) {
+        synchronized (mapMembers) {
+            HashMap list = (HashMap)mapMembers.clone();
+            for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
+            return getMapMembers(list);
+        }
+    }
+
+
+    /**
+     * Replicates any changes to the object since the last time
+     * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
+     * @param complete - if set to true, the object is replicated to its backup
+     * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
+     * be replicated
+     */
+    public void replicate(Object key, boolean complete) {
+        if ( log.isTraceEnabled() )
+            log.trace("Replicate invoked on key:"+key);
+        MapEntry entry = (MapEntry)super.get(key);
+        if ( entry == null ) return;
+        if ( !entry.isSerializable() ) return;
+        if (entry != null && entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) {
+            Object value = entry.getValue();
+            //check to see if we need to replicate this object isDirty()||complete
+            boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
+            
+            if (!repl) {
+                if ( log.isTraceEnabled() )
+                    log.trace("Not replicating:"+key+", no change made");
+                
+                return;
+            }
+            //check to see if the message is diffable
+            boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
+            MapMessage msg = null;
+            if (diff) {
+                ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
+                try {
+                    rentry.lock();
+                    //construct a diff message
+                    msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+                                         true, (Serializable) entry.getKey(), null,
+                                         rentry.getDiff(),
+                                         entry.getBackupNodes());
+                } catch (IOException x) {
+                    log.error("Unable to diff object. Will replicate the entire object instead.", x);
+                } finally {
+                    rentry.unlock();
+                }
+                
+            }
+            if (msg == null) {
+                //construct a complete
+                msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+                                     false, (Serializable) entry.getKey(),
+                                     (Serializable) entry.getValue(),
+                                     null, entry.getBackupNodes());
+
+            }
+            try {
+                if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) {
+                    channel.send(entry.getBackupNodes(), msg, channelSendOptions);
+                }
+            } catch (ChannelException x) {
+                log.error("Unable to replicate data.", x);
+            }
+        } //end if
+
+    }
+
+    /**
+     * This can be invoked by a periodic thread to replicate out any changes.
+     * For maps that don't store objects that implement ReplicatedMapEntry, this
+     * method should be used infrequently to avoid large amounts of data transfer
+     * @param complete boolean
+     */
+    public void replicate(boolean complete) {
+        Iterator i = super.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry e = (Map.Entry) i.next();
+            replicate(e.getKey(), complete);
+        } //while
+
+    }
+
+    public void transferState() {
+        try {
+            Member[] members = getMapMembers();
+            Member backup = members.length > 0 ? (Member) members[0] : null;
+            if (backup != null) {
+                MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
+                                                null, null, null, null);
+                Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
+                if (resp.length > 0) {
+                    synchronized (stateMutex) {
+                        msg = (MapMessage) resp[0].getMessage();
+                        msg.deserialize(getExternalLoaders());
+                        ArrayList list = (ArrayList) msg.getValue();
+                        for (int i = 0; i < list.size(); i++) {
+                            messageReceived( (Serializable) list.get(i), resp[0].getSource());
+                        } //for
+                    }
+                } else {
+                    log.warn("Transfer state, 0 replies, probably a timeout.");
+                }
+            }
+        } catch (ChannelException x) {
+            log.error("Unable to transfer LazyReplicatedMap state.", x);
+        } catch (IOException x) {
+            log.error("Unable to transfer LazyReplicatedMap state.", x);
+        } catch (ClassNotFoundException x) {
+            log.error("Unable to transfer LazyReplicatedMap state.", x);
+        }
+        stateTransferred = true;
+    }
+
+    /**
+     * @todo implement state transfer
+     * @param msg Serializable
+     * @return Serializable - null if no reply should be sent
+     */
+    public Serializable replyRequest(Serializable msg, final Member sender) {
+        if (! (msg instanceof MapMessage))return null;
+        MapMessage mapmsg = (MapMessage) msg;
+
+        //map init request
+        if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
+            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+            return mapmsg;
+        }
+        
+        //map start request
+        if (mapmsg.getMsgType() == mapmsg.MSG_START) {
+            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+            mapMemberAdded(sender);
+            return mapmsg;
+        }
+
+        //backup request
+        if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
+            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+            if (entry == null || (!entry.isSerializable()) )return null;
+            mapmsg.setValue( (Serializable) entry.getValue());
+            return mapmsg;
+        }
+
+        //state transfer request
+        if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
+            synchronized (stateMutex) { //make sure we dont do two things at the same time
+                ArrayList list = new ArrayList();
+                Iterator i = super.entrySet().iterator();
+                while (i.hasNext()) {
+                    Map.Entry e = (Map.Entry) i.next();
+                    MapEntry entry = (MapEntry) e.getValue();
+                    if ( entry.isSerializable() ) {
+                        MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
+                            false, (Serializable) entry.getKey(), null, null, entry.getBackupNodes());
+                        list.add(me);
+                    }
+                }
+                mapmsg.setValue(list);
+                return mapmsg;
+                
+            } //synchronized
+        }
+
+        return null;
+
+    }
+
+    /**
+     * If the reply has already been sent to the requesting thread,
+     * the rpc callback can handle any data that comes in after the fact.
+     * @param msg Serializable
+     * @param sender Member
+     */
+    public void leftOver(Serializable msg, Member sender) {
+        //left over membership messages
+        if (! (msg instanceof MapMessage))return;
+
+        MapMessage mapmsg = (MapMessage) msg;
+        try {
+            mapmsg.deserialize(getExternalLoaders());
+            if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+                mapMemberAdded(mapmsg.getBackupNodes()[0]);
+            } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
+                memberAlive(mapmsg.getBackupNodes()[0]);
+            }
+        } catch (IOException x ) {
+            log.error("Unable to deserialize MapMessage.",x);
+        } catch (ClassNotFoundException x ) {
+            log.error("Unable to deserialize MapMessage.",x);
+        }
+    }
+
+    public void messageReceived(Serializable msg, Member sender) {
+        if (! (msg instanceof MapMessage)) return;
+
+        MapMessage mapmsg = (MapMessage) msg;
+        if ( log.isTraceEnabled() ) {
+            log.trace("Map["+mapname+"] received message:"+mapmsg);
+        }
+        
+        try {
+            mapmsg.deserialize(getExternalLoaders());
+        } catch (IOException x) {
+            log.error("Unable to deserialize MapMessage.", x);
+            return;
+        } catch (ClassNotFoundException x) {
+            log.error("Unable to deserialize MapMessage.", x);
+            return;
+        }
+        if ( log.isTraceEnabled() ) 
+            log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg);
+        if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+            mapMemberAdded(mapmsg.getBackupNodes()[0]);
+        }
+
+        if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
+            memberDisappeared(mapmsg.getBackupNodes()[0]);
+        }
+
+        if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
+            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+            if ( entry==null ) {
+                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+                entry.setBackup(false);
+                entry.setProxy(true);
+                entry.setBackupNodes(mapmsg.getBackupNodes());
+                super.put(entry.getKey(), entry);
+            } else {
+                entry.setProxy(true);
+                entry.setBackup(false);
+                entry.setBackupNodes(mapmsg.getBackupNodes());
+            }
+        }
+
+        if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
+            super.remove(mapmsg.getKey());
+        }
+
+        if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
+            MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+            if (entry == null) {
+                entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+                entry.setBackup(true);
+                entry.setProxy(false);
+                entry.setBackupNodes(mapmsg.getBackupNodes());
+                if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) {
+                    ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
+                }
+            } else {
+                entry.setBackup(true);
+                entry.setProxy(false);
+                entry.setBackupNodes(mapmsg.getBackupNodes());
+                if (entry.getValue() instanceof ReplicatedMapEntry) {
+                    ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
+                    if (mapmsg.isDiff()) {
+                        try {
+                            diff.lock();
+                            diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
+                        } catch (Exception x) {
+                            log.error("Unable to apply diff to key:" + entry.getKey(), x);
+                        } finally {
+                            diff.unlock();
+                        }
+                    } else {
+                        if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
+                        ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
+                    } //end if
+                } else if  (mapmsg.getValue() instanceof ReplicatedMapEntry) {
+                    ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
+                    re.setOwner(getMapOwner());
+                    entry.setValue(re);
+                } else {
+                    if ( mapmsg.getValue()!=null ) entry.setValue(mapmsg.getValue());
+                } //end if
+            } //end if
+            super.put(entry.getKey(), entry);
+        } //end if
+    }
+
+    public boolean accept(Serializable msg, Member sender) {
+        boolean result = false;
+        if (msg instanceof MapMessage) {
+            if ( log.isTraceEnabled() ) log.trace("Map["+mapname+"] accepting...."+msg);
+            result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
+            if ( log.isTraceEnabled() ) log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg);
+        }
+        return result;
+    }
+
+    public void mapMemberAdded(Member member) {
+        if ( member.equals(getChannel().getLocalMember(false)) ) return;
+        boolean memberAdded = false;
+        //select a backup node if we don't have one
+        synchronized (mapMembers) {
+            if (!mapMembers.containsKey(member) ) {
+                mapMembers.put(member, new Long(System.currentTimeMillis()));
+                memberAdded = true;
+            }
+        }
+        if ( memberAdded ) {
+            synchronized (stateMutex) {
+                Iterator i = super.entrySet().iterator();
+                while (i.hasNext()) {
+                    Map.Entry e = (Map.Entry) i.next();
+                    MapEntry entry = (MapEntry) e.getValue();
+                    if ( entry == null ) continue;
+                    if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
+                        try {
+                            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+                            entry.setBackupNodes(backup);
+                        } catch (ChannelException x) {
+                            log.error("Unable to select backup node.", x);
+                        } //catch
+                    } //end if
+                } //while
+            } //synchronized
+        }//end if
+    }
+    
+    public boolean inSet(Member m, Member[] set) {
+        if ( set == null ) return false;
+        boolean result = false;
+        for (int i=0; i<set.length && (!result); i++ )
+            if ( m.equals(set[i]) ) result = true;
+        return result;
+    }
+
+    public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
+        ArrayList result = new ArrayList();
+        for (int i=0; i<set.length; i++ ) {
+            boolean include = true;
+            for (int j=0; j<mbrs.length; j++ ) 
+                if ( mbrs[j].equals(set[i]) ) include = false;
+            if ( include ) result.add(set[i]);
+        }
+        return (Member[])result.toArray(new Member[result.size()]);
+    }
+
+    public void memberAdded(Member member) {
+        //do nothing
+    }
+
+    public void memberDisappeared(Member member) {
+        boolean removed = false;
+        synchronized (mapMembers) {
+            removed = (mapMembers.remove(member) != null );
+        }
+        Iterator i = super.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry e = (Map.Entry) i.next();
+            MapEntry entry = (MapEntry) e.getValue();
+            if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
+                try {
+                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+                    entry.setBackupNodes(backup);
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+                }
+            } //end if
+        } //while
+    }
+
+    public int getNextBackupIndex() {
+        int size = mapMembers.size();
+        if (mapMembers.size() == 0)return -1;
+        int node = currentNode++;
+        if (node >= size) {
+            node = 0;
+            currentNode = 0;
+        }
+        return node;
+    }
+    public Member getNextBackupNode() {
+        Member[] members = getMapMembers();
+        int node = getNextBackupIndex();
+        if ( members.length == 0 || node==-1) return null;
+        if ( node >= members.length ) node = 0;
+        return members[node];
+    }
+
+    protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
+    
+    public void heartbeat() {
+        try {
+            ping(accessTimeout);
+        }catch ( Exception x ) {
+            log.error("Unable to send AbstractReplicatedMap.ping message",x);
+        }
+    }
+
+//------------------------------------------------------------------------------    
+//              METHODS TO OVERRIDE    
+//------------------------------------------------------------------------------
+  
+    /**
+     * Removes an object from this map, it will also remove it from 
+     * 
+     * @param key Object
+     * @return Object
+     */
+    public Object remove(Object key) {
+        MapEntry entry = (MapEntry)super.remove(key);
+
+        try {
+            if (getMapMembers().length > 0 ) {
+                MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+                getChannel().send(getMapMembers(), msg, getChannelSendOptions());
+            }
+        } catch ( ChannelException x ) {
+            log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
+        }
+        return entry!=null?entry.getValue():null;
+    }
+    
+    public Object get(Object key) {
+        MapEntry entry = (MapEntry)super.get(key);
+        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" entry:"+entry);
+        if ( entry == null ) return null;
+        if ( !entry.isPrimary() ) {
+            //if the message is not primary, we need to retrieve the latest value
+            try {
+                Member[] backup = null;
+                MapMessage msg = null;
+                if ( !entry.isBackup() ) {
+                    //make sure we don't retrieve from ourselves
+                    msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
+                                         (Serializable) key, null, null, null);
+                    Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, Channel.SEND_OPTIONS_DEFAULT, getRpcTimeout());
+                    if (resp == null || resp.length == 0) {
+                        //no responses
+                        log.warn("Unable to retrieve remote object for key:" + key);
+                        return null;
+                    }
+                    msg = (MapMessage) resp[0].getMessage();
+                    msg.deserialize(getExternalLoaders());
+                    backup = entry.getBackupNodes();
+                    if ( entry.getValue() instanceof ReplicatedMapEntry ) {
+                        ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+                        val.setOwner(getMapOwner());
+                    }
+                    if ( msg.getValue()!=null ) entry.setValue(msg.getValue());
+                }
+                if (entry.isBackup()) {
+                    //select a new backup node
+                    backup = publishEntryInfo(key, entry.getValue());
+                } else if ( entry.isProxy() ) {
+                    //invalidate the previous primary
+                    msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+                    Member[] dest = getMapMembersExcl(backup);
+                    if ( dest!=null && dest.length >0) {
+                        getChannel().send(dest, msg, getChannelSendOptions());
+                    }
+                }
+
+                entry.setBackupNodes(backup);
+                entry.setBackup(false);
+                entry.setProxy(false);
+
+
+            } catch (Exception x) {
+                log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
+                return null;
+            }
+        }
+        if (log.isTraceEnabled()) log.trace("Requesting id:"+key+" result:"+entry.getValue());
+        if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) {
+            ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+            //hack, somehow this is not being set above
+            val.setOwner(getMapOwner());
+            
+        }
+        return entry.getValue();
+    }    
+
+    
+    protected void printMap(String header) {
+        try {
+            System.out.println("\nDEBUG MAP:"+header);
+            System.out.println("Map["+ new String(mapContextName, chset) + ", Map Size:" + super.size());
+            Member[] mbrs = getMapMembers();
+            for ( int i=0; i<mbrs.length;i++ ) {
+                System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName());
+            }
+            Iterator i = super.entrySet().iterator();
+            int cnt = 0;
+
+            while (i.hasNext()) {
+                Map.Entry e = (Map.Entry) i.next();
+                System.out.println( (++cnt) + ". " + e.getValue());
+            }
+            System.out.println("EndMap]\n\n");
+        }catch ( Exception ignore) {
+            ignore.printStackTrace();
+        }
+    }
+    
+    /**
+         * Returns true if the key has an entry in the map.
+         * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
+         * will make this entry primary for the group
+         * @param key Object
+         * @return boolean
+         */
+        public boolean containsKey(Object key) {
+            return super.containsKey(key);
+        }
+    
+    
+        public Object put(Object key, Object value) {
+            MapEntry entry = new MapEntry(key,value);
+            entry.setBackup(false);
+            entry.setProxy(false);
+    
+            Object old = null;
+    
+            //make sure that any old values get removed
+            if ( containsKey(key) ) old = remove(key);
+            try {
+                Member[] backup = publishEntryInfo(key, value);
+                entry.setBackupNodes(backup);
+            } catch (ChannelException x) {

[... 499 lines stripped ...]


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org