You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/16 22:54:46 UTC
svn commit: r386465 - in /tomcat/container/tc5.5.x/modules/groupcom:
src/share/org/apache/catalina/tribes/tipis/
test/org/apache/catalina/tribes/demos/ test/org/apache/catalina/tribes/test/
Author: fhanik
Date: Thu Mar 16 13:54:44 2006
New Revision: 386465
URL: http://svn.apache.org/viewcvs?rev=386465&view=rev
Log:
Created an abstract replicated map, as I plan to have a ReplicatedMap to handle context attribute replication.
Context attributes should be retrievable at all times, and not jump backup nodes like sessions,
hence context attribute replication will use a simple ReplicatedMap, versus the session manager be using a LazyReplicatedMap
Also extended the abstract map to be able to have more than one backup node
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=386465&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Mar 16 13:54:44 2006
@@ -0,0 +1,857 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.tipis;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.mcast.MemberImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public abstract class AbstractReplicatedMap extends LinkedHashMap implements RpcCallback, ChannelListener, MembershipListener {
+ protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
+
+//------------------------------------------------------------------------------
+// INSTANCE VARIABLES
+//------------------------------------------------------------------------------
+ private transient long rpcTimeout = 5000;
+ private transient Channel channel;
+ private transient RpcChannel rpcChannel;
+ private transient byte[] mapContextName;
+ private transient boolean stateTransferred = false;
+ private transient Object stateMutex = new Object();
+ private transient ArrayList mapMembers = new ArrayList();
+
+//------------------------------------------------------------------------------
+// CONSTRUCTORS
+//------------------------------------------------------------------------------
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ * @param loadFactor float - load factor, see HashMap
+ */
+ public AbstractReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity,
+ float loadFactor) {
+ super(initialCapacity, loadFactor);
+ init(channel, mapContextName, timeout);
+ }
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ */
+ public AbstractReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
+ super(initialCapacity);
+ init(channel, mapContextName, timeout);
+ }
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ */
+ public AbstractReplicatedMap(Channel channel, long timeout, String mapContextName) {
+ super();
+ init(channel, mapContextName, timeout);
+ }
+
+ protected Member[] wrap(Member m) {
+ return new Member[] {m};
+ }
+
+ private void init(Channel channel, String mapContextName, long timeout) {
+ final String chset = "ISO-8859-1";
+
+ this.channel = channel;
+ this.rpcTimeout = timeout;
+
+ try {
+ //unique context is more efficient if it is stored as bytes
+ this.mapContextName = mapContextName.getBytes(chset);
+ } catch (UnsupportedEncodingException x) {
+ log.warn("Unable to encode mapContextName[" + mapContextName + "] using getBytes(" + chset +
+ ") using default getBytes()", x);
+ this.mapContextName = mapContextName.getBytes();
+ }
+
+ //create an rpc channel and add the map as a listener
+ this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
+ this.channel.addChannelListener(this);
+ this.channel.addMembershipListener(this);
+
+ try {
+ //send out a map membership message, only wait for the first reply
+ MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_START,
+ false, null, null, null, wrap(channel.getLocalMember(false)));
+ Response[] resp = rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, timeout);
+ for (int i = 0; i < resp.length; i++) {
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
+ }
+ } catch (ChannelException x) {
+ log.warn("Unable to send map start message.");
+ }
+
+ //transfer state from another map
+ transferState();
+ }
+
+ public void breakdown() {
+ finalize();
+ }
+
+ public void finalize() {
+ try {
+ //send a map membership stop message
+ MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_STOP,
+ false, null, null, null, wrap(channel.getLocalMember(false)));
+ if (channel != null) channel.send(channel.getMembers(), msg);
+ } catch (ChannelException x) {
+ log.warn("Unable to send stop message.", x);
+ }
+
+ //cleanup
+ if (this.rpcChannel != null) {
+ this.rpcChannel.breakdown();
+ }
+ if (this.channel != null) {
+ this.channel.removeChannelListener(this);
+ this.channel.removeMembershipListener(this);
+ }
+ this.rpcChannel = null;
+ this.channel = null;
+ this.mapMembers.clear();
+ super.clear();
+ this.stateTransferred = false;
+ }
+
+ //------------------------------------------------------------------------------
+// GROUP COM INTERFACES
+//------------------------------------------------------------------------------
+ public Member[] getMapMembers() {
+ synchronized (mapMembers) {
+ Member[] result = new Member[mapMembers.size()];
+ mapMembers.toArray(result);
+ return result;
+ }
+ }
+
+ /**
+ * Replicates any changes to the object since the last time
+ * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
+ * @param complete - if set to true, the object is replicated to its backup
+ * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
+ * be replicated
+ */
+ public void replicate(Object key, boolean complete) {
+ MapEntry entry = (MapEntry)super.get(key);
+ if (entry != null && entry.isPrimary()) {
+ Object value = entry.getValue();
+ //check to see if we need to replicate this object isDirty()||complete
+ boolean repl = complete || ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDirty());
+ if (!repl)return;
+
+ //check to see if the message is diffable
+ boolean diff = ( (value instanceof ReplicatedMapEntry) && ( (ReplicatedMapEntry) value).isDiffable());
+ MapMessage msg = null;
+ if (diff) {
+ try {
+ //construct a diff message
+ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ true, (Serializable) entry.getKey(), null,
+ ( (ReplicatedMapEntry) entry.getValue()).getDiff(),
+ entry.getBackupNodes());
+ } catch (IOException x) {
+ log.error("Unable to diff object. Will replicate the entire object instead.", x);
+ }
+ }
+ if (msg == null) {
+ //construct a complete
+ msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+ false, (Serializable) entry.getKey(),
+ (Serializable) entry.getValue(),
+ null, entry.getBackupNodes());
+
+ }
+ try {
+ channel.send(entry.getBackupNodes(), msg);
+ } catch (ChannelException x) {
+ log.error("Unable to replicate data.", x);
+ }
+ } //end if
+
+ }
+
+ /**
+ * This can be invoked by a periodic thread to replicate out any changes.
+ * For maps that don't store objects that implement ReplicatedMapEntry, this
+ * method should be used infrequently to avoid large amounts of data transfer
+ * @param complete boolean
+ */
+ public void replicate(boolean complete) {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ replicate(e.getKey(), complete);
+ } //while
+
+ }
+
+ public void transferState() {
+ try {
+ Member[] members = getMapMembers();
+ Member backup = members.length > 0 ? (Member) members[0] : null;
+ if (backup != null) {
+ MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_STATE, false,
+ null, null, null, null);
+ Response[] resp = rpcChannel.send(new Member[] {backup}, msg, rpcChannel.FIRST_REPLY, rpcTimeout);
+ if (resp.length > 0) {
+ msg = (MapMessage) resp[0].getMessage();
+ ArrayList list = (ArrayList) msg.getValue();
+ for (int i = 0; i < list.size(); i++) {
+ MapMessage m = (MapMessage) list.get(i);
+
+ //make sure we don't store that actual object as primary or backup
+ MapEntry local = (MapEntry)super.get(m.getKey());
+ if (local != null && (!local.isProxy()))continue;
+
+ //store the object
+ MapEntry entry = new MapEntry(m.getKey(), m.getValue());
+ entry.setBackup(false);
+ entry.setProxy(true);
+ entry.setBackupNodes(m.getBackupNodes());
+ super.put(entry.getKey(), entry);
+ }
+ }
+ }
+ } catch (ChannelException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ }
+ stateTransferred = true;
+ }
+
+ /**
+ * @todo implement state transfer
+ * @param msg Serializable
+ * @return Serializable - null if no reply should be sent
+ */
+ public Serializable replyRequest(Serializable msg, Member sender) {
+ if (! (msg instanceof MapMessage))return null;
+ MapMessage mapmsg = (MapMessage) msg;
+
+ //map start request
+ if (mapmsg.getMsgType() == mapmsg.MSG_START) {
+ mapMemberAdded(sender);
+ mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+ return mapmsg;
+ }
+
+ //backup request
+ if (mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null)return null;
+ mapmsg.setValue( (Serializable) entry.getValue());
+ return mapmsg;
+ }
+
+ //state transfer request
+ if (mapmsg.getMsgType() == mapmsg.MSG_STATE) {
+ synchronized (stateMutex) { //make sure we dont do two things at the same time
+ ArrayList list = new ArrayList();
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
+ false, (Serializable) entry.getKey(), (Serializable) entry.getValue(),
+ null, entry.getBackupNodes());
+ list.add(me);
+ }
+ mapmsg.setValue(list);
+ return mapmsg;
+ } //synchronized
+ }
+
+ return null;
+
+ }
+
+ /**
+ * If the reply has already been sent to the requesting thread,
+ * the rpc callback can handle any data that comes in after the fact.
+ * @param msg Serializable
+ * @param sender Member
+ */
+ public void leftOver(Serializable msg, Member sender) {
+ //left over membership messages
+ if (! (msg instanceof MapMessage))return;
+
+ MapMessage mapmsg = (MapMessage) msg;
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ }
+ }
+
+ public void messageReceived(Serializable msg, Member sender) {
+ //todo implement all the messages that we can receive
+ //messages we can receive are MSG_PROXY, MSG_BACKUP
+ if (! (msg instanceof MapMessage))return;
+
+ MapMessage mapmsg = (MapMessage) msg;
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapMemberAdded(mapmsg.getBackupNodes()[0]);
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
+ memberDisappeared(mapmsg.getBackupNodes()[0]);
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
+ MapEntry entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+ entry.setBackup(false);
+ entry.setProxy(true);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ super.put(entry.getKey(), entry);
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
+ super.remove(mapmsg.getKey());
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null) {
+ entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+ entry.setBackup(true);
+ entry.setProxy(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ super.put(entry.getKey(), entry);
+ } else {
+ entry.setBackup(true);
+ entry.setProxy(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ if (entry.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry diff = (ReplicatedMapEntry) entry.getValue();
+ if (mapmsg.isDiff()) {
+ try {
+ diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
+ } catch (IOException x) {
+ log.error("Unable to apply diff to key:" + entry.getKey(), x);
+ }
+ } else {
+ entry.setValue(mapmsg.getValue());
+ } //end if
+ } else {
+ entry.setValue(mapmsg.getValue());
+ } //end if
+ } //end if
+ } //end if
+
+ }
+
+ public boolean accept(Serializable msg, Member sender) {
+ if (msg instanceof MapMessage) {
+ return Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId());
+ }
+ return false;
+ }
+
+ public void mapMemberAdded(Member member) {
+ //select a backup node if we don't have one
+ synchronized (mapMembers) {
+ mapMembers.add(member);
+ }
+ synchronized (stateMutex) {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if (entry.isPrimary() && entry.getBackupNodes() == null && entry.getBackupNodes().length == 0) {
+ try {
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ entry.setBackupNodes(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to select backup node.", x);
+ } //catch
+ } //end if
+ } //while
+ } //synchronized
+
+ }
+
+ public boolean inSet(Member m, Member[] set) {
+ boolean result = false;
+ for (int i=0; i<set.length && (!result); i++ )
+ if ( m.equals(set[i]) ) result = true;
+ return result;
+ }
+
+ public void memberAdded(Member member) {
+ //do nothing
+ }
+
+ public void memberDisappeared(Member member) {
+ synchronized (mapMembers) {
+ mapMembers.remove(member);
+ }
+ //todo move all sessions that are primary here to and have this member as
+ //a backup
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) {
+ try {
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ entry.setBackupNodes(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+ }
+ } //end if
+ } //while
+ }
+
+ int currentNode = 0;
+ public Member getNextBackupNode() {
+ Member[] members = getMapMembers();
+ if (members.length == 0)return null;
+ int node = currentNode++;
+ if (node >= members.length) {
+ node = 0;
+ currentNode = 0;
+ }
+ return members[node];
+ }
+
+ protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
+
+//------------------------------------------------------------------------------
+// METHODS TO OVERRIDE
+//------------------------------------------------------------------------------
+
+
+
+//------------------------------------------------------------------------------
+// Map Entry class
+//------------------------------------------------------------------------------
+ public static class MapEntry
+ implements Map.Entry {
+ private boolean backup;
+ private boolean proxy;
+ private Member[] backupNodes;
+
+ private Serializable key;
+ private Serializable value;
+
+ public MapEntry(Serializable key, Serializable value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public boolean isBackup() {
+ return backup;
+ }
+
+ public void setBackup(boolean backup) {
+ this.backup = backup;
+ }
+
+ public boolean isProxy() {
+ return proxy;
+ }
+
+ public boolean isPrimary() {
+ return ( (!proxy) && (!backup));
+ }
+
+ public void setProxy(boolean proxy) {
+ this.proxy = proxy;
+ }
+
+ public boolean isDiffable() {
+ return (value instanceof ReplicatedMapEntry);
+ }
+
+ public void setBackupNodes(Member[] nodes) {
+ this.backupNodes = nodes;
+ }
+
+ public Member[] getBackupNodes() {
+ return backupNodes;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public Object setValue(Object value) {
+ Object old = this.value;
+ this.value = (Serializable) value;
+ return old;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public byte[] getDiff() throws IOException {
+ if (isDiffable()) {
+ return ( (ReplicatedMapEntry) value).getDiff();
+ } else {
+ return getData();
+ }
+ }
+
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ return value.equals(o);
+ }
+
+ /**
+ * returns the entire object as a byte array
+ * @return byte[]
+ * @throws IOException
+ */
+ public byte[] getData() throws IOException {
+ return (new ObjectStreamable(value)).getBuf().getArray();
+ }
+
+ /**
+ * apply a diff, or an entire object
+ * @param data byte[]
+ * @param offset int
+ * @param length int
+ * @param diff boolean
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException {
+ if (isDiffable() && diff) {
+ ( (ReplicatedMapEntry) value).applyDiff(data, offset, length);
+ } else if (length == 0) {
+ value = null;
+ proxy = true;
+ } else {
+ value = XByteBuffer.deserialize(data, offset, length);
+ }
+ }
+
+ }
+
+//------------------------------------------------------------------------------
+// map message to send to and from other maps
+//------------------------------------------------------------------------------
+
+ public static class MapMessage implements Externalizable {
+ public static final int MSG_BACKUP = 1;
+ public static final int MSG_RETRIEVE_BACKUP = 2;
+ public static final int MSG_PROXY = 3;
+ public static final int MSG_REMOVE = 4;
+ public static final int MSG_STATE = 5;
+ public static final int MSG_START = 6;
+ public static final int MSG_STOP = 7;
+
+ private byte[] mapId;
+ private int msgtype;
+ private boolean diff;
+ private Serializable key;
+ private Serializable value;
+ private byte[] diffvalue;
+ private Member[] nodes;
+
+ public MapMessage() {}
+
+ public MapMessage(byte[] mapId,
+ int msgtype, boolean diff,
+ Serializable key, Serializable value,
+ byte[] diffvalue, Member[] nodes) {
+ this.mapId = mapId;
+ this.msgtype = msgtype;
+ this.diff = diff;
+ this.key = key;
+ this.value = value;
+ this.diffvalue = diffvalue;
+ this.nodes = nodes;
+ }
+
+ public int getMsgType() {
+ return msgtype;
+ }
+
+ public boolean isDiff() {
+ return diff;
+ }
+
+ public Serializable getKey() {
+ return key;
+ }
+
+ public Serializable getValue() {
+ return value;
+ }
+
+ public byte[] getDiffValue() {
+ return diffvalue;
+ }
+
+ public Member[] getBackupNodes() {
+ return nodes;
+ }
+
+ private void setBackUpNodes(Member[] nodes) {
+ this.nodes = nodes;
+ }
+
+ public byte[] getMapId() {
+ return mapId;
+ }
+
+ public void setValue(Serializable value) {
+ this.value = value;
+ }
+
+ protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException {
+ int nodecount = in.readInt();
+ Member[] members = new Member[nodecount];
+ for ( int i=0; i<members.length; i++ ) {
+ byte[] d = new byte[in.readInt()];
+ in.read(d);
+ if (d.length > 0) members[i] = MemberImpl.getMember(d);
+ }
+ return members;
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ mapId = new byte[in.readInt()];
+ in.read(mapId);
+ msgtype = in.readInt();
+ switch (msgtype) {
+ case MSG_BACKUP:
+ case MSG_STATE: {
+ diff = in.readBoolean();
+ key = (Serializable) in.readObject();
+ if (diff) {
+ diffvalue = new byte[in.readInt()];
+ in.read(diffvalue);
+ } else {
+ value = (Serializable) in.readObject();
+ } //endif
+ nodes = readMembers(in);
+ break;
+ }
+ case MSG_RETRIEVE_BACKUP: {
+ key = (Serializable) in.readObject();
+ value = (Serializable) in.readObject();
+ break;
+ }
+ case MSG_REMOVE: {
+ key = (Serializable) in.readObject();
+ break;
+ }
+ case MSG_PROXY: {
+ key = (Serializable) in.readObject();
+ this.nodes = readMembers(in);
+ break;
+ }
+ case MSG_START:
+ MSG_STOP: {
+ nodes = readMembers(in);
+ break;
+ }
+
+ } //switch
+ } //readExternal
+
+ protected void writeMembers(ObjectOutput out,Member[] members) throws IOException {
+ if ( members == null ) members = new Member[0];
+ out.writeInt(members.length);
+ for (int i=0; i<members.length; i++ ) {
+ if ( members[i] != null ) {
+ byte[] d = members[i] != null ? ( (MemberImpl)members[i]).getData(false) : new byte[0];
+ out.writeInt(d.length);
+ out.write(d);
+ }
+ }
+
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(mapId.length);
+ out.write(mapId);
+ out.writeInt(msgtype);
+ switch (msgtype) {
+ case MSG_BACKUP:
+ case MSG_STATE: {
+ out.writeBoolean(diff);
+ out.writeObject(key);
+ if (diff) {
+ out.writeInt(diffvalue.length);
+ out.write(diffvalue);
+ } else {
+ out.writeObject(value);
+ } //endif
+ writeMembers(out,nodes);
+ break;
+ }
+ case MSG_RETRIEVE_BACKUP: {
+ out.writeObject(key);
+ out.writeObject(value);
+ break;
+ }
+ case MSG_REMOVE: {
+ out.writeObject(key);
+ break;
+ }
+ case MSG_PROXY: {
+ out.writeObject(key);
+ writeMembers(out,nodes);
+ break;
+ }
+ case MSG_START:
+ MSG_STOP: {
+ writeMembers(out,nodes);
+ break;
+ }
+ } //switch
+ } //writeExternal
+
+ public Object clone() {
+ return new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.nodes);
+ }
+ } //MapMessage
+
+//------------------------------------------------------------------------------
+// streamable class
+//------------------------------------------------------------------------------
+
+ public static class ObjectStreamable
+ implements Streamable {
+ private DirectByteArrayOutputStream buf;
+ private int pos = 0;
+ public ObjectStreamable(Serializable value) throws IOException {
+ buf = new DirectByteArrayOutputStream(1024);
+ ObjectOutputStream out = new ObjectOutputStream(buf);
+ out.writeObject(value);
+ out.flush();
+ }
+
+ /**
+ * returns true if the stream has reached its end
+ * @return boolean
+ */
+ public synchronized boolean eof() {
+ return (pos >= buf.size());
+
+ }
+
+ /**
+ * write data into the byte array starting at offset, maximum bytes read are (data.length-offset)
+ * @param data byte[] - the array to read data into
+ * @param offset int - start position for writing data
+ * @return int - the number of bytes written into the data buffer
+ */
+ public synchronized int write(byte[] data, int offset) throws IOException {
+ int length = Math.min(data.length - offset, buf.size() - pos);
+ System.arraycopy(buf.getArrayDirect(), pos, data, offset, length);
+ pos = pos + length;
+ return length;
+ }
+
+ public synchronized int read(byte[] data, int offset, int length) throws IOException {
+ return -1;
+ }
+
+ public DirectByteArrayOutputStream getBuf() {
+ return buf;
+ }
+
+ public int size() {
+ return buf.size();
+ }
+
+ public int pos() {
+ return pos;
+ }
+
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public byte[] getMapContextName() {
+ return mapContextName;
+ }
+
+ public RpcChannel getRpcChannel() {
+ return rpcChannel;
+ }
+
+ public long getRpcTimeout() {
+ return rpcTimeout;
+ }
+
+ public Object getStateMutex() {
+ return stateMutex;
+ }
+
+ public boolean isStateTransferred() {
+ return stateTransferred;
+ }
+
+}
\ No newline at end of file
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=386465&r1=386464&r2=386465&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Mar 16 13:54:44 2006
@@ -78,420 +78,51 @@
* @author Filip Hanik
* @version 1.0
*/
-public class LazyReplicatedMap extends LinkedHashMap
+public class LazyReplicatedMap extends AbstractReplicatedMap
implements RpcCallback, ChannelListener, MembershipListener {
protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LazyReplicatedMap.class);
-//------------------------------------------------------------------------------
-// INSTANCE VARIABLES
-//------------------------------------------------------------------------------
- private transient long rpcTimeout = 5000;
- private transient Channel channel;
- private transient RpcChannel rpcChannel;
- private transient byte[] mapContextName;
- private transient boolean stateTransferred = false;
- private transient Object stateMutex = new Object();
- private transient ArrayList mapMembers = new ArrayList();
//------------------------------------------------------------------------------
// CONSTRUCTORS / DESTRUCTORS
//------------------------------------------------------------------------------
/**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- * @param loadFactor float - load factor, see HashMap
- */
- public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
- super(initialCapacity,loadFactor);
- init(channel,mapContextName,timeout);
- }
-
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- */
- public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
- super(initialCapacity);
- init(channel,mapContextName, timeout);
- }
-
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- */
- public LazyReplicatedMap(Channel channel, long timeout, String mapContextName) {
- super();
- init(channel,mapContextName,timeout);
- }
-
-
- private void init(Channel channel, String mapContextName, long timeout) {
- final String chset = "ISO-8859-1";
-
- this.channel = channel;
- this.rpcTimeout = timeout;
-
- try {
- //unique context is more efficient if it is stored as bytes
- this.mapContextName = mapContextName.getBytes(chset);
- }catch (UnsupportedEncodingException x) {
- log.warn("Unable to encode mapContextName["+mapContextName+"] using getBytes("+chset+") using default getBytes()",x);
- this.mapContextName = mapContextName.getBytes();
- }
-
- //create an rpc channel and add the map as a listener
- this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
- this.channel.addChannelListener(this);
- this.channel.addMembershipListener(this);
-
-
- try {
- //send out a map membership message, only wait for the first reply
- MapMessage msg = new MapMessage(this.mapContextName,MapMessage.MSG_START,
- false,null,null,null,channel.getLocalMember(false));
- Response[] resp = rpcChannel.send(channel.getMembers(),msg,rpcChannel.FIRST_REPLY,timeout);
- for ( int i=0; i<resp.length; i++ ) {
- messageReceived(resp[i].getMessage(),resp[i].getSource());
- }
- }catch ( ChannelException x ) {
- log.warn("Unable to send map start message.");
- }
-
- //transfer state from another map
- transferState();
- }
-
- public void breakdown() {
- finalize();
- }
-
- public void finalize() {
- try {
- //send a map membership stop message
- MapMessage msg = new MapMessage(this.mapContextName,MapMessage.MSG_STOP,
- false,null,null,null,channel.getLocalMember(false));
- if ( channel!=null) channel.send(channel.getMembers(),msg);
- }catch ( ChannelException x ) {
- log.warn("Unable to send stop message.",x);
- }
-
- //cleanup
- if ( this.rpcChannel!=null ) {
- this.rpcChannel.breakdown();
- }
- if ( this.channel != null ) {
- this.channel.removeChannelListener(this);
- this.channel.removeMembershipListener(this);
- }
- this.rpcChannel = null;
- this.channel = null;
- this.mapMembers.clear();
- super.clear();
- this.stateTransferred = false;
- }
-
-
-//------------------------------------------------------------------------------
-// GROUP COM INTERFACES
-//------------------------------------------------------------------------------
- public Member[] getMapMembers() {
- synchronized (mapMembers) {
- Member[] result = new Member[mapMembers.size()];
- mapMembers.toArray(result);
- return result;
- }
- }
- /**
- * Replicates any changes to the object since the last time
- * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
- * @param complete - if set to true, the object is replicated to its backup
- * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
- * be replicated
- */
- public void replicate(Object key, boolean complete) {
- MapEntry entry = (MapEntry) super.get(key);
- if (entry!=null && entry.isPrimary() ) {
- Object value = entry.getValue();
- //check to see if we need to replicate this object isDirty()||complete
- boolean repl = complete || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDirty());
- if (!repl) return;
-
- //check to see if the message is diffable
- boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable());
- MapMessage msg = null;
- if ( diff ) {
- try {
- //construct a diff message
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
- true, (Serializable) entry.getKey(), null,
- ( (ReplicatedMapEntry) entry.getValue()).getDiff(),
- entry.getBackupNode());
- }catch (IOException x ) {
- log.error("Unable to diff object. Will replicate the entire object instead.",x);
- }
- }
- if ( msg == null ) {
- //construct a complete
- msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
- false, (Serializable) entry.getKey(),
- (Serializable)entry.getValue(),
- null,entry.getBackupNode());
-
- }
- try {
- channel.send(new Member[] {entry.getBackupNode()}, msg);
- } catch ( ChannelException x ) {
- log.error("Unable to replicate data.",x);
- }
- }//end if
-
- }
-
- /**
- * This can be invoked by a periodic thread to replicate out any changes.
- * For maps that don't store objects that implement ReplicatedMapEntry, this
- * method should be used infrequently to avoid large amounts of data transfer
- * @param complete boolean
- */
- public void replicate(boolean complete) {
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- replicate(e.getKey(),complete);
- } //while
-
- }
-
- public void transferState() {
- try {
- Member[] members = getMapMembers();
- Member backup = members.length>0?(Member)members[0]:null;
- if ( backup != null ) {
- MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_STATE,false,
- null,null,null,null);
- Response[] resp = rpcChannel.send(new Member[] {backup},msg,rpcChannel.FIRST_REPLY,rpcTimeout);
- if ( resp.length > 0 ) {
- msg = (MapMessage)resp[0].getMessage();
- ArrayList list = (ArrayList)msg.getValue();
- for (int i=0; i<list.size(); i++ ) {
- MapMessage m = (MapMessage)list.get(i);
-
- //make sure we don't store that actual object as primary or backup
- MapEntry local = (MapEntry)super.get(m.getKey());
- if ( local != null && (!local.isProxy() ) ) continue;
-
- //store the object
- MapEntry entry = new MapEntry(m.getKey(),m.getValue());
- entry.setBackup(false);
- entry.setProxy(true);
- entry.setBackupNode(m.getBackupNode());
- super.put(entry.getKey(),entry);
- }
- }
- }
- } catch ( ChannelException x ) {
- log.error("Unable to transfer LazyReplicatedMap state.",x);
- }
- stateTransferred = true;
- }
-
- /**
- * @todo implement state transfer
- * @param msg Serializable
- * @return Serializable - null if no reply should be sent
- */
- public Serializable replyRequest(Serializable msg, Member sender) {
- if ( !(msg instanceof MapMessage) ) return null;
- MapMessage mapmsg = (MapMessage)msg;
-
- //map start request
- if ( mapmsg.getMsgType() == mapmsg.MSG_START ) {
- mapMemberAdded(sender);
- mapmsg.setBackUpNode(channel.getLocalMember(false));
- return mapmsg;
- }
-
- //backup request
- if ( mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP ) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if (entry == null)return null;
- mapmsg.setValue( (Serializable) entry.getValue());
- return mapmsg;
- }
-
- //state transfer request
- if ( mapmsg.getMsgType() == mapmsg.MSG_STATE ) {
- synchronized (stateMutex) { //make sure we dont do two things at the same time
- ArrayList list = new ArrayList();
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY,
- false, (Serializable) entry.getKey(), (Serializable) entry.getValue(),
- null, entry.getBackupNode());
- list.add(me);
- }
- mapmsg.setValue(list);
- return mapmsg;
- }//synchronized
- }
-
- return null;
-
- }
-
- /**
- * If the reply has already been sent to the requesting thread,
- * the rpc callback can handle any data that comes in after the fact.
- * @param msg Serializable
- * @param sender Member
- */
- public void leftOver(Serializable msg, Member sender) {
- //left over membership messages
- if ( !(msg instanceof MapMessage) ) return;
-
- MapMessage mapmsg = (MapMessage)msg;
- if ( mapmsg.getMsgType() == MapMessage.MSG_START ) {
- mapMemberAdded(mapmsg.getBackupNode());
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ * @param loadFactor float - load factor, see HashMap
+ */
+ public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor) {
+ super(channel,timeout,mapContextName,initialCapacity,loadFactor);
}
- }
-
- public void messageReceived(Serializable msg, Member sender) {
- //todo implement all the messages that we can receive
- //messages we can receive are MSG_PROXY, MSG_BACKUP
- if ( !(msg instanceof MapMessage) ) return;
-
- MapMessage mapmsg = (MapMessage)msg;
- if ( mapmsg.getMsgType() == MapMessage.MSG_START ) {
- mapMemberAdded(mapmsg.getBackupNode());
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ */
+ public LazyReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
+ super(channel,timeout,mapContextName,initialCapacity);
}
- if ( mapmsg.getMsgType() == MapMessage.MSG_STOP ) {
- memberDisappeared(mapmsg.getBackupNode());
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ */
+ public LazyReplicatedMap(Channel channel, long timeout, String mapContextName) {
+ super(channel,timeout,mapContextName);
}
- if ( mapmsg.getMsgType() == MapMessage.MSG_PROXY ) {
- MapEntry entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
- entry.setBackup(false);
- entry.setProxy(true);
- entry.setBackupNode(mapmsg.getBackupNode());
- super.put(entry.getKey(),entry);
- }
-
- if ( mapmsg.getMsgType() == MapMessage.MSG_REMOVE ) {
- super.remove(mapmsg.getKey());
- }
- if ( mapmsg.getMsgType() == MapMessage.MSG_BACKUP ) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if ( entry == null ) {
- entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
- entry.setBackup(true);
- entry.setProxy(false);
- entry.setBackupNode(mapmsg.getBackupNode());
- super.put(entry.getKey(), entry);
- } else {
- entry.setBackup(true);
- entry.setProxy(false);
- entry.setBackupNode(mapmsg.getBackupNode());
- if ( entry.getValue() instanceof ReplicatedMapEntry ) {
- ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue();
- if ( mapmsg.isDiff() ) {
- try {
- diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
- }catch ( IOException x ) {
- log.error("Unable to apply diff to key:"+entry.getKey(),x);
- }
- } else {
- entry.setValue(mapmsg.getValue());
- }//end if
- } else {
- entry.setValue(mapmsg.getValue());
- }//end if
- }//end if
- }//end if
-
- }
-
- public boolean accept(Serializable msg, Member sender) {
- if ( msg instanceof MapMessage ) {
- return Arrays.equals(mapContextName,((MapMessage)msg).getMapId());
- }
- return false;
- }
-
- public void mapMemberAdded(Member member) {
- //select a backup node if we don't have one
- synchronized (mapMembers) {
- mapMembers.add(member);
- }
- synchronized (stateMutex) {
- Iterator i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry) i.next();
- MapEntry entry = (MapEntry) e.getValue();
- if (entry.isPrimary() && entry.getBackupNode() == null) {
- try {
- Member backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNode(backup);
- } catch (ChannelException x) {
- log.error("Unable to select backup node.", x);
- }//catch
- }//end if
- } //while
- }//synchronized
-
- }
- public void memberAdded(Member member) {
- //do nothing
- }
- public void memberDisappeared(Member member) {
- synchronized (mapMembers) {
- mapMembers.remove(member);
- }
- //todo move all sessions that are primary here to and have this member as
- //a backup
- Iterator i = super.entrySet().iterator();
- while ( i.hasNext() ) {
- Map.Entry e = (Map.Entry)i.next();
- MapEntry entry = (MapEntry)e.getValue();
- if ( entry.isPrimary() && member.equals(entry.getBackupNode())) {
- try {
- Member backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNode(backup);
- }catch ( ChannelException x ) {
- log.error("Unable to relocate["+entry.getKey()+"] to a new backup node",x);
- }
- }//end if
- }//while
- }
-
- int currentNode = 0;
- public Member getNextBackupNode() {
- Member[] members = getMapMembers();
- if ( members.length == 0 ) return null;
- int node = currentNode++;
- if ( node >= members.length ) {
- node = 0;
- currentNode = 0;
- }
- return members[node];
- }
-
-
//------------------------------------------------------------------------------
// METHODS TO OVERRIDE
@@ -500,25 +131,25 @@
* publish info about a map pair (key/value) to other nodes in the cluster
* @param key Object
* @param value Object
- * @return Member
+ * @return Member - the backup node
* @throws ChannelException
*/
- protected Member publishEntryInfo(Object key, Object value) throws ChannelException {
+ protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
//select a backup node
Member backup = getNextBackupNode();
if ( backup == null ) return null;
//publish the data out to all nodes
- MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_PROXY, false,
- (Serializable) key, null, null, backup);
- channel.send(getMapMembers(), msg);
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false,
+ (Serializable) key, null, null, wrap(backup));
+ getChannel().send(getMapMembers(), msg);
//publish the backup data to one node
- msg = new MapMessage(this.mapContextName, MapMessage.MSG_BACKUP, false,
- (Serializable) key, (Serializable) value, null, backup);
- channel.send(new Member[] {backup}, msg);
- return backup;
+ msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
+ (Serializable) key, (Serializable) value, null, wrap(backup));
+ getChannel().send(new Member[] {backup}, msg);
+ return wrap(backup);
}
public Object get(Object key) {
@@ -527,10 +158,9 @@
if ( !entry.isPrimary() ) {
//if the message is not primary, we need to retrieve the latest value
try {
- MapMessage msg = new MapMessage(mapContextName, MapMessage.MSG_RETRIEVE_BACKUP, false,
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false,
(Serializable) key, null, null, null);
- Response[] resp = rpcChannel.send(new Member[] {entry.getBackupNode()},
- msg, this.rpcChannel.FIRST_REPLY, rpcTimeout);
+ Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, this.getRpcChannel().FIRST_REPLY, getRpcTimeout());
if (resp == null || resp.length == 0) {
//no responses
log.warn("Unable to retrieve remote object for key:" + key);
@@ -538,18 +168,18 @@
}
msg = (MapMessage) resp[0].getMessage();
- Member backup = entry.getBackupNode();
+ Member[] backup = entry.getBackupNodes();
if (entry.isBackup()) {
//select a new backup node
backup = publishEntryInfo(key, msg.getValue());
} else if ( entry.isProxy() ) {
//invalidate the previous primary
- msg = new MapMessage(mapContextName,MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
- channel.send(new Member[] {backup},msg);
+ msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
+ getChannel().send(backup,msg);
}
- entry.setBackupNode(backup);
+ entry.setBackupNodes(backup);
entry.setBackup(false);
entry.setProxy(false);
entry.setValue(msg.getValue());
@@ -588,8 +218,8 @@
//make sure that any old values get removed
if ( containsKey(key) ) old = remove(key);
try {
- Member backup = publishEntryInfo(key, value);
- entry.setBackupNode(backup);
+ Member[] backup = publishEntryInfo(key, value);
+ entry.setBackupNodes(backup);
} catch (ChannelException x) {
log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
}
@@ -618,9 +248,9 @@
*/
public Object remove(Object key) {
MapEntry entry = (MapEntry)super.remove(key);
- MapMessage msg = new MapMessage(mapContextName,MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
+ MapMessage msg = new MapMessage(getMapContextName(),MapMessage.MSG_REMOVE,false,(Serializable)key,null,null,null);
try {
- channel.send(getMapMembers(), msg);
+ getChannel().send(getMapMembers(), msg);
} catch ( ChannelException x ) {
log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation",x);
}
@@ -726,337 +356,6 @@
}
-//------------------------------------------------------------------------------
-// Map Entry class
-//------------------------------------------------------------------------------
- public static class MapEntry implements Map.Entry {
- private boolean backup;
- private boolean proxy;
- private Member backupNode;
-
- private Serializable key;
- private Serializable value;
-
- public MapEntry(Serializable key, Serializable value) {
- this.key = key;
- this.value = value;
- }
-
- public boolean isBackup() {
- return backup;
- }
-
- public void setBackup(boolean backup) {
- this.backup = backup;
- }
-
- public boolean isProxy() {
- return proxy;
- }
-
- public boolean isPrimary() {
- return ((!proxy) && (!backup));
- }
-
- public void setProxy(boolean proxy) {
- this.proxy = proxy;
- }
-
- public boolean isDiffable() {
- return (value instanceof ReplicatedMapEntry);
- }
-
- public void setBackupNode(Member node) {
- this.backupNode = node;
- }
-
- public Member getBackupNode() {
- return backupNode;
- }
-
- public Object getValue() {
- return value;
- }
-
- public Object setValue(Object value) {
- Object old = this.value;
- this.value = (Serializable)value;
- return old;
- }
-
- public Object getKey() {
- return key;
- }
-
- public byte[] getDiff() throws IOException {
- if ( isDiffable() ) {
- return ((ReplicatedMapEntry)value).getDiff();
- } else {
- return getData();
- }
- }
-
- public int hashCode() {
- return value.hashCode();
- }
-
- public boolean equals(Object o) {
- return value.equals(o);
- }
-
- /**
- * returns the entire object as a byte array
- * @return byte[]
- * @throws IOException
- */
- public byte[] getData() throws IOException {
- return (new ObjectStreamable(value)).getBuf().getArray();
- }
-
- /**
- * apply a diff, or an entire object
- * @param data byte[]
- * @param offset int
- * @param length int
- * @param diff boolean
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException {
- if ( isDiffable() && diff ) {
- ((ReplicatedMapEntry)value).applyDiff(data,offset,length);
- } else if ( length == 0 ) {
- value = null;
- proxy = true;
- } else {
- value = XByteBuffer.deserialize(data,offset,length);
- }
- }
-
- }
-//------------------------------------------------------------------------------
-// map message to send to and from other maps
-//------------------------------------------------------------------------------
-
- public static class MapMessage implements Externalizable {
- public static final int MSG_BACKUP = 1;
- public static final int MSG_RETRIEVE_BACKUP = 2;
- public static final int MSG_PROXY = 3;
- public static final int MSG_REMOVE = 4;
- public static final int MSG_STATE = 5;
- public static final int MSG_START = 6;
- public static final int MSG_STOP = 7;
-
- private byte[] mapId;
- private int msgtype;
- private boolean diff;
- private Serializable key;
- private Serializable value;
- private byte[] diffvalue;
- private Member node;
-
- public MapMessage(){}
-
- public MapMessage(byte[] mapId,
- int msgtype, boolean diff,
- Serializable key,Serializable value,
- byte[] diffvalue, Member node) {
- this.mapId = mapId;
- this.msgtype = msgtype;
- this.diff = diff;
- this.key = key;
- this.value = value;
- this.diffvalue = diffvalue;
- this.node = node;
- }
-
- public int getMsgType() {
- return msgtype;
- }
-
- public boolean isDiff() {
- return diff;
- }
-
- public Serializable getKey() {
- return key;
- }
-
- public Serializable getValue() {
- return value;
- }
-
- public byte[] getDiffValue() {
- return diffvalue;
- }
-
- public Member getBackupNode() {
- return node;
- }
-
- private void setBackUpNode(Member node) {
- this.node = node;
- }
-
- public byte[] getMapId() {
- return mapId;
- }
-
- public void setValue(Serializable value) {
- this.value = value;
- }
-
- public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
- mapId = new byte[in.readInt()];
- in.read(mapId);
- msgtype = in.readInt();
- switch (msgtype) {
- case MSG_BACKUP:
- case MSG_STATE: {
- diff = in.readBoolean();
- key = (Serializable)in.readObject();
- if ( diff ) {
- diffvalue = new byte[in.readInt()];
- in.read(diffvalue);
- } else {
- value = (Serializable)in.readObject();
- }//endif
- byte[] d = new byte[in.readInt()];
- in.read(d);
- if ( d.length > 0 ) node = MemberImpl.getMember(d);
- break;
- }
- case MSG_RETRIEVE_BACKUP: {
- key = (Serializable)in.readObject();
- value = (Serializable)in.readObject();
- break;
- }
- case MSG_REMOVE : {
- key = (Serializable)in.readObject();
- break;
- }
- case MSG_PROXY: {
- key = (Serializable)in.readObject();
- byte[] d = new byte[in.readInt()];
- in.read(d);
- if ( d.length > 0 ) node = MemberImpl.getMember(d);
- break;
- }
- case MSG_START :
- MSG_STOP :{
- byte[] d = new byte[in.readInt()];
- in.read(d);
- if ( d.length > 0 ) node = MemberImpl.getMember(d);
- break;
- }
-
- }//switch
- }//readExternal
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(mapId.length);
- out.write(mapId);
- out.writeInt(msgtype);
- switch (msgtype) {
- case MSG_BACKUP:
- case MSG_STATE: {
- out.writeBoolean(diff);
- out.writeObject(key);
- if ( diff ) {
- out.writeInt(diffvalue.length);
- out.write(diffvalue);
- } else {
- out.writeObject(value);
- }//endif
- byte[] d = node!=null?((MemberImpl)node).getData(false):new byte[0];
- out.writeInt(d.length);
- out.write(d);
- break;
- }
- case MSG_RETRIEVE_BACKUP:{
- out.writeObject(key);
- out.writeObject(value);
- break;
- }
- case MSG_REMOVE : {
- out.writeObject(key);
- break;
- }
- case MSG_PROXY: {
- out.writeObject(key);
- byte[] d = node!=null?((MemberImpl)node).getData(false):new byte[0];
- out.writeInt(d.length);
- out.write(d);
- break;
- }
- case MSG_START:
- MSG_STOP : {
- byte[] d = node!=null?((MemberImpl)node).getData(false):new byte[0];
- out.writeInt(d.length);
- out.write(d);
- break;
- }
- }//switch
- }//writeExternal
-
- public Object clone() {
- return new MapMessage(this.mapId,this.msgtype,this.diff,this.key,this.value,this.diffvalue,this.node);
- }
- }//MapMessage
-
-//------------------------------------------------------------------------------
-// streamable class
-//------------------------------------------------------------------------------
-
- public static class ObjectStreamable implements Streamable {
- private DirectByteArrayOutputStream buf;
- private int pos=0;
- public ObjectStreamable(Serializable value) throws IOException {
- buf = new DirectByteArrayOutputStream(1024);
- ObjectOutputStream out = new ObjectOutputStream(buf);
- out.writeObject(value);
- out.flush();
- }
-
- /**
- * returns true if the stream has reached its end
- * @return boolean
- */
- public synchronized boolean eof() {
- return (pos>=buf.size());
-
- }
-
- /**
- * write data into the byte array starting at offset, maximum bytes read are (data.length-offset)
- * @param data byte[] - the array to read data into
- * @param offset int - start position for writing data
- * @return int - the number of bytes written into the data buffer
- */
- public synchronized int write(byte[] data, int offset) throws IOException {
- int length = Math.min(data.length-offset,buf.size()-pos);
- System.arraycopy(buf.getArrayDirect(),pos,data,offset,length);
- pos = pos + length;
- return length;
- }
-
- public synchronized int read(byte[] data, int offset, int length) throws IOException {
- return -1;
- }
-
- public DirectByteArrayOutputStream getBuf() {
- return buf;
- }
-
- public int size() {
- return buf.size();
- }
-
- public int pos() {
- return pos;
- }
-
- }
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java?rev=386465&r1=386464&r2=386465&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/ChannelCreator.java Thu Mar 16 13:54:44 2006
@@ -52,7 +52,6 @@
.append("\n\t\t[-port tcplistenport]")
.append("\n\t\t[-ack true|false]")
.append("\n\t\t[-ackto acktimeout]")
- .append("\n\t\t[-autoconnect true|false]")
.append("\n\t\t[-sync true|false]")
.append("\n\t\t[-receiver org.apache.catalina.tribes.tcp.nio.NioReceiver|org.apache.catalina.tribes.tcp.bio.BioReceiver|]")
.append("\n\t\t[-transport org.apache.catalina.tribes.tcp.nio.PooledParallelSender|org.apache.catalina.tribes.tcp.bio.PooledMultiSender]")
@@ -81,7 +80,6 @@
int tcpseltimeout = 100;
int tcpthreadcount = 4;
int acktimeout = 15000;
- boolean autoconnect = true;
String mcastaddr = "228.0.0.5";
int mcastport = 45565;
long mcastfreq = 500;
@@ -121,8 +119,6 @@
acktimeout = Integer.parseInt(args[++i]);
} else if ("-sync".equals(args[i])) {
sync = Boolean.parseBoolean(args[++i]);
- } else if ("-autoconnect".equals(args[i])) {
- autoconnect = Boolean.parseBoolean(args[++i]);
} else if ("-transport".equals(args[i])) {
transport = args[++i];
} else if (args[i]!=null && args[i].startsWith("transport.")) {
@@ -162,7 +158,6 @@
System.out.println("Creating transport class="+transport);
MultiPointSender sender = (MultiPointSender)Class.forName(transport,true,ChannelCreator.class.getClassLoader()).newInstance();
sender.setTimeout(acktimeout);
- sender.setAutoConnect(autoconnect);
sender.setWaitForAck(ack);
sender.setMaxRetryAttempts(2);
sender.setRxBufSize(43800);
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=386465&r1=386464&r2=386465&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Thu Mar 16 13:54:44 2006
@@ -143,7 +143,18 @@
public int getColumnCount() { return columnNames.length; }
public int getRowCount() {return map.sizeFull() +1; }
-
+
+ public StringBuffer getMemberNames(Member[] members){
+ StringBuffer buf = new StringBuffer();
+ if ( members!=null ) {
+ for (int i=0;i<members.length; i++ ) {
+ buf.append(members[i].getName());
+ buf.append("; ");
+ }
+ }
+ return buf;
+ }
+
public Object getValueAt(int row, int col) {
if ( row==-1 ) {
update();
@@ -157,7 +168,7 @@
switch (col) {
case 0: return entry.getKey();
case 1: return entry.getValue();
- case 2: return entry.getBackupNode()!=null?entry.getBackupNode().getName():"";
+ case 2: return getMemberNames(entry.getBackupNodes());
case 3: return new Boolean(entry.isPrimary());
case 4: return new Boolean(entry.isProxy());
case 5: return new Boolean(entry.isBackup());
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java?rev=386465&r1=386464&r2=386465&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/test/TestNioSender.java Thu Mar 16 13:54:44 2006
@@ -47,7 +47,7 @@
mbr = new MemberImpl("","localhost",4444,0);
NioSender sender = new NioSender(mbr);
sender.setWaitForAck(false);
- sender.setDirect(true);
+ sender.setDirectBuffer(true);
sender.setSelector(selector);
sender.setMessage(XByteBuffer.createDataPackage(getMessage(mbr)));
sender.connect();
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org