You are viewing a plain text version of this content. The canonical link for it is here.
Posted to wadi-commits@incubator.apache.org by bd...@apache.org on 2005/12/14 23:36:16 UTC

svn commit: r356933 [6/35] - in /incubator/wadi/trunk: ./ etc/ modules/ modules/assembly/ modules/assembly/src/ modules/assembly/src/bin/ modules/assembly/src/conf/ modules/assembly/src/main/ modules/assembly/src/main/assembly/ modules/core/ modules/co...

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,272 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.DIndexRequest;
+import org.codehaus.wadi.dindex.DIndexResponse;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationResponse;
+import org.codehaus.wadi.dindex.messages.RelocationRequest;
+import org.codehaus.wadi.dindex.messages.RelocationResponse;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.DeletePMToIM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertPMToIM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToSM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToPM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToIM;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+public class LocalPartition extends AbstractPartition implements Serializable {
+
+	protected transient Log _log=LogFactory.getLog(getClass());
+	protected transient Log _lockLog=LogFactory.getLog("org.codehaus.wadi.LOCKS");
+
+	protected Map _map=new HashMap();
+	protected transient PartitionConfig _config;
+
+	/**
+	 * Creates a local partition for the given key; the key is handed straight to the superclass.
+	 * {@link #init(PartitionConfig)} must be called before the partition handles messages, since
+	 * the message handlers dereference <code>_config</code>.
+	 *
+	 * @param key the partition key
+	 */
+	public LocalPartition(int key) {
+		super(key);
+	}
+
+	/**
+	 * No-argument constructor, present only so that the class can be deserialised.
+	 */
+	protected LocalPartition() {
+		super();
+		// for deserialisation...
+	}
+
+	public void init(PartitionConfig config) {
+		_config=config;
+		_log=LogFactory.getLog(getClass().getName()+"#"+_key+"@"+_config.getLocalNodeName());
+		_lockLog=LogFactory.getLog("org.codehaus.wadi.LOCKS");
+	}
+
+	/**
+	 * @return always <code>true</code> for this class
+	 */
+	public boolean isLocal() {
+		return true;
+	}
+
+	/**
+	 * @return a short description containing the partition key and the local node name, or
+	 *         <code>&lt;unknown&gt;</code> in place of the node name when no configuration has been set yet
+	 */
+	public String toString() {
+		String nodeName;
+		if (_config!=null)
+			nodeName=_config.getLocalNodeName();
+		else
+			nodeName="<unknown>";
+		return "<LocalPartition:"+_key+"@"+nodeName+">";
+	}
+
+	/**
+	 * Records <code>destination</code> as the location registered under <code>name</code>,
+	 * silently replacing any existing entry. Access to the map is guarded by the map's own monitor.
+	 *
+	 * @param name the key to register
+	 * @param destination the location to associate with it
+	 */
+	public void put(String name, Destination destination) {
+		synchronized (_map) {
+			// TODO - check key was not already in use...
+			_map.put(name, destination);
+		}
+	}
+
+	// we probably don't need the partition lock for this - but lets play safe to start with
+	public void onMessage(ObjectMessage message, InsertIMToPM request) {
+		Destination newDestination=null;
+		try{newDestination=message.getJMSReplyTo();} catch (JMSException e) {_log.error("unexpected problem", e);}
+		boolean success=false;
+		String key=request.getKey();
+		Sync sync=null;
+		try {
+		  if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquiring: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+			sync=_config.getPMSyncs().acquire(key); // TODO - PMSyncs are actually WLocks on a given sessions location (partition entry) - itegrate
+			if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquired: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+
+			synchronized (_map) {
+				if (!_map.containsKey(key)) {
+					_map.put(key, newDestination); // remember location of actual session...
+					success=true;
+				}
+			}
+			if (success) {
+				if (_log.isDebugEnabled()) _log.debug("insert: "+key+" {"+_config.getNodeName(newDestination)+"}");
+			} else {
+				if (_log.isWarnEnabled()) _log.warn("insert: {"+key+" {"+_config.getNodeName(newDestination)+"} failed - key already in use");
+			}
+
+			DIndexResponse response=new InsertPMToIM(success);
+			_config.getDispatcher().reply(message, response);
+		} finally {
+			Utils.release("Partition", key, sync);
+		}
+	}
+
+	/**
+	 * Handles a delete request: removes the entry for the request's key and replies with a
+	 * {@link DeletePMToIM}.
+	 *
+	 * We probably do not need to take the Partition lock whilst we do this, but, for the moment,
+	 * lets do everything logically, then optimise later...
+	 *
+	 * @param message the incoming message, used to address the reply
+	 * @param request the delete request carrying the key
+	 * @throws IllegalStateException if the key has no entry in this partition; no reply is sent in that case
+	 */
+	public void onMessage(ObjectMessage message, DeleteIMToPM request) {
+		String key=request.getKey();
+		Sync sync=null;
+		try {
+			// NB: sync has not been assigned yet, so this first trace always prints "null" for it
+			if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquiring: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+			sync=_config.getPMSyncs().acquire(key); // TODO - PMSyncs are actually WLocks on a given session's location (partition entry) - integrate
+			if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquired: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+
+			Destination removed;
+			synchronized (_map) {
+				removed=(Destination)_map.remove(key);
+			}
+			if (removed==null) {
+				throw new IllegalStateException("session "+key+" is not known in this partition");
+			}
+			if (_log.isDebugEnabled()) {
+				_log.debug("delete: "+key+" {"+_config.getNodeName(removed)+"}");
+			}
+			DIndexResponse response=new DeletePMToIM();
+			_config.getDispatcher().reply(message, response);
+		} finally {
+			Utils.release("Partition", key, sync);
+		}
+	}
+
+	public void onMessage(ObjectMessage message, DIndexRelocationRequest request) {
+		Destination newDestination=null;
+		try{newDestination=message.getJMSReplyTo();} catch (JMSException e) {_log.error("unexpected problem", e);}
+		Destination oldDestination=null;
+		synchronized (_map) {
+			oldDestination=(Destination)_map.put(request.getKey(), newDestination);
+		}
+		if (_log.isDebugEnabled()) _log.debug("relocation {"+request.getKey()+" : "+_config.getNodeName(oldDestination)+" -> "+_config.getNodeName(newDestination)+"}");
+		DIndexResponse response=new DIndexRelocationResponse();
+		_config.getDispatcher().reply(message, response);
+	}
+
+	public void onMessage(ObjectMessage message, DIndexForwardRequest request) {
+		// we have got to someone who actually knows where we want to go.
+		// strip off wrapper and deliver actual request to its final destination...
+		String name=request.getKey();
+		Destination destination=null;
+		synchronized (_map) {
+			destination=(Destination)_map.get(name);
+		}
+		if (destination==null) { // session could not be located...
+			DIndexRequest r=request.getRequest();
+			if (r instanceof RelocationRequest) {
+				assert message!=null;
+				assert name!=null;
+				assert _config!=null;
+				_config.getDispatcher().reply(message, new RelocationResponse(name));
+			} else {
+				if (_log.isWarnEnabled()) _log.warn("unexpected nested request structure - ignoring: " + r);
+			}
+		} else { // session succesfully located...
+			assert destination!=null;
+			assert request!=null;
+			assert _config!=null;
+			if (_log.isTraceEnabled()) _log.trace("directing: " + request + " -> " + _config.getNodeName(destination));
+			if (!_config.getDispatcher().forward(message, destination, request.getRequest()))
+				_log.warn("could not forward message");
+		}
+	}
+
+	// called on Partition Master
+	public void onMessage(ObjectMessage message1, MoveIMToPM request) {
+
+		// TODO - whilst we are in here, we should have a SHARED lock on this Partition, so it cannot be moved
+		// We should take an exclusive PM lock on the session ID, so no-one else can r/w its location whilst we are doing so.
+		// The Partitions lock should be held in the Facade, so that it can swap Partitions in and out whilst holding an exclusive lock
+		// Partition may only be migrated when exclusive lock has been taken, this may only happen when all shared locks are released - this implies that no PM session locks will be in place...
+
+		String key=request.getKey();
+		Dispatcher _dispatcher=_config.getDispatcher();
+		// what if we are NOT the PM anymore ?
+		// get write lock on location
+		//String nodeName=_config.getLocalNodeName();
+		Sync sync=null;
+		//String agent=null;
+		try {
+			Destination im=message1.getJMSReplyTo();
+			//agent=_config.getNodeName(im);
+
+			// PMSyncs should prevent _map entry from being messed with whilst we are messing with it - lock should be exclusive
+			// should synchronise map access - or is it ConcurrentHashMap ?
+			if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquiring: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+			sync=_config.getPMSyncs().acquire(key); // TODO - PMSyncs are actually WLocks on a given sessions location (partition entry) - itegrate
+			if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquired: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+
+			// exchange messages with StateMaster
+			Destination destination=(Destination)_map.get(key);
+
+			if (destination==null) {
+				// session does not exist - tell IM
+				_dispatcher.reply(message1,new MovePMToIM());
+				return;
+			}
+
+			if (destination.equals(im)) {
+				// whilst we were waiting for the partition lock, another thread migrated the session into our InvocationMaster...
+				// How can this happen - the first Thread should have been holding the InvocationLock...
+				_log.warn("IM REQUESTING RELOCATION IS ALREADY SM");
+			}
+
+
+			// session does exist - ask the SM to move it to the IM
+
+			// exchangeSendLoop GetPMToSM to SM
+			Destination pm=_dispatcher.getLocalDestination();
+			Destination sm=destination;
+			String poCorrelationId=null;
+			try {
+				poCorrelationId=_dispatcher.getOutgoingCorrelationId(message1);
+				//_log.info("Process Owner Correlation ID: "+poCorrelationId);
+			} catch (Exception e) {
+				_log.error("unexpected problem", e);
+			}
+
+			MovePMToSM request2=new MovePMToSM(key, im, pm, poCorrelationId);
+			ObjectMessage message2=_dispatcher.exchangeSend(pm, sm, request2, _config.getInactiveTime());
+			if (message2==null)
+				_log.error("NO RESPONSE WITHIN TIMEFRAME - PANIC!");
+
+			MoveSMToPM response=null; // the reply from the SM confirming successful move...
+			try {
+				response=(MoveSMToPM)message2.getObject();
+			} catch (JMSException e) {
+				_log.error("unexpected problem", e); // should be sorted in loop
+			}
+
+			if (response.getSuccess()) {
+				// alter location
+				Destination oldOwner=(Destination)_map.put(key, im); // The IM is now the SM
+				_log.debug("move: "+key+" {"+_config.getNodeName(oldOwner)+"->"+_config.getNodeName(im)+"}");
+			} else {
+				_log.warn("state relocation failed: "+key);
+			}
+		} catch (JMSException e) {
+			_log.error("could not read src address from incoming message");
+		}
+		finally {
+			Utils.release("Partition", key, sync);
+		}
+	}
+
+
+
+	/**
+	 * Sends <code>request</code> through the dispatcher with the local destination as both sender
+	 * and recipient, and returns whatever the dispatcher's <code>exchangeSend</code> returns.
+	 *
+	 * @param request the request to send
+	 * @param timeout the timeout passed through to the dispatcher
+	 * @return the dispatcher's result for the exchange
+	 * @throws Exception propagated from the dispatcher
+	 */
+	public ObjectMessage exchange(DIndexRequest request, long timeout) throws Exception {
+		if (_log.isTraceEnabled()) {
+			_log.trace("local dispatch - needs optimisation");
+		}
+		Dispatcher dispatcher=_config.getDispatcher();
+		Destination self=dispatcher.getLocalDestination();
+		// we are talking to ourselves: same destination at both ends
+		return dispatcher.exchangeSend(self, self, request, timeout);
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,215 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import javax.jms.Destination;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.DIndexRequest;
+import org.codehaus.wadi.dindex.Partition;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.gridstate.Dispatcher;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+
+public class PartitionFacade extends AbstractPartition {
+
+
+    protected final ReadWriteLock _lock=new WriterPreferenceReadWriteLock();
+    protected final LinkedQueue _queue=new LinkedQueue();
+    protected final PartitionConfig _config;
+    protected final Log _log;
+
+    protected long _timeStamp;
+    protected Partition _content;
+
+    /**
+     * Creates a facade around <code>content</code>.
+     *
+     * @param key the partition key, passed to the superclass
+     * @param timeStamp the initial time stamp; later content changes are only accepted with a newer one
+     * @param content the partition initially delegated to
+     * @param queueing not used by this constructor
+     * @param config the configuration; must not be null, as its local node name is read to name the logger
+     */
+    public PartitionFacade(int key, long timeStamp, Partition content, boolean queueing, PartitionConfig config) {
+        super(key);
+        _timeStamp=timeStamp;
+        _content=content;
+        _config=config;
+        String category=getClass().getName()+"#"+_key+"@"+config.getLocalNodeName();
+        _log=LogFactory.getLog(category);
+        if (_log.isTraceEnabled()) {
+            _log.trace("initialising location to: "+content);
+        }
+    }
+
+    /**
+     * Reports whether the current content is local, reading it under the exclusive (write) lock.
+     *
+     * @return the result of <code>isLocal()</code> on the current content
+     * @throws UnsupportedOperationException if the calling thread is interrupted while waiting for
+     *         the lock; the thread's interrupt status is restored before throwing
+     */
+    public boolean isLocal() { // locking ?
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            return _content.isLocal();
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+        throw new UnsupportedOperationException("interrupted whilst acquiring partition lock");
+    }
+
+    /**
+     * Returns the partition currently delegated to, reading it under the exclusive (write) lock.
+     *
+     * @return the current content
+     * @throws UnsupportedOperationException if the calling thread is interrupted while waiting for
+     *         the lock; the thread's interrupt status is restored before throwing
+     */
+    public Partition getContent() {
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            return _content;
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+        throw new UnsupportedOperationException("interrupted whilst acquiring partition lock");
+    }
+
+    /**
+     * Replaces the content, under the exclusive (write) lock, but only if <code>timeStamp</code>
+     * is strictly newer than the one currently held; stale updates are ignored.
+     * If the calling thread is interrupted while waiting for the lock, nothing is changed, a
+     * warning is logged and the thread's interrupt status is restored.
+     *
+     * @param timeStamp the time stamp of this update
+     * @param content the new partition to delegate to
+     */
+    public void setContent(long timeStamp, Partition content) {
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            if (timeStamp>_timeStamp) {
+                if (_log.isTraceEnabled()) _log.trace("["+_key+"] changing location from: "+_content+" to: "+content);
+                _timeStamp=timeStamp;
+                _content=content;
+            }
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+    }
+
+    /**
+     * Points this facade at a remote location, under the exclusive (write) lock, but only if
+     * <code>timeStamp</code> is strictly newer than the one currently held. An existing
+     * {@link RemotePartition} is updated in place; any other content is replaced by a new
+     * {@link RemotePartition}.
+     * If the calling thread is interrupted while waiting for the lock, nothing is changed, a
+     * warning is logged and the thread's interrupt status is restored.
+     *
+     * @param timeStamp the time stamp of this update
+     * @param dispatcher not used by this method
+     * @param location the remote location of the partition
+     */
+    public void setContentRemote(long timeStamp, Dispatcher dispatcher, Destination location) {
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            if (timeStamp>_timeStamp) {
+                _timeStamp=timeStamp;
+                if (_content instanceof RemotePartition) {
+                    ((RemotePartition)_content).setLocation(location);
+                } else {
+                    if (_log.isTraceEnabled()) _log.trace("["+_key+"] changing location from: "+_content+" to: "+location);
+                    _content=new RemotePartition(_key, _config, location);
+                }
+            }
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+    }
+
+    /**
+     * Delegates the exchange to the current content.
+     * NOTE(review): unlike the other methods of this class, <code>_content</code> is read here
+     * without taking either side of <code>_lock</code> - confirm whether that is intentional.
+     *
+     * @param request the request to send
+     * @param timeout the timeout passed through to the content
+     * @return whatever the content's <code>exchange</code> returns
+     * @throws Exception propagated from the content
+     */
+    public ObjectMessage exchange(DIndexRequest request, long timeout) throws Exception {
+    	return _content.exchange(request, timeout);
+    }
+    
+    /**
+     * Passes an insert request to the current content while holding the shared (read) lock, so
+     * that the content cannot be swapped (which needs the write lock) during the call.
+     * If the calling thread is interrupted while waiting for the lock, the request is not
+     * dispatched, a warning is logged and the thread's interrupt status is restored.
+     *
+     * @param message the incoming message
+     * @param request the insert request
+     */
+    public void onMessage(ObjectMessage message, InsertIMToPM request) {
+        if (_log.isTraceEnabled()) _log.trace("dispatching: "+request+" on "+_content);
+        Sync sync=_lock.readLock(); // SHARED
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            _content.onMessage(message, request);
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+    }
+    
+    /**
+     * Passes a delete request to the current content while holding the shared (read) lock, so
+     * that the content cannot be swapped (which needs the write lock) during the call.
+     * If the calling thread is interrupted while waiting for the lock, the request is not
+     * dispatched, a warning is logged and the thread's interrupt status is restored.
+     *
+     * @param message the incoming message
+     * @param request the delete request
+     */
+    public void onMessage(ObjectMessage message, DeleteIMToPM request) {
+        Sync sync=_lock.readLock(); // SHARED
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            _content.onMessage(message, request);
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+    }
+    
+    /**
+     * Passes a relocation request to the current content while holding the shared (read) lock, so
+     * that the content cannot be swapped (which needs the write lock) during the call.
+     * If the calling thread is interrupted while waiting for the lock, the request is not
+     * dispatched, a warning is logged and the thread's interrupt status is restored.
+     *
+     * @param message the incoming message
+     * @param request the relocation request
+     */
+    public void onMessage(ObjectMessage message, DIndexRelocationRequest request) {
+        Sync sync=_lock.readLock(); // SHARED
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            _content.onMessage(message, request);
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+    }
+    
+    /**
+     * Passes a move request to the current content while holding the shared (read) lock, so that
+     * the content cannot be swapped (which needs the write lock) during the call.
+     * Should supersede the {@link DIndexRelocationRequest} variant.
+     * If the calling thread is interrupted while waiting for the lock, the request is not
+     * dispatched, a warning is logged and the thread's interrupt status is restored.
+     *
+     * @param message the incoming message
+     * @param request the move request
+     */
+    public void onMessage(ObjectMessage message, MoveIMToPM request) {
+        Sync sync=_lock.readLock(); // SHARED
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            _content.onMessage(message, request);
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+    }
+    
+    /**
+     * Passes a forward request to the current content while holding the shared (read) lock, so
+     * that the content cannot be swapped (which needs the write lock) during the call.
+     * If the calling thread is interrupted while waiting for the lock, the request is not
+     * dispatched, a warning is logged and the thread's interrupt status is restored.
+     *
+     * @param message the incoming message
+     * @param request the forward request
+     */
+    public void onMessage(ObjectMessage message, DIndexForwardRequest request) {
+        Sync sync=_lock.readLock(); // SHARED
+        boolean acquired=false;
+        try {
+            sync.acquire();
+            acquired=true;
+            _content.onMessage(message, request);
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            // do not swallow the interrupt - put the flag back so that callers can see it
+            Thread.currentThread().interrupt();
+        } finally {
+            if (acquired)
+                sync.release();
+        }
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,83 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.codehaus.wadi.dindex.Partition;
+
+public class PartitionKeys implements Serializable {
+
+    protected int[] _keys;
+
+    public PartitionKeys(PartitionFacade[] partitions) {
+        ArrayList list=new ArrayList(partitions.length);
+        for (int i=0; i<partitions.length; i++) {
+            Partition partition=partitions[i];
+            if (partition.isLocal())
+                list.add(new Integer(partition.getKey()));
+        }
+        _keys=new int[list.size()];
+        for (int i=0; i<_keys.length; i++)
+            _keys[i]=((Integer)list.get(i)).intValue();
+    }
+
+    protected PartitionKeys() {
+        // for deserialisation...
+    }
+
+    public boolean equals(Object obj) {
+        if (obj==this)
+            return true;
+
+        if (! (obj instanceof PartitionKeys))
+            return false;
+
+        PartitionKeys that=(PartitionKeys)obj;
+
+        if (this._keys.length!=that._keys.length)
+            return false;
+
+        for (int i=0; i<_keys.length; i++)
+            if (this._keys[i]!=that._keys[i])
+                return false;
+
+        return true;
+    }
+
+    public String toString() {
+        StringBuffer buffer=new StringBuffer();
+        buffer.append("{");
+        for (int i=0;i<_keys.length; i++) {
+            if (i!=0)
+                buffer.append(",");
+            buffer.append(_keys[i]);
+        }
+        buffer.append("}");
+        return buffer.toString();
+    }
+
+    public int size() {
+        return _keys.length;
+    }
+
+    public int[] getKeys() {
+        return _keys;
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import org.activecluster.Node;
+
+/**
+ * Simple holder tying a node to a deviation figure and a flag saying whether the node is leaving.
+ * The fields are read directly by the PartitionOwner comparators in this package.
+ */
+public class PartitionOwner {
+
+    /** The node concerned. */
+    protected Node _node;
+    /** The deviation figure; the comparators in this package order owners by it. */
+    protected int _deviation;
+    /** Whether the node is leaving. */
+    protected boolean _leaving;
+
+    /**
+     * @param node the node concerned
+     * @param deviation the deviation figure for that node
+     * @param leaving whether the node is leaving
+     */
+    public PartitionOwner(Node node, int deviation, boolean leaving) {
+        this._node=node;
+        this._deviation=deviation;
+        this._leaving=leaving;
+    }
+
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,38 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.Comparator;
+
+
+/**
+ * Orders {@link PartitionOwner}s by descending deviation, breaking ties by descending node name.
+ * The descending order comes from the swapped parameters of {@link #compare(Object, Object)}.
+ * All instances of this class are interchangeable, so they compare equal to each other.
+ */
+public class PartitionOwnerGreaterThanComparator implements Comparator {
+
+    /**
+     * Note the parameter order: the first argument is <code>o2</code>, which reverses the
+     * natural ascending order.
+     *
+     * @throws ClassCastException if either argument is not a PartitionOwner
+     */
+    public int compare(Object o2, Object o1) {
+        PartitionOwner p1=(PartitionOwner)o1;
+        PartitionOwner p2=(PartitionOwner)o2;
+        // compare explicitly - subtracting the two ints could overflow and flip the sign
+        if (p1._deviation!=p2._deviation)
+            return p1._deviation<p2._deviation ? -1 : 1;
+        return p1._node.getName().compareTo(p2._node.getName());
+    }
+
+    /**
+     * @return true if <code>obj</code> is a comparator of exactly this class; false for null
+     */
+    public boolean equals(Object obj) {
+        return obj==this || (obj!=null && obj.getClass()==getClass());
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}: identical for every instance of this class.
+     */
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,38 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.Comparator;
+
+
+/**
+ * Orders {@link PartitionOwner}s by ascending deviation, breaking ties by ascending node name.
+ * All instances of this class are interchangeable, so they compare equal to each other.
+ */
+public class PartitionOwnerLessThanComparator implements Comparator {
+
+    /**
+     * @throws ClassCastException if either argument is not a PartitionOwner
+     */
+    public int compare(Object o1, Object o2) {
+        PartitionOwner p1=(PartitionOwner)o1;
+        PartitionOwner p2=(PartitionOwner)o2;
+        // compare explicitly - subtracting the two ints could overflow and flip the sign
+        if (p1._deviation!=p2._deviation)
+            return p1._deviation<p2._deviation ? -1 : 1;
+        return p1._node.getName().compareTo(p2._node.getName());
+    }
+
+    /**
+     * @return true if <code>obj</code> is a comparator of exactly this class; false for null
+     */
+    public boolean equals(Object obj) {
+        return obj==this || (obj!=null && obj.getClass()==getClass());
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}: identical for every instance of this class.
+     */
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,51 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.io.Serializable;
+
+import javax.jms.Destination;
+
+public class PartitionTransfer implements Serializable {
+
+    public Destination _destination;
+    public String _name; // TODO - only here for debugging...
+    public int _amount;
+    
+    public PartitionTransfer(Destination destination, String name, int amount) {
+        _destination=destination;
+        _name=name;
+        _amount=amount;
+    }
+    
+    protected PartitionTransfer() {
+        // for deserialisation...
+    }
+
+    public Destination getDestination() {
+        return _destination;
+    }
+    
+    public int getAmount() {
+        return _amount;
+    }
+    
+    public String toString() {
+        return "<transfer: "+_amount+"->"+_name+">";
+    }
+    
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,100 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.activecluster.Node;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Splits the nodes of a cluster into producers (which must give partitions
+ * away) and consumers (which must take partitions on).
+ *
+ * Each entry is a PartitionOwner whose deviation is the number of partitions
+ * that node has to give or receive.
+ */
+public class RedistributionPlan {
+
+    protected final Log _log=LogFactory.getLog(getClass());
+    protected final List _producers = new ArrayList();
+    protected final List _consumers = new ArrayList();
+
+    /**
+     * @param living nodes staying in the cluster; must not be empty, since the
+     *        partition count is divided by its length
+     * @param leaving nodes on their way out; everything they own is produced
+     * @param totalNumPartitions number of partitions to be shared between the living nodes
+     */
+    public RedistributionPlan(Node[] living, Node[] leaving, int totalNumPartitions) {
+        final int fairShare=totalNumPartitions/living.length;
+
+        // a leaving node produces every partition that it currently owns
+        for (int l=0; l<leaving.length; l++) {
+            Node departing=leaving[l];
+            int owned=DIndex.getPartitionKeys(departing).size();
+            if (owned>0) {
+                _producers.add(new PartitionOwner(departing, owned, true));
+            }
+        }
+
+        // a living node produces or consumes according to its distance from the fair share
+        for (int l=0; l<living.length; l++) {
+            Node staying=living[l];
+            int owned=DIndex.getPartitionKeys(staying).size();
+            decide(staying, owned, fairShare, _producers, _consumers);
+        }
+
+        // sort lists...
+        Collections.sort(_producers, new PartitionOwnerGreaterThanComparator());
+        Collections.sort(_consumers, new PartitionOwnerLessThanComparator());
+
+        // account for uneven division of partitions...
+        int leftOver=totalNumPartitions%living.length;
+
+        // first let living producers each hold on to one partition more than the fair share...
+        ListIterator producers=_producers.listIterator();
+        while (leftOver>0 && producers.hasNext()) {
+            PartitionOwner owner=(PartitionOwner)producers.next();
+            if (owner._leaving) {
+                continue;
+            }
+            leftOver--;
+            owner._deviation--;
+            if (owner._deviation==0) {
+                // nothing left to give - no longer a producer
+                producers.remove();
+            }
+        }
+
+        // ...then ask consumers to each take one more
+        ListIterator consumers=_consumers.listIterator();
+        while (leftOver>0 && consumers.hasNext()) {
+            PartitionOwner owner=(PartitionOwner)consumers.next();
+            leftOver--;
+            owner._deviation++;
+        }
+
+        assert leftOver==0;
+    }
+
+    /**
+     * Files a living node under producers or consumers, or under neither if it
+     * already holds exactly its share.
+     *
+     * @param node the node being classified
+     * @param numPartitions how many partitions the node owns now
+     * @param numPartitionsPerNode how many it should own
+     * @param producers receives the node if it owns too many
+     * @param consumers receives the node if it owns too few
+     */
+    protected void decide(Node node, int numPartitions, int numPartitionsPerNode, Collection producers, Collection consumers) {
+        int surplus=numPartitions-numPartitionsPerNode;
+        if (surplus>0) {
+            producers.add(new PartitionOwner(node, surplus, false));
+        } else if (surplus<0) {
+            consumers.add(new PartitionOwner(node, -surplus, false));
+        }
+    }
+
+    /** @return the live list of producers (not a copy) */
+    public Collection getProducers() {
+        return _producers;
+    }
+
+    /** @return the live list of consumers (not a copy) */
+    public Collection getConsumers() {
+        return _consumers;
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,117 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import javax.jms.Destination;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.DIndexRequest;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.gridstate.Dispatcher;
+
+public class RemotePartition extends AbstractPartition {
+	
+	protected transient Log _log;
+	
+	protected final PartitionConfig _config;
+	
+	protected Destination _location;
+	
+	public RemotePartition(int key, PartitionConfig config, Destination location) {
+		super(key);
+		_config=config;
+		_location=location;
+		_log=LogFactory.getLog(getClass().getName()+"#"+_key+"@"+_config.getLocalNodeName());
+	}
+	
+	public boolean isLocal() {
+		return false;
+	}
+	
+	public Destination getDestination() {
+		return _location;
+	}
+	
+	public void setLocation(Destination location) {
+		if (_location==null) {
+			if (location==null) {
+				// _location is already null
+			} else {
+				// they cannot be equal - update
+				if (_log.isTraceEnabled()) _log.trace("[" + _key + "] updating location from: " + _config.getNodeName(_location) + " to: " + _config.getNodeName(location));
+				_location=location;
+			}
+		} else {
+			if (_location.equals(location)) {
+				// no need to update
+			} else {
+				if (_log.isTraceEnabled()) _log.trace("[" + _key + "] updating location from: " + _config.getNodeName(_location) + " to: " + _config.getNodeName(location));
+				_location=location;
+			}
+		}
+	}
+	
+	public String toString() {
+		return "<"+getClass()+":"+_key+"@"+_config.getLocalNodeName()+"->"+_config.getNodeName(_location)+">";
+	}
+
+	public void onMessage(ObjectMessage message, InsertIMToPM request) {
+		if (_log.isTraceEnabled()) _log.trace("#"+_key+" : forwarding: " + request + " from "+_config.getLocalNodeName()+" to " + _config.getNodeName(_location));
+		if (!_config.getDispatcher().forward(message, _location))
+			_log.warn("could not forward message");
+	}
+
+	public void onMessage(ObjectMessage message, DeleteIMToPM request) {
+		if (_log.isTraceEnabled()) _log.trace("indirecting: " + request + " via " + _config.getNodeName(_location));
+		if (!_config.getDispatcher().forward(message, _location))
+			_log.warn("could not forward message");
+	}
+
+	public void onMessage(ObjectMessage message, DIndexRelocationRequest request) {
+		if (_log.isTraceEnabled()) _log.trace("indirecting: " + request + " via " + _config.getNodeName(_location));
+		if (!_config.getDispatcher().forward(message, _location))
+			_log.warn("could not forward message");
+	}
+
+	public void onMessage(ObjectMessage message, DIndexForwardRequest request) {
+		if (_log.isTraceEnabled()) _log.trace("indirecting: " + request + " via " + _config.getNodeName(_location));
+		if (!_config.getDispatcher().forward(message, _location))
+			_log.warn("could not forward message");
+	}
+	
+	public void onMessage(ObjectMessage message, MoveIMToPM request) {
+		if (_log.isWarnEnabled()) _log.warn(_config.getLocalNodeName()+": not Master of Partition["+_key+"] - forwarding message to "+_config.getNodeName(_location));
+		if (!_config.getDispatcher().forward(message, _location))
+			_log.warn("could not forward message");
+	}
+
+	public ObjectMessage exchange(DIndexRequest request, long timeout) throws Exception {
+		Dispatcher dispatcher=_config.getDispatcher();
+		Destination from=dispatcher.getLocalDestination();
+		Destination to=_location;
+		if (_log.isTraceEnabled()) _log.trace("exchanging message ("+request+") with node: "+_config.getNodeName(to)+" on "+Thread.currentThread().getName());
+		return dispatcher.exchangeSend(from, to, request, timeout);
+	}
+	
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,46 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.Iterator;
+
+import org.activecluster.Cluster;
+import org.activecluster.Node;
+import org.activecluster.election.ElectionStrategy;
+
+/**
+ * Elects the node with the smallest "birthTime" value in its state.
+ */
+public class SeniorityElectionStrategy implements ElectionStrategy {
+
+    /**
+     * Starts with the local node and replaces it with any node from
+     * cluster.getNodes() whose birth time is strictly earlier, so a tie leaves
+     * the current choice in place.
+     *
+     * @param cluster the cluster holding the election
+     * @return the node with the earliest birth time
+     */
+    public Node doElection(Cluster cluster) {
+        Node senior=cluster.getLocalNode();
+        long seniorBirth=getBirthTime(senior);
+        Iterator peers=cluster.getNodes().values().iterator();
+        while (peers.hasNext()) {
+            Node peer=(Node)peers.next();
+            long peerBirth=getBirthTime(peer);
+            if (peerBirth>=seniorBirth) {
+                continue;
+            }
+            seniorBirth=peerBirth;
+            senior=peer;
+        }
+
+        return senior;
+    }
+
+    /**
+     * @param node a node whose state holds a Long under "birthTime"
+     * @return that value as a primitive; fails with a NullPointerException if the entry is missing
+     */
+    protected long getBirthTime(Node node) {
+        Long birthTime=(Long)node.getState().get("birthTime"); // TODO - unify state keys somewhere
+        return birthTime.longValue();
+    }
+
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,504 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterEvent;
+import org.activecluster.Node;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.Partition;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.PartitionManager;
+import org.codehaus.wadi.dindex.PartitionManagerConfig;
+import org.codehaus.wadi.dindex.messages.PartitionEvacuationRequest;
+import org.codehaus.wadi.dindex.messages.PartitionEvacuationResponse;
+import org.codehaus.wadi.dindex.messages.PartitionRepopulateRequest;
+import org.codehaus.wadi.dindex.messages.PartitionRepopulateResponse;
+import org.codehaus.wadi.dindex.messages.PartitionTransferAcknowledgement;
+import org.codehaus.wadi.dindex.messages.PartitionTransferCommand;
+import org.codehaus.wadi.dindex.messages.PartitionTransferRequest;
+import org.codehaus.wadi.dindex.messages.PartitionTransferResponse;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.gridstate.LockManager;
+import org.codehaus.wadi.gridstate.PartitionMapper;
+import org.codehaus.wadi.gridstate.activecluster.ActiveClusterDispatcher;
+import org.codehaus.wadi.gridstate.impl.StupidLockManager;
+import org.codehaus.wadi.impl.Quipu;
+
+/**
+ * A Simple PartitionManager.
+ *
+ * @author jules
+ *
+ */
+public class SimplePartitionManager implements PartitionManager, PartitionConfig {
+
+	// Notified with a REMOVE_NODE ClusterEvent when onPartitionEvacuationRequest() handles a node's evacuation request.
+	public interface Callback {void onNodeRemoved(ClusterEvent event);}
+
+	protected final static String _partitionKeysKey="partitionKeys";
+	protected final static String _timeStampKey="timeStamp";
+	protected final static String _correlationIDMapKey="correlationIDMap";
+
+	protected final String _nodeName;
+	protected final Log _log;
+	protected final int _numPartitions;
+	protected final PartitionFacade[] _partitions;
+	protected final Cluster _cluster;
+	protected final Dispatcher _dispatcher;
+	protected final Map _distributedState;
+	protected final long _inactiveTime;
+	protected final boolean _allowRegenerationOfMissingPartitions = true;
+	protected final Callback _callback;
+	protected final PartitionMapper _mapper;
+	protected final LockManager _pmSyncs;
+
+	/**
+	 * Creates a manager for a fixed number of partitions. Every slot starts as
+	 * a PartitionFacade wrapping a DummyPartition, created with queueing
+	 * switched on and sharing a single creation timestamp.
+	 *
+	 * @param dispatcher used for all messaging; must be an ActiveClusterDispatcher,
+	 *        as it is cast to one in order to obtain the Cluster
+	 * @param numPartitions number of partition slots to create
+	 * @param distributedState map that this manager writes its partition keys and timestamp into
+	 * @param callback notified when a node asks to evacuate
+	 * @param mapper maps a key to a partition index (see getPartition(Object))
+	 */
+	public SimplePartitionManager(Dispatcher dispatcher, int numPartitions, Map distributedState, Callback callback, PartitionMapper mapper) {
+		_dispatcher=dispatcher;
+		_nodeName=_dispatcher.getNodeName();
+		_pmSyncs=new StupidLockManager(_nodeName);
+		// the log name carries the node name, so output from several nodes can be told apart
+		_log=LogFactory.getLog(getClass().getName()+"#"+_nodeName);
+		_numPartitions=numPartitions;
+
+		_partitions=new PartitionFacade[_numPartitions];
+		long timeStamp=System.currentTimeMillis();
+		boolean queueing=true;
+		for (int i=0; i<_numPartitions; i++)
+			_partitions[i]=new PartitionFacade(i, timeStamp, new DummyPartition(i), queueing, this);
+
+		// throws ClassCastException for any other Dispatcher implementation
+		_cluster=((ActiveClusterDispatcher)_dispatcher).getCluster();
+		_distributedState=distributedState;
+		_inactiveTime=_dispatcher.getInactiveTime();
+		_callback=callback;
+		_mapper=mapper;
+	}
+
+	protected PartitionManagerConfig _config;
+
+	/**
+	 * Stores the config and registers this manager's message handling with the
+	 * Dispatcher. The four request/command classes are bound to the matching
+	 * "on..." methods of this object; the four reply classes are registered
+	 * together with the inactive time.
+	 *
+	 * @param config kept for later use by evacuate() and the repopulation code
+	 */
+	public void init(PartitionManagerConfig config) {
+		_config=config;
+		_log.trace("init");
+		// attach relevant message handlers to dispatcher...
+		_dispatcher.register(this, "onPartitionTransferCommand", PartitionTransferCommand.class);
+		_dispatcher.register(PartitionTransferAcknowledgement.class, _inactiveTime);
+		_dispatcher.register(this, "onPartitionTransferRequest", PartitionTransferRequest.class);
+		_dispatcher.register(PartitionTransferResponse.class, _inactiveTime);
+		_dispatcher.register(this, "onPartitionEvacuationRequest", PartitionEvacuationRequest.class);
+		_dispatcher.register(PartitionEvacuationResponse.class, _inactiveTime);
+		_dispatcher.register(this, "onPartitionRepopulateRequest", PartitionRepopulateRequest.class);
+		_dispatcher.register(PartitionRepopulateResponse.class, _inactiveTime);
+	}
+
+	/** Lifecycle hook; currently does nothing beyond trace logging. */
+	public void start() throws Exception {
+		_log.trace("starting...");
+		_log.trace("...started");
+	}
+
+	public void evacuate() throws Exception {
+		_log.info("evacuating...");
+
+		PartitionEvacuationRequest request=new PartitionEvacuationRequest();
+		Node localNode=_cluster.getLocalNode();
+		String correlationId=_cluster.getLocalNode().getName();
+		if (_log.isTraceEnabled()) _log.trace("evacuating partitions...: "+_dispatcher.getNodeName(localNode.getDestination())+" -> "+_config.getCoordinatorNode().getState().get("nodeName"));
+		while (_dispatcher.exchangeSend(localNode.getDestination(), _config.getCoordinatorNode().getDestination(), correlationId, request, _inactiveTime)==null) {
+			if (_log.isWarnEnabled()) _log.warn("could not contact Coordinator - backing off for "+ _inactiveTime+" millis...");
+			Thread.sleep(_config.getInactiveTime());
+		}
+
+		_log.info("...evacuated");
+	}
+
+	/**
+	 * Deregisters the four handlers that init() attached to the Dispatcher,
+	 * passing 5000 as the final argument of each call (presumably a timeout in
+	 * milliseconds - TODO confirm against Dispatcher.deregister).
+	 */
+	public void stop() throws Exception {
+		_log.info("stopping...");
+		// detach relevant message handlers from dispatcher...
+		_dispatcher.deregister("onPartitionTransferCommand", PartitionTransferCommand.class, 5000);
+		_dispatcher.deregister("onPartitionTransferRequest", PartitionTransferRequest.class, 5000);
+		_dispatcher.deregister("onPartitionEvacuationRequest", PartitionEvacuationRequest.class, 5000);
+		_dispatcher.deregister("onPartitionRepopulateRequest", PartitionRepopulateRequest.class, 5000);
+		_log.info("...stopped");
+	}
+
+	/**
+	 * @param partition index into the partition array; not range-checked here
+	 * @return the facade held in that slot
+	 */
+	public PartitionFacade getPartition(int partition) {
+		return _partitions[partition];
+	}
+
+	// a node wants to shutdown...
+	/**
+	 * Works out which node sent the request, from the message's reply-to
+	 * Destination, and reports it to the callback as a REMOVE_NODE event.
+	 *
+	 * If the sender cannot be identified (no reply-to, unreadable message, or a
+	 * Destination that is not a known node) the request is logged and ignored,
+	 * rather than a ClusterEvent being raised for a null node.
+	 *
+	 * @param om the incoming message; its reply-to identifies the evacuating node
+	 * @param request the evacuation request itself (carries nothing that is read here)
+	 */
+	public void onPartitionEvacuationRequest(ObjectMessage om, PartitionEvacuationRequest request) {
+		Node from=null;
+		try {
+			Destination destination=om.getJMSReplyTo();
+			if (destination==null) {
+				_log.warn("evacuation request carries no reply-to destination");
+			} else {
+				Node local=_cluster.getLocalNode();
+				// the local node is checked separately, before consulting getNodes()
+				if (destination.equals(local.getDestination()))
+					from=local;
+				else
+					from=(Node)_cluster.getNodes().get(destination);
+			}
+		} catch (JMSException e) {
+			_log.warn("could not read src node from message", e);
+		}
+
+		if (from==null) {
+			_log.warn("ignoring evacuation request from unidentified node");
+			return;
+		}
+		_callback.onNodeRemoved(new ClusterEvent(_cluster, from, ClusterEvent.REMOVE_NODE));
+	}
+
+	// a node wants to rebuild a lost partition
+	/**
+	 * Answers a repopulation request: builds one empty collection per requested
+	 * partition key, lets the config fill them via findRelevantSessionNames(),
+	 * and sends the result back in a PartitionRepopulateResponse.
+	 *
+	 * A failure while collecting names is logged and the reply is still sent,
+	 * with whatever had been collected up to that point.
+	 *
+	 * @param om the incoming message, used to address the reply
+	 * @param request carries the keys of the partitions being rebuilt
+	 */
+	public void onPartitionRepopulateRequest(ObjectMessage om, PartitionRepopulateRequest request) {
+		int keys[]=request.getKeys();
+		// NOTE: concatenating an int[] prints the array's identity, not its contents
+		if (_log.isTraceEnabled()) _log.trace("PartitionRepopulateRequest ARRIVED: " + keys);
+		Collection[] c=createResultSet(_numPartitions, keys);
+		try {
+			_log.trace("findRelevantSessionNames - starting");
+			_config.findRelevantSessionNames(_numPartitions, c);
+			_log.trace("findRelevantSessionNames - finished");
+		} catch (Throwable t) {
+			_log.warn("ERROR", t);
+		}
+		if (!_dispatcher.reply(om, new PartitionRepopulateResponse(c)))
+			_log.warn("unexpected problem responding to partition repopulation request");
+	}
+
+	// receive a command to transfer IndexPartitions to another node
+	// send them in a request, waiting for response
+	// send an acknowledgement to Coordinator who sent original command
+	/**
+	 * Carries out each PartitionTransfer in the command, one after another.
+	 *
+	 * For every transfer, up to the requested amount of locally held partitions
+	 * is collected (scanning the slots from index 0), sent to the target in a
+	 * PartitionTransferRequest, and - if the reply reports success - the
+	 * matching facades are switched to point at the target. Any problem with
+	 * one transfer is logged and the loop moves on to the next.
+	 *
+	 * Afterwards the node's partition keys and a fresh timestamp are written to
+	 * the distributed state, together with the command's correlation id filed
+	 * under the sender's reply-to Destination. Publishing that state stands in
+	 * for the explicit acknowledgement, which is commented out (see FIXME).
+	 *
+	 * @param om the incoming message; supplies the correlation id and the reply-to
+	 * @param command lists the transfers to perform
+	 */
+	public void onPartitionTransferCommand(ObjectMessage om, PartitionTransferCommand command) {
+		PartitionTransfer[] transfers=command.getTransfers();
+		for (int i=0; i<transfers.length; i++) {
+			PartitionTransfer transfer=transfers[i];
+			int amount=transfer.getAmount();
+			Destination destination=transfer.getDestination();
+
+			// acquire partitions for transfer...
+			LocalPartition[] acquired=null;
+			try {
+				Collection c=new ArrayList();
+				// take the first 'amount' local partitions, in slot order
+				for (int j=0; j<_numPartitions && c.size()<amount; j++) {
+					PartitionFacade facade=_partitions[j];
+					if (facade.isLocal()) {
+						Partition partition=facade.getContent();
+						c.add(partition);
+					}
+				}
+				acquired=(LocalPartition[])c.toArray(new LocalPartition[c.size()]);
+				// only checked when assertions are on; otherwise fewer than 'amount' may be sent
+				assert amount==acquired.length;
+
+				long timeStamp=System.currentTimeMillis();
+
+				// build request...
+				if (_log.isTraceEnabled()) _log.trace("local state (before giving): " + getPartitionKeys());
+				PartitionTransferRequest request=new PartitionTransferRequest(timeStamp, acquired);
+				// send it...
+				ObjectMessage om3=_dispatcher.exchangeSend(_dispatcher.getLocalDestination(), destination, request, _inactiveTime);
+				// process response...
+				if (om3!=null && ((PartitionTransferResponse)om3.getObject()).getSuccess()) {
+					// the receiver has them - from now on these slots refer to it
+					for (int j=0; j<acquired.length; j++) {
+						PartitionFacade facade=null;
+						facade=_partitions[acquired[j].getKey()];
+						facade.setContentRemote(timeStamp, _dispatcher, destination); // TODO - should we use a more recent ts ?
+					}
+					if (_log.isDebugEnabled()) _log.debug("released "+acquired.length+" partition[s] to "+_dispatcher.getNodeName(destination));
+				} else {
+					_log.warn("transfer unsuccessful");
+				}
+			} catch (Throwable t) {
+				_log.warn("unexpected problem", t);
+			}
+		}
+		try {
+			PartitionKeys keys=getPartitionKeys();
+			_distributedState.put(_partitionKeysKey, keys);
+			_distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
+			if (_log.isTraceEnabled()) _log.trace("local state (after giving): " + keys);
+			String correlationID=_dispatcher.getOutgoingCorrelationId(om);
+			if (_log.isTraceEnabled()) _log.trace("CORRELATIONID: " + correlationID);
+			// a missing map makes the put() below throw, which ends up in the catch
+			Map correlationIDMap=(Map)_distributedState.get(_correlationIDMapKey);
+			Destination from=om.getJMSReplyTo();
+			correlationIDMap.put(from, correlationID);
+			_dispatcher.setDistributedState(_distributedState);
+			if (_log.isTraceEnabled()) _log.trace("distributed state updated: " + _dispatcher.getDistributedState());
+			correlateStateUpdate(_distributedState); // onStateUpdate() does not get called locally
+			// the entry only needs to exist for the duration of the publication above
+			correlationIDMap.remove(from);
+			// FIXME - RACE - between update of distributed state and ack - they should be one and the same thing...
+			//_dispatcher.reply(om, new PartitionTransferAcknowledgement(true)); // what if failure - TODO
+		} catch (Exception e) {
+			_log.warn("could not acknowledge safe transfer to Coordinator", e);
+		}
+	}
+
+	// receive a transfer of partitions
+	/**
+	 * Takes ownership of the partitions carried by the request: each one is
+	 * initialised against this manager and installed in the facade for its key,
+	 * stamped with the request's timestamp. The node's partition keys and a
+	 * fresh timestamp are then published in the distributed state, and a
+	 * PartitionTransferResponse is sent back to the donor.
+	 *
+	 * The method is synchronized on this manager.
+	 *
+	 * @param om the incoming message, used to address the reply
+	 * @param request carries the partitions and the donor's timestamp
+	 */
+	public synchronized void onPartitionTransferRequest(ObjectMessage om, PartitionTransferRequest request) {
+		long timeStamp=request.getTimeStamp();
+		LocalPartition[] partitions=request.getPartitions();
+		boolean success=false;
+		// read incoming data into our own local model
+		if (_log.isTraceEnabled()) _log.trace("local state (before receiving): " + getPartitionKeys());
+		for (int i=0; i<partitions.length; i++) {
+			LocalPartition partition=partitions[i];
+			partition.init(this);
+			PartitionFacade facade=getPartition(partition.getKey());
+			facade.setContent(timeStamp, partition);
+		}
+		// if the loop above threw, this line is never reached and no reply is sent
+		success=true;
+		try {
+			PartitionKeys keys=getPartitionKeys();
+			_distributedState.put(_partitionKeysKey, keys);
+			_distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
+			if (_log.isTraceEnabled()) _log.trace("local state (after receiving): " + keys);
+			_dispatcher.setDistributedState(_distributedState);
+			if (_log.isTraceEnabled()) _log.trace("distributed state updated: " + _dispatcher.getDistributedState());
+		} catch (Exception e) {
+			// the reply below still reports success even if publication failed
+			_log.error("could not update distributed state", e);
+		}
+		// acknowledge safe receipt to donor
+		if (_dispatcher.reply(om, new PartitionTransferResponse(success))) {
+			// unlock Partitions here... - TODO
+			try {
+				if (_log.isDebugEnabled()) _log.debug("acquired "+partitions.length+" partition[s] from "+_dispatcher.getNodeName(om.getJMSReplyTo()));
+			} catch (JMSException e) {
+				_log.warn("problem reading incoming message's source", e);
+			}
+		} else {
+			_log.warn("problem acknowledging reciept of IndexPartitions - donor may have died");
+			// chuck them... - TODO
+		}
+	}
+
+	// TODO - duplicate code (from DIndex)
+	/**
+	 * Builds an array with one slot per partition, in which only the slots
+	 * named by keys hold a (new, empty) collection; all others stay null.
+	 *
+	 * @param numPartitions length of the returned array
+	 * @param keys indices of the slots that are to receive a collection
+	 * @return the sparsely filled array
+	 */
+	public Collection[] createResultSet(int numPartitions, int[] keys) {
+		Collection[] slots=new Collection[numPartitions];
+		int next=0;
+		while (next<keys.length) {
+			slots[keys[next]]=new ArrayList();
+			next++;
+		}
+		return slots;
+	}
+
+	// ClusterListener
+
+	/**
+	 * Applies a node's published state: every partition key that the node
+	 * lists is pointed at that node's Destination, using the timestamp the
+	 * node published alongside its keys.
+	 *
+	 * @param node a node whose state holds both a timestamp and partition keys
+	 */
+	public void update(Node node) {
+		Map published=node.getState();
+		long publishedAt=((Long)published.get(_timeStampKey)).longValue();
+		PartitionKeys owned=(PartitionKeys)published.get(_partitionKeysKey);
+		Destination owner=node.getDestination();
+		int[] indices=owned._keys;
+		int next=0;
+		while (next<indices.length) {
+			_partitions[indices[next]].setContentRemote(publishedAt, _dispatcher, owner);
+			next++;
+		}
+	}
+
+
+	/**
+	 * Ticks off, in partitionIsPresent, every partition that one of the given
+	 * nodes claims to own. Null nodes, and nodes without partition keys, are
+	 * skipped. A partition that is already ticked is reported as an error.
+	 *
+	 * @param nodes the nodes to inspect; entries may be null
+	 * @param partitionIsPresent updated in place, indexed by partition key
+	 */
+	public void markExistingPartitions(Node[] nodes, boolean[] partitionIsPresent) {
+		for (int n=0; n<nodes.length; n++) {
+			Node candidate=nodes[n];
+			if (candidate==null) {
+				continue;
+			}
+			PartitionKeys owned=DIndex.getPartitionKeys(candidate);
+			if (owned==null) {
+				continue;
+			}
+			int[] indices=owned.getKeys();
+			for (int p=0; p<indices.length; p++) {
+				int index=indices[p];
+				if (!partitionIsPresent[index]) {
+					partitionIsPresent[index]=true;
+					continue;
+				}
+				if (_log.isErrorEnabled()) _log.error("partition " + index + " found on more than one node");
+			}
+		}
+	}
+
+	/**
+	 * Recreates, on this node, every partition that none of the given nodes
+	 * claims to own, and then refills those partitions.
+	 *
+	 * Steps: find the unclaimed keys; install a fresh LocalPartition for each;
+	 * send a PartitionRepopulateRequest to the cluster destination; add the
+	 * locally known session names while waiting; wait up to the inactive time
+	 * for the other nodes' answers and add theirs too; finally publish the new
+	 * partition keys in the distributed state. Does nothing if no partition is
+	 * missing.
+	 *
+	 * @param living nodes staying in the cluster
+	 * @param leaving nodes on their way out; their partitions still count as present
+	 */
+	public void regenerateMissingPartitions(Node[] living, Node[] leaving) {
+		boolean[] partitionIsPresent=new boolean[_numPartitions];
+		markExistingPartitions(living, partitionIsPresent);
+		markExistingPartitions(leaving, partitionIsPresent);
+		Collection missingPartitions=new ArrayList();
+		for (int i=0; i<partitionIsPresent.length; i++) {
+			if (!partitionIsPresent[i])
+				missingPartitions.add(new Integer(i));
+		}
+
+		int numKeys=missingPartitions.size();
+		if (numKeys>0) {
+			assert _allowRegenerationOfMissingPartitions;
+			// convert to int[]
+			int[] missingKeys=new int[numKeys];
+			int key=0;
+			for (Iterator i=missingPartitions.iterator(); i.hasNext(); )
+				missingKeys[key++]=((Integer)i.next()).intValue();
+
+			if (_log.isWarnEnabled()) _log.warn("RECREATING PARTITIONS...: " + missingPartitions);
+			long time=System.currentTimeMillis();
+			for (int i=0; i<missingKeys.length; i++) {
+				int k=missingKeys[i];
+				PartitionFacade facade=_partitions[k];
+				LocalPartition local=new LocalPartition(k);
+				local.init(this);
+				facade.setContent(time, local);
+			}
+			// snapshot taken now, published only once repopulation has finished
+			PartitionKeys newKeys=getPartitionKeys();
+			if (_log.isWarnEnabled()) _log.warn("REPOPULATING PARTITIONS...: " + missingPartitions);
+			String correlationId=_dispatcher.nextCorrelationId();
+			// one answer is expected from every node except this one
+			Quipu rv=_dispatcher.setRendezVous(correlationId, _dispatcher.getNumNodes()-1);
+			if (!_dispatcher.send(_dispatcher.getLocalDestination(), _dispatcher.getClusterDestination(), correlationId, new PartitionRepopulateRequest(missingKeys))) {
+				_log.error("unexpected problem repopulating lost index");
+			}
+
+			// whilst we are waiting for the other nodes to get back to us, figure out which relevant sessions
+			// we are carrying ourselves...
+			Collection[] c=createResultSet(_numPartitions, missingKeys);
+			_config.findRelevantSessionNames(_numPartitions, c);
+			repopulate(_dispatcher.getLocalDestination(), c);
+
+			//boolean success=false;
+			try {
+				/*success=*/rv.waitFor(_inactiveTime);
+			} catch (InterruptedException e) {
+				// NOTE(review): the interrupt is logged but the thread's interrupt flag is not restored
+				_log.warn("unexpected interruption", e);
+			}
+			// whatever arrived in time is used; the outcome of the wait is not checked
+			Collection results=rv.getResults();
+
+			for (Iterator i=results.iterator(); i.hasNext(); ) {
+				ObjectMessage message=(ObjectMessage)i.next();
+				try {
+					Destination from=message.getJMSReplyTo();
+					PartitionRepopulateResponse response=(PartitionRepopulateResponse)message.getObject();
+					Collection[] relevantKeys=response.getKeys();
+
+					repopulate(from, relevantKeys);
+
+				} catch (JMSException e) {
+					_log.warn("unexpected problem interrogating response", e);
+				}
+			}
+
+			if (_log.isWarnEnabled()) _log.warn("...PARTITIONS REPOPULATED: " + missingPartitions);
+			// relayout dindex
+			// NOTE(review): unlike the transfer handlers, the timestamp entry is not refreshed here
+			_distributedState.put(_partitionKeysKey, newKeys);
+			try {
+				_dispatcher.setDistributedState(_distributedState);
+				if (_log.isTraceEnabled()) _log.trace("distributed state updated: " + _dispatcher.getDistributedState());
+			} catch (Exception e) {
+				_log.error("could not update distributed state", e);
+			}
+		}
+	}
+
+	/** @return a new PartitionKeys built from the current partition array */
+	public PartitionKeys getPartitionKeys() {
+		return new PartitionKeys(_partitions);
+	}
+
+	/**
+	 * Records, in the local partitions, that the given session names live at
+	 * location. Only slots of keys that hold a collection are touched, and the
+	 * partition in each such slot must be a LocalPartition.
+	 *
+	 * @param location where the named sessions are held; must not be null
+	 * @param keys one (possibly null) collection of names per partition
+	 */
+	public void repopulate(Destination location, Collection[] keys) {
+		assert location!=null;
+		for (int slot=0; slot<_numPartitions; slot++) {
+			Collection names=keys[slot];
+			if (names==null) {
+				continue;
+			}
+			LocalPartition target=(LocalPartition)_partitions[slot].getContent();
+			Iterator each=names.iterator();
+			while (each.hasNext()) {
+				target.put((String)each.next(), location);
+			}
+		}
+	}
+
+	/**
+	 * Installs a brand new LocalPartition in every slot, all stamped with the
+	 * same timestamp.
+	 */
+	public void localise() {
+		if (_log.isDebugEnabled()) _log.debug("allocating " + _numPartitions + " partitions");
+		final long now=System.currentTimeMillis();
+		for (int key=0; key<_numPartitions; key++) {
+			LocalPartition fresh=new LocalPartition(key);
+			fresh.init(this);
+			_partitions[key].setContent(now, fresh);
+		}
+	}
+
+	// TODO - duplicate code - see DIndex...
+	/**
+	 * Looks in the state's correlation id map for an entry filed under the
+	 * local Destination and, if a rendezvous is registered for that id, hands
+	 * the state to it. No entry means nothing to do; an entry without a waiting
+	 * rendezvous is logged as a warning.
+	 *
+	 * @param state a distributed state map that contains a correlation id map
+	 */
+	protected void correlateStateUpdate(Map state) {
+		Map idsByDestination=(Map)state.get(_correlationIDMapKey);
+		Destination self=_dispatcher.getLocalDestination();
+		String correlationID=(String)idsByDestination.get(self);
+		if (correlationID==null) {
+			return;
+		}
+		Quipu waiting=(Quipu)_dispatcher.getRendezVousMap().get(correlationID);
+		if (waiting!=null) {
+			if (_log.isTraceEnabled()) _log.trace("successful correlation: " + correlationID);
+			waiting.putResult(state);
+			return;
+		}
+		if (_log.isWarnEnabled()) _log.warn("no one waiting for: " + correlationID);
+	}
+
+//	public void repopulatePartitions(Destination location, Collection[] keys) {
+//	for (int i=0; i<keys.length; i++) {
+//	Collection c=keys[i];
+//	if (c!=null) {
+//	for (Iterator j=c.iterator(); j.hasNext(); ) {
+//	String key=(String)j.next();
+//	LocalPartition partition=(LocalPartition)_partitions[i].getContent();
+//	partition._map.put(key, location);
+//	}
+//	}
+//	}
+//	}
+
+	/** @return the number of partitions given at construction */
+	public int getNumPartitions() {
+		return _numPartitions;
+	}
+
+	/**
+	 * @param key any key that the PartitionMapper can map to a partition index
+	 * @return the facade in the slot that the mapper chose for key
+	 */
+	public PartitionFacade getPartition(Object key) {
+		return _partitions[_mapper.map(key)];
+	}
+
+	// PartitionConfig API
+
+	/** @return the Dispatcher given at construction */
+	public Dispatcher getDispatcher() {
+		return _dispatcher;
+	}
+
+	/** @return the Cluster obtained from the Dispatcher at construction */
+	public Cluster getCluster() {
+		return _cluster;
+	}
+
+	/** @return whatever name the Dispatcher reports for destination */
+	public String getNodeName(Destination destination) {
+		return _dispatcher.getNodeName(destination);
+	}
+
+	/** @return the inactive time read from the Dispatcher at construction */
+	public long getInactiveTime() {
+		return _inactiveTime;
+	}
+
+	// PartitionConfig API
+
+	/** @return the node name read from the Dispatcher at construction */
+	public String getLocalNodeName() {
+		return _nodeName;
+	}
+
+	/** @return the LockManager created at construction (a StupidLockManager) */
+	public LockManager getPMSyncs() {
+		return _pmSyncs;
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,340 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.nio.ByteBuffer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.Immoter;
+import org.codehaus.wadi.InvocationContext;
+import org.codehaus.wadi.InvocationException;
+import org.codehaus.wadi.Location;
+import org.codehaus.wadi.Motable;
+import org.codehaus.wadi.dindex.StateManager;
+import org.codehaus.wadi.dindex.StateManagerConfig;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationResponse;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.DeletePMToIM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertPMToIM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToSM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToIM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToSM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToIM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToPM;
+import org.codehaus.wadi.dindex.newmessages.ReleaseEntryRequest;
+import org.codehaus.wadi.dindex.newmessages.ReleaseEntryResponse;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.impl.AbstractMotable;
+import org.codehaus.wadi.impl.RankedRWLock;
+import org.codehaus.wadi.impl.SimpleMotable;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
+
+public class SimpleStateManager implements StateManager {
+
+	protected final Log _lockLog=LogFactory.getLog("org.codehaus.wadi.LOCKS");
+	protected final Dispatcher _dispatcher;
+	protected final long _inactiveTime;
+	protected final int _resTimeout=500; // TODO - parameterise
+
+	protected StateManagerConfig _config;
+	protected Log _log=LogFactory.getLog(getClass());
+
+	/**
+	 * Stores the collaborators; the configuration arrives separately through {@link #init(StateManagerConfig)}.
+	 *
+	 * @param dispatcher the dispatcher on which message handlers are (de)registered and replies are sent
+	 * @param inactiveTime passed as the second argument when response message classes are registered in init()
+	 */
+	public SimpleStateManager(Dispatcher dispatcher, long inactiveTime) {
+		// the implicit super() call is all that Object needs
+		this._inactiveTime=inactiveTime;
+		this._dispatcher=dispatcher;
+	}
+
+	public void init(StateManagerConfig config) {
+		_config=config;
+		_log=LogFactory.getLog(getClass().getName()+"#"+_config.getLocalNodeName());
+		_dispatcher.register(this, "onDIndexInsertionRequest", InsertIMToPM.class);
+		_dispatcher.register(InsertPMToIM.class, _inactiveTime);
+		_dispatcher.register(this, "onDIndexDeletionRequest", DeleteIMToPM.class);
+		_dispatcher.register(DeletePMToIM.class, _inactiveTime);
+		_dispatcher.register(this, "onDIndexRelocationRequest", DIndexRelocationRequest.class);
+		_dispatcher.register(DIndexRelocationResponse.class, _inactiveTime);
+		_dispatcher.register(this, "onDIndexForwardRequest", DIndexForwardRequest.class);
+
+		// GridState - Relocate - 5 messages - IM->PM->SM->IM->SM->PM
+		_dispatcher.register(this, "onMessage", MoveIMToPM.class);
+		_dispatcher.register(this, "onMessage", MovePMToSM.class);
+		_dispatcher.register(MoveSMToIM.class, _inactiveTime);
+		_dispatcher.register(MoveIMToSM.class, _inactiveTime);
+		_dispatcher.register(MoveSMToPM.class, _inactiveTime);
+		// or possibly - IM->PM->IM (failure)
+		_dispatcher.register(MovePMToIM.class, _inactiveTime);
+	}
+
+	/**
+	 * Lifecycle hook; currently does nothing.
+	 *
+	 * @throws Exception declared by the signature, never thrown by this implementation
+	 */
+	public void start() throws Exception {
+		// TODO - nothing to start yet
+	}
+
+	/**
+	 * Deregisters the four DIndex request handlers that {@link #init(StateManagerConfig)} installed.
+	 * <p>
+	 * NOTE(review): the two "onMessage" handlers (MoveIMToPM, MovePMToSM) that init() also registers
+	 * are not deregistered here - confirm whether that is intentional.
+	 *
+	 * @throws Exception declared by the signature; anything the dispatcher throws propagates
+	 */
+	public void stop() throws Exception {
+		final int timeout=5000; // same value for every call; presumably milliseconds - TODO confirm
+		_dispatcher.deregister("onDIndexInsertionRequest", InsertIMToPM.class, timeout);
+		_dispatcher.deregister("onDIndexDeletionRequest", DeleteIMToPM.class, timeout);
+		_dispatcher.deregister("onDIndexRelocationRequest", DIndexRelocationRequest.class, timeout);
+		_dispatcher.deregister("onDIndexForwardRequest", DIndexForwardRequest.class, timeout);
+	}
+
+
+	/**
+	 * Hands an insertion request to the partition selected by the request's key.
+	 *
+	 * @param om the message that carried the request
+	 * @param request the insertion request
+	 */
+	public void onDIndexInsertionRequest(ObjectMessage om, InsertIMToPM request) {
+		this._config
+			.getPartition(request.getKey())
+			.onMessage(om, request);
+	}
+
+	/**
+	 * Hands a deletion request to the partition selected by the request's key.
+	 *
+	 * @param om the message that carried the request
+	 * @param request the deletion request
+	 */
+	public void onDIndexDeletionRequest(ObjectMessage om, DeleteIMToPM request) {
+		this._config
+			.getPartition(request.getKey())
+			.onMessage(om, request);
+	}
+
+	/**
+	 * Hands a forward request to the partition selected by the request's key.
+	 *
+	 * @param om the message that carried the request
+	 * @param request the forward request
+	 */
+	public void onDIndexForwardRequest(ObjectMessage om, DIndexForwardRequest request) {
+		this._config
+			.getPartition(request.getKey())
+			.onMessage(om, request);
+	}
+
+	/**
+	 * Hands a relocation request to the partition selected by the request's key.
+	 *
+	 * @param om the message that carried the request
+	 * @param request the relocation request
+	 */
+	public void onDIndexRelocationRequest(ObjectMessage om, DIndexRelocationRequest request) {
+		this._config
+			.getPartition(request.getKey())
+			.onMessage(om, request);
+	}
+
+	/**
+	 * Hands a MoveIMToPM request to the partition selected by the request's key.
+	 *
+	 * @param message the message that carried the request
+	 * @param request the move request
+	 */
+	public void onMessage(ObjectMessage message, MoveIMToPM request) {
+		this._config
+			.getPartition(request.getKey())
+			.onMessage(message, request);
+	}
+
+	//----------------------------------------------------------------------------------------------------
+
+	/**
+	 * Motable whose only supported operation is {@link #setBodyAsByteArray(byte[])}: the bytes it is
+	 * given are sent to the node named by the MovePMToSM request's IM destination and, once that node
+	 * has answered, a MoveSMToPM(true) reply is sent for the original message.
+	 */
+	class PMToIMEmotable extends AbstractMotable {
+
+		protected final String _name;
+		protected final String _tgtNodeName;
+		protected ObjectMessage _message1;
+		protected final MovePMToSM _get;
+
+		/**
+		 * @param name stored in <code>_name</code>
+		 * @param nodeName stored in <code>_tgtNodeName</code>
+		 * @param message1 the message that is replied to once the transfer has been acknowledged
+		 * @param get the request supplying the key, the IM destination and the IM correlation id
+		 */
+		public PMToIMEmotable(String name, String nodeName, ObjectMessage message1, MovePMToSM get) {
+			_name=name;
+			_tgtNodeName=nodeName;
+			_message1=message1;
+			_get=get;
+		}
+
+		/** Not supported. */
+		public byte[] getBodyAsByteArray() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		/**
+		 * Ships the given bytes to the InvocationMaster and, if it answers in time, acknowledges to the
+		 * sender of the original message.
+		 *
+		 * @param bytes the serialised state to send on
+		 * @throws Exception whatever the motable or the dispatcher throws
+		 */
+		public void setBodyAsByteArray(byte[] bytes) throws Exception {
+			// kept from the original: the bytes are first loaded into a throw-away SimpleMotable
+			Motable immotable=new SimpleMotable();
+			immotable.setBodyAsByteArray(bytes);
+
+			Object key=_get.getKey();
+			Dispatcher dispatcher=_config.getDispatcher();
+			long timeout=_config.getInactiveTime();
+			Destination stateMaster=dispatcher.getLocalDestination();
+			Destination invocationMaster=(Destination)_get.getIM();
+			MoveSMToIM request=new MoveSMToIM(key, bytes);
+
+			// send on state from StateMaster to InvocationMaster...
+			if (_log.isTraceEnabled()) {
+				_log.trace("exchanging MoveSMToIM between: "+_config.getNodeName(stateMaster)+"->"+_config.getNodeName(invocationMaster));
+			}
+			ObjectMessage reply=(ObjectMessage)dispatcher.exchangeSend(stateMaster, invocationMaster, request, timeout, _get.getIMCorrelationId());
+
+			// should receive response from IM confirming safe receipt...
+			if (reply==null) {
+				_log.error("NO REPLY RECEIVED FOR MESSAGE IN TIMEFRAME - PANIC!");
+				return;
+			}
+
+			// acknowledge transfer completed to PartitionMaster, so it may unlock resource...
+			// (the content of the reply is not inspected)
+			dispatcher.reply(_message1,new MoveSMToPM(true));
+		}
+
+		/** Not supported. */
+		public ByteBuffer getBodyAsByteBuffer() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		/** Not supported. */
+		public void setBodyAsByteBuffer(ByteBuffer body) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	/**
+	 * We receive a RelocationRequest and pass a RelocationImmoter down the Contextualiser stack. The Session is passed to us
+	 * through the Immoter and we pass it back to the Request-ing node...
+	 * <p>
+	 * The invocation lock for the session is taken in {@link #prepare} and given back exactly once, from
+	 * either {@link #commit} or {@link #rollback}. If the lock could not be taken, nothing is released.
+	 *
+	 * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+	 * @version $Revision: 1.21 $
+	 */
+	class RelocationImmoter implements Immoter {
+		protected final Log _log=LogFactory.getLog(getClass());
+
+		protected final String _tgtNodeName;
+		protected ObjectMessage _message;
+		protected final MovePMToSM _request;
+
+		protected boolean _found=false;
+		protected Sync _invocationLock;
+		// true only between a successful acquire in prepare() and the matching release
+		protected boolean _invocationLockHeld=false;
+
+		/**
+		 * @param nodeName name of the node the state is being sent to; reported by {@link #getInfo()}
+		 * @param message the message that triggered the relocation; handed on to the emotable
+		 * @param request the relocation request; handed on to the emotable
+		 */
+		public RelocationImmoter(String nodeName, ObjectMessage message, MovePMToSM request) {
+			_tgtNodeName=nodeName;
+			_message=message;
+			_request=request;
+		}
+
+		/**
+		 * @return a fresh PMToIMEmotable, which forwards the state when its body is set
+		 */
+		public Motable nextMotable(String name, Motable emotable) {
+			return new PMToIMEmotable(name, _tgtNodeName, _message, _request);
+		}
+
+		/**
+		 * Takes the invocation lock for <code>name</code>. A timeout is logged and the motion proceeds
+		 * without the lock; in that case the lock is remembered as not held so that it is never released.
+		 *
+		 * @return always true
+		 */
+		public boolean prepare(String name, Motable emotable, Motable immotable) {
+			// the actual transfer is done by the PMToIMEmotable - here we only take the invocation lock
+			_invocationLock=_config.getInvocationLock(name);
+			_invocationLockHeld=false;
+			try {
+				Utils.acquireUninterrupted("Invocation", name, _invocationLock);
+				_invocationLockHeld=true;
+			} catch (TimeoutException e) {
+				_log.error("unexpected timeout - proceding without lock", e);
+			}
+			return true;
+		}
+
+		/**
+		 * Records that the state was found and moved, then gives back the invocation lock if it is held.
+		 */
+		public void commit(String name, Motable immotable) {
+			_found=true;
+			releaseInvocationLock(name);
+		}
+
+		/**
+		 * The transfer itself cannot be undone here (NYI), but the invocation lock taken in prepare()
+		 * must not be leaked, so it is given back if it is held.
+		 */
+		public void rollback(String name, Motable immotable) {
+			// this probably has to by NYI... - nasty...
+			releaseInvocationLock(name);
+		}
+
+		/**
+		 * @return always false
+		 */
+		public boolean contextualise(InvocationContext invocationContext, String id, Motable immotable, Sync motionLock) throws InvocationException {
+			return false;
+		}
+
+		public String getInfo() {
+			return "emigration:"+_tgtNodeName;
+		}
+
+		/**
+		 * @return true once {@link #commit} has been called
+		 */
+		public boolean getFound() {
+			return _found;
+		}
+
+		// releases the invocation lock at most once, and only if prepare() actually acquired it
+		private void releaseInvocationLock(String name) {
+			if (_invocationLockHeld) {
+				_invocationLockHeld=false;
+				Utils.release("Invocation", name, _invocationLock);
+			}
+		}
+
+	}
+
+	//--------------------------------------------------------------------------------------
+
+	/**
+	 * Called on State Master...
+	 * <p>
+	 * Runs a RelocationImmoter through <code>_config.contextualise()</code> for the requested key. If the
+	 * immoter reports that nothing was found, a MoveSMToIM carrying null state is exchanged with the
+	 * InvocationMaster and MoveSMToPM(false) is sent in reply to <code>message1</code>.
+	 * Exceptions are logged, not propagated.
+	 *
+	 * @param message1 the message that carried the request; replied to on failure
+	 * @param request the relocation request, supplying key, IM destination and IM correlation id
+	 */
+	public void onMessage(ObjectMessage message1, MovePMToSM request) {
+		// DO NOT Dispatch onto Partition - deal with it here...
+		Object key=request.getKey();
+
+		RankedRWLock.setPriority(RankedRWLock.EMIGRATION_PRIORITY);
+		try {
+			// Tricky - we need to call a Moter at this point and start removal of State to other node...
+			Destination im=(Destination)request.getIM();
+			String imName=_config.getNodeName(im);
+			RelocationImmoter immoter=new RelocationImmoter(imName, message1, request);
+			_config.contextualise(null, (String)key, immoter, null, true); // if we own session, this will send the correct response...
+			if (immoter.getFound()) {
+				return; // the finally clause below still resets the priority
+			}
+
+			_log.warn("state not found - perhaps it has just been destroyed: "+key);
+			MoveSMToIM emptyState=new MoveSMToIM(key, null);
+			// send on null state from StateMaster to InvocationMaster...
+			Destination sm=_dispatcher.getLocalDestination();
+			long timeout=_config.getInactiveTime();
+			_log.info("sending 0 bytes to : "+imName);
+			ObjectMessage ack=(ObjectMessage)_dispatcher.exchangeSend(sm, im, emptyState, timeout, request.getIMCorrelationId());
+			_log.info("received: "+ack);
+			// StateMaster replies to PartitionMaster indicating failure...
+			_log.info("reporting failure to PM");
+			_dispatcher.reply(message1,new MoveSMToPM(false));
+		} catch (Exception e) {
+			if (_log.isWarnEnabled()) _log.warn("problem handling relocation request: "+key, e);
+		} finally {
+			RankedRWLock.setPriority(RankedRWLock.NO_PRIORITY);
+		}
+	}
+
+	// evacuation protocol
+
+	public boolean offerEmigrant(String key, Motable emotable, long timeout) {
+		Destination to=((RemotePartition)_config.getPartition(key).getContent()).getDestination(); // TODO - HACK - temporary
+		Destination from=_dispatcher.getLocalDestination();
+		ReleaseEntryRequest request=new ReleaseEntryRequest(emotable);
+		ObjectMessage message=_dispatcher.exchangeSend(from, to, request, timeout);
+		ReleaseEntryResponse ack=null;
+		try {
+			ack=message==null?null:(ReleaseEntryResponse)message.getObject();
+		} catch (JMSException e) {
+			_log.error("could not unpack response", e);
+		}
+
+		if (ack==null) {
+			if (_log.isWarnEnabled()) _log.warn("no acknowledgement within timeframe ("+timeout+" millis): "+key);
+			return false;
+		} else {
+			if (_log.isTraceEnabled()) _log.trace("received acknowledgement within timeframe ("+timeout+" millis): "+key);
+			return true;
+		}
+	}
+
+	public void acceptImmigrant(ObjectMessage message, Location location, String name, Motable motable) {
+		if (!_dispatcher.reply(message, new ReleaseEntryResponse(name, location))) {
+			if (_log.isErrorEnabled()) _log.error("could not acknowledge safe receipt: "+name);
+		}
+	}
+
+	protected ImmigrationListener _listener;
+
+	/**
+	 * Installs the listener that receives immigrating motables and registers the emigration
+	 * request handler and response class on the dispatcher.
+	 * <p>
+	 * The listener is stored before the handler is registered, so that a request delivered as soon as
+	 * registration completes does not find <code>_listener</code> still unset.
+	 *
+	 * @param listener the listener that {@link #onEmigrationRequest} will call
+	 */
+	public void setImmigrationListener(ImmigrationListener listener) {
+		_listener=listener;
+		_dispatcher.register(this, "onEmigrationRequest", ReleaseEntryRequest.class);
+		_dispatcher.register(ReleaseEntryResponse.class, _resTimeout);
+	}
+
+	/**
+	 * Clears the listener, but only if <code>listener</code> is the instance currently installed.
+	 * The dispatcher registrations are left in place (see TODO).
+	 *
+	 * @param listener the listener to remove
+	 */
+	public void unsetImmigrationListener(ImmigrationListener listener) {
+		if (_listener!=listener) {
+			return; // someone else's listener - leave it alone
+		}
+		_listener=null;
+		// TODO ...
+		//_dispatcher.deregister("onEmigrationRequest", EmigrationRequest.class, _resTimeout);
+		//_dispatcher.deregister("onEmigrationResponse", EmigrationResponse.class, _resTimeout);
+	}
+
+	public void onEmigrationRequest(ObjectMessage message, ReleaseEntryRequest request) {
+		_listener.onImmigration(message, request.getMotable());
+	}
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.dindex.DIndexRequest;
+
+public class DIndexForwardRequest implements DIndexRequest, Serializable {
+
+    protected DIndexRequest _request;
+
+    public DIndexForwardRequest(DIndexRequest request) {
+        _request=request;
+    }
+
+    protected DIndexForwardRequest() {
+        // for deserialisation...
+    }
+
+    public DIndexRequest getRequest() {
+        return _request;
+    }
+
+    public int getPartitionKey(int numPartitions) {
+        return _request.getPartitionKey(numPartitions);
+    }
+
+    public String getKey() {
+        return _request.getKey();
+    }
+
+    public String toString() {
+        return "["+_request.toString()+"]";
+    }
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.dindex.impl.AbstractDIndexRequest;
+
+public class DIndexRelocationRequest extends AbstractDIndexRequest implements Serializable {
+
+    public DIndexRelocationRequest(String name) {
+        super(name);
+    }
+
+    public String toString() {
+        return "<DIndexRelocationRequest: "+_key+">";
+    }
+    
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.dindex.DIndexResponse;
+
+/**
+ * Relocation response; carries no data.
+ */
+public class DIndexRelocationResponse implements DIndexResponse, Serializable {
+
+    /** Creates an empty response. */
+    public DIndexRelocationResponse() {
+        // nothing to initialise
+    }
+
+    /**
+     * @return a fixed tag naming this class
+     */
+    public String toString() {
+        final String tag="<DIndexRelocationResponse>";
+        return tag;
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.OldMessage;
+
+/**
+ * Partition evacuation request; carries no data.
+ */
+public class PartitionEvacuationRequest implements OldMessage, Serializable {
+
+    /** Creates an empty request. */
+    public PartitionEvacuationRequest() {
+        // nothing to initialise
+    }
+
+    /**
+     * @return a fixed tag naming this class
+     */
+    public String toString() {
+        final String tag="<PartitionEvacuationRequest>";
+        return tag;
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.OldMessage;
+
+/**
+ * Partition evacuation response; carries no data.
+ */
+public class PartitionEvacuationResponse implements OldMessage, Serializable {
+
+    /** Creates an empty response. */
+    public PartitionEvacuationResponse() {
+        // nothing to initialise
+    }
+
+    /**
+     * @return a fixed tag naming this class
+     */
+    public String toString() {
+        final String tag="<PartitionEvacuationResponse>";
+        return tag;
+    }
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,44 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.OldMessage;
+
+public class PartitionRepopulateRequest implements OldMessage, Serializable {
+
+    protected int[] _keys;
+
+    public PartitionRepopulateRequest(int[] keys) {
+        super();
+        _keys=keys;
+    }
+
+    protected PartitionRepopulateRequest() {
+        // for deserialisation...
+    }
+
+    public int[] getKeys() {
+        return _keys;
+    }
+
+
+    public String toString() {
+        return "<PartitionRepopulateRequest"+_keys+">";
+    }
+}

Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,52 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.codehaus.wadi.OldMessage;
+
+/**
+ * Partition repopulation response carrying an array of key collections.
+ */
+public class PartitionRepopulateResponse implements OldMessage, Serializable {
+
+    /** The key collections carried by this response; the array and any slot may be null. */
+    protected Collection[] _keys;
+
+    /**
+     * @param keys the key collections to carry; stored by reference, not copied
+     */
+    public PartitionRepopulateResponse(Collection[] keys) {
+        super();
+        _keys=keys;
+    }
+
+    protected PartitionRepopulateResponse() {
+        // for deserialisation
+    }
+
+    /**
+     * @return the array given to the constructor (not a copy)
+     */
+    public Collection[] getKeys() {
+        return _keys;
+    }
+
+    /**
+     * Lists every non-null slot as <code>index:collection, </code>. A null array is rendered as an
+     * empty list rather than causing a NullPointerException.
+     *
+     * @return the class name followed by the non-null slots, in angle brackets
+     */
+    public String toString() {
+        StringBuffer buffer=new StringBuffer("<PartitionRepopulateResponse: ");
+        if (_keys!=null) {
+            for (int i=0; i<_keys.length; i++) {
+                Collection c=_keys[i];
+                if (c!=null) {
+                    buffer.append(i).append(':').append(c.toString()).append(", ");
+                }
+            }
+        }
+        buffer.append(">");
+        return buffer.toString();
+    }
+}