You are viewing a plain text version of this content. The canonical link for it is here.
Posted to wadi-commits@incubator.apache.org by bd...@apache.org on 2005/12/14 23:36:16 UTC
svn commit: r356933 [6/35] - in /incubator/wadi/trunk: ./ etc/ modules/
modules/assembly/ modules/assembly/src/ modules/assembly/src/bin/
modules/assembly/src/conf/ modules/assembly/src/main/
modules/assembly/src/main/assembly/ modules/core/ modules/co...
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/LocalPartition.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,272 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.DIndexRequest;
+import org.codehaus.wadi.dindex.DIndexResponse;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationResponse;
+import org.codehaus.wadi.dindex.messages.RelocationRequest;
+import org.codehaus.wadi.dindex.messages.RelocationResponse;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.DeletePMToIM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertPMToIM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToSM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToPM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToIM;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+/**
+ * A partition of the distributed index whose entries are held in this node's memory.
+ * <p>
+ * The partition keeps a map from session key to the JMS Destination recorded as the
+ * location of that session, and services insert, delete, relocation, forward and move
+ * requests against that map.
+ * <p>
+ * The loggers and the PartitionConfig are transient. init(PartitionConfig) must be called
+ * before any request is handled, because every handler dereferences <code>_config</code>.
+ */
+public class LocalPartition extends AbstractPartition implements Serializable {
+
+    // Transient: not serialised, re-created by init().
+    protected transient Log _log=LogFactory.getLog(getClass());
+    protected transient Log _lockLog=LogFactory.getLog("org.codehaus.wadi.LOCKS");
+
+    // session key -> Destination recorded for that session; all access synchronises on the map itself
+    protected Map _map=new HashMap();
+    protected transient PartitionConfig _config;
+
+    /**
+     * @param key the key of this partition, handed to the superclass
+     */
+    public LocalPartition(int key) {
+        super(key);
+    }
+
+    protected LocalPartition() {
+        super();
+        // for deserialisation...
+    }
+
+    /**
+     * Attaches the (transient) configuration and re-creates both loggers. The main logger is
+     * named after this class, the partition key and the local node name.
+     *
+     * @param config the configuration supplying dispatcher, PM syncs and node names
+     */
+    public void init(PartitionConfig config) {
+        _config=config;
+        _log=LogFactory.getLog(getClass().getName()+"#"+_key+"@"+_config.getLocalNodeName());
+        _lockLog=LogFactory.getLog("org.codehaus.wadi.LOCKS");
+    }
+
+    /** @return always <code>true</code> */
+    public boolean isLocal() {
+        return true;
+    }
+
+    public String toString() {
+        return "<LocalPartition:"+_key+"@"+(_config==null?"<unknown>":_config.getLocalNodeName())+">";
+    }
+
+    /**
+     * Records <code>destination</code> as the location of <code>name</code>, replacing any
+     * previous entry.
+     */
+    public void put(String name, Destination destination) {
+        synchronized (_map) {
+            // TODO - check key was not already in use...
+            _map.put(name, destination);
+        }
+    }
+
+    /**
+     * Handles an insert: records the sender (the message's reply-to Destination) as the location
+     * of the requested key, unless the key is already present, and replies with an InsertPMToIM
+     * carrying the outcome. The key's PM sync is held for the duration.
+     */
+    // we probably don't need the partition lock for this - but lets play safe to start with
+    public void onMessage(ObjectMessage message, InsertIMToPM request) {
+        Destination newDestination=null;
+        try{newDestination=message.getJMSReplyTo();} catch (JMSException e) {_log.error("unexpected problem", e);}
+        boolean success=false;
+        String key=request.getKey();
+        Sync sync=null;
+        try {
+            if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquiring: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+            sync=_config.getPMSyncs().acquire(key); // TODO - PMSyncs are actually WLocks on a given sessions location (partition entry) - integrate
+            if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquired: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+
+            synchronized (_map) {
+                if (!_map.containsKey(key)) {
+                    _map.put(key, newDestination); // remember location of actual session...
+                    success=true;
+                }
+            }
+            if (success) {
+                if (_log.isDebugEnabled()) _log.debug("insert: "+key+" {"+_config.getNodeName(newDestination)+"}");
+            } else {
+                if (_log.isWarnEnabled()) _log.warn("insert: {"+key+" {"+_config.getNodeName(newDestination)+"} failed - key already in use");
+            }
+
+            DIndexResponse response=new InsertPMToIM(success);
+            _config.getDispatcher().reply(message, response);
+        } finally {
+            Utils.release("Partition", key, sync);
+        }
+    }
+
+    /**
+     * Handles a delete: removes the key's entry and replies with a DeletePMToIM. The key's PM
+     * sync is held for the duration.
+     *
+     * @throws IllegalStateException if the key has no entry; no reply is sent in that case
+     */
+    // we probably do not need to take the Partition lock whilst we do this, but, for the moment, lets do everything logically, then optimise later...
+    public void onMessage(ObjectMessage message, DeleteIMToPM request) {
+        Destination oldDestination;
+        String key=request.getKey();
+        Sync sync=null;
+        try {
+            if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquiring: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+            sync=_config.getPMSyncs().acquire(key); // TODO - PMSyncs are actually WLocks on a given sessions location (partition entry) - integrate
+            if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquired: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+
+            synchronized (_map) {
+                oldDestination=(Destination)_map.remove(key);
+            }
+            if (oldDestination==null) throw new IllegalStateException("session "+key+" is not known in this partition");
+            if (_log.isDebugEnabled()) _log.debug("delete: "+key+" {"+_config.getNodeName(oldDestination)+"}");
+            DIndexResponse response=new DeletePMToIM();
+            _config.getDispatcher().reply(message, response);
+        } finally {
+            Utils.release("Partition", key, sync);
+        }
+    }
+
+    /**
+     * Handles a relocation: unconditionally records the sender (the message's reply-to
+     * Destination) as the key's location and replies with a DIndexRelocationResponse.
+     * Unlike insert, delete and move, this handler does not take the key's PM sync.
+     */
+    public void onMessage(ObjectMessage message, DIndexRelocationRequest request) {
+        Destination newDestination=null;
+        try{newDestination=message.getJMSReplyTo();} catch (JMSException e) {_log.error("unexpected problem", e);}
+        Destination oldDestination=null;
+        synchronized (_map) {
+            oldDestination=(Destination)_map.put(request.getKey(), newDestination);
+        }
+        if (_log.isDebugEnabled()) _log.debug("relocation {"+request.getKey()+" : "+_config.getNodeName(oldDestination)+" -> "+_config.getNodeName(newDestination)+"}");
+        DIndexResponse response=new DIndexRelocationResponse();
+        _config.getDispatcher().reply(message, response);
+    }
+
+    /**
+     * Handles a forward request: looks up the key and forwards the wrapped request to the
+     * recorded Destination. If the key is unknown, a wrapped RelocationRequest is answered with
+     * a RelocationResponse; any other wrapped request is logged and ignored.
+     */
+    public void onMessage(ObjectMessage message, DIndexForwardRequest request) {
+        // we have got to someone who actually knows where we want to go.
+        // strip off wrapper and deliver actual request to its final destination...
+        String name=request.getKey();
+        Destination destination=null;
+        synchronized (_map) {
+            destination=(Destination)_map.get(name);
+        }
+        if (destination==null) { // session could not be located...
+            DIndexRequest r=request.getRequest();
+            if (r instanceof RelocationRequest) {
+                assert message!=null;
+                assert name!=null;
+                assert _config!=null;
+                _config.getDispatcher().reply(message, new RelocationResponse(name));
+            } else {
+                if (_log.isWarnEnabled()) _log.warn("unexpected nested request structure - ignoring: " + r);
+            }
+        } else { // session successfully located...
+            assert destination!=null;
+            assert request!=null;
+            assert _config!=null;
+            if (_log.isTraceEnabled()) _log.trace("directing: " + request + " -> " + _config.getNodeName(destination));
+            if (!_config.getDispatcher().forward(message, destination, request.getRequest()))
+                _log.warn("could not forward message");
+        }
+    }
+
+    /**
+     * Handles a move request, called on the Partition Master. Whilst holding the key's PM sync:
+     * <ul>
+     * <li>if the key is unknown, replies to the sender with a MovePMToIM and returns;</li>
+     * <li>otherwise sends a MovePMToSM to the recorded Destination and waits up to the
+     * configured inactive time for a MoveSMToPM;</li>
+     * <li>on a successful response, records the sender as the key's new location.</li>
+     * </ul>
+     * A missing or unreadable response is logged and leaves the recorded location unchanged.
+     */
+    public void onMessage(ObjectMessage message1, MoveIMToPM request) {
+
+        // TODO - whilst we are in here, we should have a SHARED lock on this Partition, so it cannot be moved
+        // We should take an exclusive PM lock on the session ID, so no-one else can r/w its location whilst we are doing so.
+        // The Partitions lock should be held in the Facade, so that it can swap Partitions in and out whilst holding an exclusive lock
+        // Partition may only be migrated when exclusive lock has been taken, this may only happen when all shared locks are released - this implies that no PM session locks will be in place...
+
+        String key=request.getKey();
+        Dispatcher dispatcher=_config.getDispatcher();
+        // what if we are NOT the PM anymore ?
+        Sync sync=null;
+        try {
+            Destination im=message1.getJMSReplyTo();
+
+            // PMSyncs should prevent _map entry from being messed with whilst we are messing with it - lock should be exclusive
+            if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquiring: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+            sync=_config.getPMSyncs().acquire(key); // TODO - PMSyncs are actually WLocks on a given sessions location (partition entry) - integrate
+            if (_lockLog.isTraceEnabled()) _lockLog.trace("Partition - acquired: "+key+ " ["+Thread.currentThread().getName()+"]"+" : "+sync);
+
+            // _map is a plain HashMap, so guard the lookup like every other handler does
+            Destination destination;
+            synchronized (_map) {
+                destination=(Destination)_map.get(key);
+            }
+
+            if (destination==null) {
+                // session does not exist - tell IM
+                dispatcher.reply(message1,new MovePMToIM());
+                return;
+            }
+
+            if (destination.equals(im)) {
+                // whilst we were waiting for the partition lock, another thread migrated the session into our InvocationMaster...
+                // How can this happen - the first Thread should have been holding the InvocationLock...
+                _log.warn("IM REQUESTING RELOCATION IS ALREADY SM");
+            }
+
+            // session does exist - ask the SM to move it to the IM
+            Destination pm=dispatcher.getLocalDestination();
+            Destination sm=destination;
+            String poCorrelationId=null;
+            try {
+                poCorrelationId=dispatcher.getOutgoingCorrelationId(message1);
+            } catch (Exception e) {
+                _log.error("unexpected problem", e);
+            }
+
+            MovePMToSM request2=new MovePMToSM(key, im, pm, poCorrelationId);
+            ObjectMessage message2=dispatcher.exchangeSend(pm, sm, request2, _config.getInactiveTime());
+            if (message2==null) {
+                // nothing to unpack - bail out rather than dereference the missing reply
+                _log.error("NO RESPONSE WITHIN TIMEFRAME - PANIC!");
+                return;
+            }
+
+            MoveSMToPM response=null; // the reply from the SM confirming successful move...
+            try {
+                response=(MoveSMToPM)message2.getObject();
+            } catch (JMSException e) {
+                _log.error("unexpected problem", e); // should be sorted in loop
+            }
+
+            // an unreadable reply is treated like an unsuccessful one
+            if (response!=null && response.getSuccess()) {
+                // alter location
+                Destination oldOwner;
+                synchronized (_map) {
+                    oldOwner=(Destination)_map.put(key, im); // The IM is now the SM
+                }
+                if (_log.isDebugEnabled()) _log.debug("move: "+key+" {"+_config.getNodeName(oldOwner)+"->"+_config.getNodeName(im)+"}");
+            } else {
+                _log.warn("state relocation failed: "+key);
+            }
+        } catch (JMSException e) {
+            _log.error("could not read src address from incoming message", e);
+        }
+        finally {
+            Utils.release("Partition", key, sync);
+        }
+    }
+
+    /**
+     * Sends <code>request</code> from the local Destination to the local Destination via the
+     * dispatcher and returns whatever the dispatcher's exchangeSend returns.
+     *
+     * @param timeout passed straight through to the dispatcher
+     */
+    public ObjectMessage exchange(DIndexRequest request, long timeout) throws Exception {
+        if (_log.isTraceEnabled()) _log.trace("local dispatch - needs optimisation");
+        Dispatcher dispatcher=_config.getDispatcher();
+        Destination from=dispatcher.getLocalDestination();
+        Destination to=from;
+        return dispatcher.exchangeSend(from, to, request, timeout);
+    }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionFacade.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,215 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import javax.jms.Destination;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.DIndexRequest;
+import org.codehaus.wadi.dindex.Partition;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.gridstate.Dispatcher;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+
+/**
+ * A fixed handle for one partition key whose underlying Partition (<code>_content</code>) can
+ * be replaced over time.
+ * <p>
+ * Incoming messages are dispatched to the current content under the shared (read) side of a
+ * read/write lock. Reading or replacing the content takes the exclusive (write) side.
+ * Content updates carry a timestamp and are ignored unless newer than the last one applied.
+ */
+public class PartitionFacade extends AbstractPartition {
+
+    protected final ReadWriteLock _lock=new WriterPreferenceReadWriteLock();
+    protected final LinkedQueue _queue=new LinkedQueue(); // not referenced anywhere in this class
+    protected final PartitionConfig _config;
+    protected final Log _log;
+
+    protected long _timeStamp;
+    protected Partition _content;
+
+    /**
+     * @param key the partition key, handed to the superclass
+     * @param timeStamp timestamp of the initial content
+     * @param content the initial content
+     * @param queueing not used by this constructor
+     * @param config supplies the local node name (for the logger) and is passed to any
+     *        RemotePartition this facade creates
+     */
+    public PartitionFacade(int key, long timeStamp, Partition content, boolean queueing, PartitionConfig config) {
+        super(key);
+        _config=config;
+        _timeStamp=timeStamp;
+        _content=content;
+        _log=LogFactory.getLog(getClass().getName()+"#"+_key+"@"+_config.getLocalNodeName());
+        if (_log.isTraceEnabled()) _log.trace("initialising location to: "+_content);
+    }
+
+    /**
+     * Acquires <code>sync</code>. If the thread is interrupted whilst waiting, the problem is
+     * logged and the thread's interrupt status is restored so that callers further up the stack
+     * can still observe it.
+     *
+     * @return <code>true</code> if the sync is now held and must be released by the caller
+     */
+    private boolean acquire(Sync sync) {
+        try {
+            sync.acquire();
+            return true;
+        } catch (InterruptedException e) {
+            _log.warn("unexpected problem", e);
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * @return whether the current content reports itself as local
+     * @throws UnsupportedOperationException if interrupted whilst acquiring the lock
+     */
+    public boolean isLocal() { // locking ?
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        if (!acquire(sync))
+            throw new UnsupportedOperationException("interrupted whilst acquiring partition lock");
+        try {
+            return _content.isLocal();
+        } finally {
+            sync.release();
+        }
+    }
+
+    /**
+     * @return the current content
+     * @throws UnsupportedOperationException if interrupted whilst acquiring the lock
+     */
+    public Partition getContent() {
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        if (!acquire(sync))
+            throw new UnsupportedOperationException("interrupted whilst acquiring partition lock");
+        try {
+            return _content;
+        } finally {
+            sync.release();
+        }
+    }
+
+    /**
+     * Replaces the content, provided <code>timeStamp</code> is newer than the last one applied.
+     * Does nothing if it is not, or if interrupted whilst acquiring the lock.
+     */
+    public void setContent(long timeStamp, Partition content) {
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        if (!acquire(sync))
+            return;
+        try {
+            if (timeStamp>_timeStamp) {
+                if (_log.isTraceEnabled()) _log.trace("["+_key+"] changing location from: "+_content+" to: "+content);
+                _timeStamp=timeStamp;
+                _content=content;
+            }
+        } finally {
+            sync.release();
+        }
+    }
+
+    /**
+     * Points this facade at a remote location, provided <code>timeStamp</code> is newer than the
+     * last one applied. An existing RemotePartition is updated in place; any other content is
+     * replaced by a new RemotePartition. Does nothing if interrupted whilst acquiring the lock.
+     *
+     * @param dispatcher not used by this method
+     */
+    public void setContentRemote(long timeStamp, Dispatcher dispatcher, Destination location) {
+        Sync sync=_lock.writeLock(); // EXCLUSIVE
+        if (!acquire(sync))
+            return;
+        try {
+            if (timeStamp>_timeStamp) {
+                _timeStamp=timeStamp;
+                if (_content instanceof RemotePartition) {
+                    ((RemotePartition)_content).setLocation(location);
+                } else {
+                    if (_log.isTraceEnabled()) _log.trace("["+_key+"] changing location from: "+_content+" to: "+location);
+                    _content=new RemotePartition(_key, _config, location);
+                }
+            }
+        } finally {
+            sync.release();
+        }
+    }
+
+    /**
+     * Delegates to the current content.
+     */
+    // NOTE(review): reads _content without taking _lock, unlike every other method here - confirm this is intentional
+    public ObjectMessage exchange(DIndexRequest request, long timeout) throws Exception {
+        return _content.exchange(request, timeout);
+    }
+
+    /** Dispatches to the current content under the shared lock; dropped (with a warning) if interrupted. */
+    public void onMessage(ObjectMessage message, InsertIMToPM request) {
+        if (_log.isTraceEnabled()) _log.trace("dispatching: "+request+" on "+_content);
+        Sync sync=_lock.readLock(); // SHARED
+        if (!acquire(sync))
+            return;
+        try {
+            _content.onMessage(message, request);
+        } finally {
+            sync.release();
+        }
+    }
+
+    /** Dispatches to the current content under the shared lock; dropped (with a warning) if interrupted. */
+    public void onMessage(ObjectMessage message, DeleteIMToPM request) {
+        Sync sync=_lock.readLock(); // SHARED
+        if (!acquire(sync))
+            return;
+        try {
+            _content.onMessage(message, request);
+        } finally {
+            sync.release();
+        }
+    }
+
+    /** Dispatches to the current content under the shared lock; dropped (with a warning) if interrupted. */
+    public void onMessage(ObjectMessage message, DIndexRelocationRequest request) {
+        Sync sync=_lock.readLock(); // SHARED
+        if (!acquire(sync))
+            return;
+        try {
+            _content.onMessage(message, request);
+        } finally {
+            sync.release();
+        }
+    }
+
+    /** Dispatches to the current content under the shared lock; dropped (with a warning) if interrupted. */
+    // should supersede the above method
+    public void onMessage(ObjectMessage message, MoveIMToPM request) {
+        Sync sync=_lock.readLock(); // SHARED
+        if (!acquire(sync))
+            return;
+        try {
+            _content.onMessage(message, request);
+        } finally {
+            sync.release();
+        }
+    }
+
+    /** Dispatches to the current content under the shared lock; dropped (with a warning) if interrupted. */
+    public void onMessage(ObjectMessage message, DIndexForwardRequest request) {
+        Sync sync=_lock.readLock(); // SHARED
+        if (!acquire(sync))
+            return;
+        try {
+            _content.onMessage(message, request);
+        } finally {
+            sync.release();
+        }
+    }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionKeys.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,83 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.codehaus.wadi.dindex.Partition;
+
+/**
+ * A serialisable list of the keys of those partitions that reported themselves as local at
+ * the time this object was constructed, in the order in which they appeared in the input array.
+ * <p>
+ * Two instances are equal when they hold the same keys in the same order.
+ */
+// NOTE(review): no serialVersionUID is declared, so adding hashCode() alters the default one - confirm that no differently-built peers exchange this class
+public class PartitionKeys implements Serializable {
+
+    protected int[] _keys;
+
+    /**
+     * @param partitions the partitions to inspect; each is asked once whether it is local
+     */
+    public PartitionKeys(PartitionFacade[] partitions) {
+        ArrayList list=new ArrayList(partitions.length);
+        for (int i=0; i<partitions.length; i++) {
+            Partition partition=partitions[i];
+            if (partition.isLocal())
+                list.add(new Integer(partition.getKey()));
+        }
+        _keys=new int[list.size()];
+        for (int i=0; i<_keys.length; i++)
+            _keys[i]=((Integer)list.get(i)).intValue();
+    }
+
+    protected PartitionKeys() {
+        // for deserialisation...
+    }
+
+    public boolean equals(Object obj) {
+        if (obj==this)
+            return true;
+
+        if (! (obj instanceof PartitionKeys))
+            return false;
+
+        PartitionKeys that=(PartitionKeys)obj;
+
+        if (this._keys.length!=that._keys.length)
+            return false;
+
+        for (int i=0; i<_keys.length; i++)
+            if (this._keys[i]!=that._keys[i])
+                return false;
+
+        return true;
+    }
+
+    /**
+     * Derived from the same keys, in the same order, that equals() compares, so that equal
+     * instances always produce equal hash codes.
+     */
+    public int hashCode() {
+        int hash=1;
+        for (int i=0; i<_keys.length; i++)
+            hash=31*hash+_keys[i];
+        return hash;
+    }
+
+    /** @return the keys as a comma-separated list in braces, e.g. <code>{1,5,9}</code> */
+    public String toString() {
+        StringBuffer buffer=new StringBuffer();
+        buffer.append("{");
+        for (int i=0;i<_keys.length; i++) {
+            if (i!=0)
+                buffer.append(",");
+            buffer.append(_keys[i]);
+        }
+        buffer.append("}");
+        return buffer.toString();
+    }
+
+    /** @return the number of keys held */
+    public int size() {
+        return _keys.length;
+    }
+
+    /** @return the internal key array itself (not a copy) */
+    public int[] getKeys() {
+        return _keys;
+    }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwner.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import org.activecluster.Node;
+
+/**
+ * Plain mutable holder tying a cluster Node to an integer deviation and a flag saying whether
+ * the node is leaving. The fields are protected and have no accessors; they are read and
+ * written directly by other classes in this package.
+ */
+public class PartitionOwner {
+
+    protected Node _node;
+    protected int _deviation;
+    protected boolean _leaving;
+
+    /**
+     * @param node the node being described
+     * @param deviation the initial deviation recorded for the node
+     * @param leaving whether the node is leaving
+     */
+    public PartitionOwner(Node node, int deviation, boolean leaving) {
+        this._leaving=leaving;
+        this._deviation=deviation;
+        this._node=node;
+    }
+
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerGreaterThanComparator.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,38 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.Comparator;
+
+
+/**
+ * Orders PartitionOwners so that the one with the greater deviation comes first. Owners with
+ * equal deviations are ordered by node name, also in reverse of the names' natural order.
+ * <p>
+ * All instances of this class are interchangeable and therefore equal to one another.
+ */
+public class PartitionOwnerGreaterThanComparator implements Comparator {
+
+    /**
+     * Note that the parameters are declared as (o2, o1): the first argument is bound to
+     * <code>p2</code> and the second to <code>p1</code>, which is what reverses the ordering.
+     */
+    public int compare(Object o2, Object o1) {
+        PartitionOwner p1=(PartitionOwner)o1;
+        PartitionOwner p2=(PartitionOwner)o2;
+        // compare explicitly - subtracting two ints can overflow and flip the sign
+        if (p1._deviation!=p2._deviation)
+            return p1._deviation<p2._deviation ? -1 : 1;
+        return p1._node.getName().compareTo(p2._node.getName());
+    }
+
+    /** @return <code>true</code> for this instance or any object of exactly this class; <code>false</code> for null */
+    public boolean equals(Object obj) {
+        return obj==this || (obj!=null && obj.getClass()==getClass());
+    }
+
+    /** Same value for every instance of this class, in line with equals(). */
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionOwnerLessThanComparator.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,38 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.Comparator;
+
+
+/**
+ * Orders PartitionOwners so that the one with the smaller deviation comes first. Owners with
+ * equal deviations are ordered by the natural order of their node names.
+ * <p>
+ * All instances of this class are interchangeable and therefore equal to one another.
+ */
+public class PartitionOwnerLessThanComparator implements Comparator {
+
+    public int compare(Object o1, Object o2) {
+        PartitionOwner p1=(PartitionOwner)o1;
+        PartitionOwner p2=(PartitionOwner)o2;
+        // compare explicitly - subtracting two ints can overflow and flip the sign
+        if (p1._deviation!=p2._deviation)
+            return p1._deviation<p2._deviation ? -1 : 1;
+        return p1._node.getName().compareTo(p2._node.getName());
+    }
+
+    /** @return <code>true</code> for this instance or any object of exactly this class; <code>false</code> for null */
+    public boolean equals(Object obj) {
+        return obj==this || (obj!=null && obj.getClass()==getClass());
+    }
+
+    /** Same value for every instance of this class, in line with equals(). */
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/PartitionTransfer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,51 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.io.Serializable;
+
+import javax.jms.Destination;
+
+/**
+ * Serialisable description of a transfer: an amount, the Destination it is headed for and a
+ * human-readable name that is only used when printing.
+ */
+public class PartitionTransfer implements Serializable {
+
+    public Destination _destination;
+    public String _name; // TODO - only here for debugging...
+    public int _amount;
+
+    /**
+     * @param destination where the transfer is headed
+     * @param name label shown by toString()
+     * @param amount the amount to transfer
+     */
+    public PartitionTransfer(Destination destination, String name, int amount) {
+        this._destination=destination;
+        this._name=name;
+        this._amount=amount;
+    }
+
+    protected PartitionTransfer() {
+        // for deserialisation...
+    }
+
+    /** @return the Destination given at construction */
+    public Destination getDestination() {
+        return this._destination;
+    }
+
+    /** @return the amount given at construction */
+    public int getAmount() {
+        return this._amount;
+    }
+
+    /** @return a description of the form <code>&lt;transfer: amount-&gt;name&gt;</code> */
+    public String toString() {
+        StringBuffer text=new StringBuffer("<transfer: ");
+        text.append(_amount);
+        text.append("->");
+        text.append(_name);
+        text.append(">");
+        return text.toString();
+    }
+
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RedistributionPlan.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,100 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.activecluster.Node;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Works out which nodes must give up partitions (producers) and which must take them on
+ * (consumers) so that the partitions end up spread evenly over the living nodes.
+ * <p>
+ * Each list holds PartitionOwners whose deviation is the number of partitions to give up or to
+ * take on. A leaving node that holds partitions is always a producer of all of them. A living
+ * node is a producer if it holds more than the per-node share and a consumer if it holds fewer.
+ * When the partitions do not divide evenly, the remainder is handed out one apiece: first by
+ * letting living producers keep one more, then by asking consumers to take one more, and
+ * finally by asking nodes that already hold exactly the per-node share to take one.
+ */
+public class RedistributionPlan {
+
+    protected final Log _log=LogFactory.getLog(getClass());
+    protected final List _producers = new ArrayList();
+    protected final List _consumers = new ArrayList();
+
+    /**
+     * @param living the nodes that are staying; must not be empty
+     * @param leaving the nodes that are going away
+     * @param totalNumPartitions the number of partitions to be shared between the living nodes
+     * @throws IllegalArgumentException if <code>living</code> is empty
+     */
+    public RedistributionPlan(Node[] living, Node[] leaving, int totalNumPartitions) {
+        if (living.length==0)
+            throw new IllegalArgumentException("cannot plan a redistribution without any living nodes");
+
+        int numPartitionsPerNode=totalNumPartitions/living.length;
+
+        for (int i=0; i<leaving.length; i++) {
+            Node node=leaving[i];
+            int numPartitions=DIndex.getPartitionKeys(node).size();
+            if (numPartitions>0)
+                _producers.add(new PartitionOwner(node, numPartitions, true));
+        }
+
+        // living nodes that already hold exactly their share - they appear in neither list
+        List balanced=new ArrayList();
+
+        for (int i=0; i<living.length; i++) {
+            Node node=living[i];
+            int numPartitions=DIndex.getPartitionKeys(node).size();
+            if (numPartitions==numPartitionsPerNode)
+                balanced.add(node);
+            decide(node, numPartitions, numPartitionsPerNode, _producers, _consumers);
+        }
+
+        // sort lists...
+        Collections.sort(_producers, new PartitionOwnerGreaterThanComparator());
+        Collections.sort(_consumers, new PartitionOwnerLessThanComparator());
+
+        // account for uneven division of partitions...
+        int remainingPartitions=totalNumPartitions%living.length;
+
+        // a living producer may keep one partition more than the per-node share
+        for (ListIterator i=_producers.listIterator(); remainingPartitions>0 && i.hasNext(); ) {
+            PartitionOwner p=(PartitionOwner)i.next();
+            if (!p._leaving) {
+                remainingPartitions--;
+                if ((--p._deviation)==0)
+                    i.remove();
+            }
+        }
+
+        // a consumer may take one partition more than the per-node share
+        for (ListIterator i=_consumers.listIterator(); remainingPartitions>0 && i.hasNext(); ) {
+            PartitionOwner p=(PartitionOwner)i.next();
+            remainingPartitions--;
+            ++p._deviation;
+        }
+
+        // Anything still left over must go to nodes that were exactly balanced, otherwise it would
+        // have no taker at all (e.g. three living nodes holding one partition each plus a leaving
+        // node holding one). There are always enough such nodes, since the remainder is smaller
+        // than the number of living nodes and every living node is a producer, consumer or balanced.
+        for (ListIterator i=balanced.listIterator(); remainingPartitions>0 && i.hasNext(); ) {
+            _consumers.add(new PartitionOwner((Node)i.next(), 1, false));
+            remainingPartitions--;
+        }
+
+        assert remainingPartitions==0;
+    }
+
+    /**
+     * Classifies one living node: adds it to <code>producers</code> if it holds more than
+     * <code>numPartitionsPerNode</code>, to <code>consumers</code> if it holds fewer, and to
+     * neither if it holds exactly that many. The recorded deviation is always positive.
+     */
+    protected void decide(Node node, int numPartitions, int numPartitionsPerNode, Collection producers, Collection consumers) {
+        int deviation=numPartitions-numPartitionsPerNode;
+        if (deviation>0) {
+            producers.add(new PartitionOwner(node, deviation, false));
+        } else if (deviation<0) {
+            consumers.add(new PartitionOwner(node, -deviation, false));
+        }
+    }
+
+    /** @return the internal list of producers (not a copy) */
+    public Collection getProducers() {
+        return _producers;
+    }
+
+    /** @return the internal list of consumers (not a copy) */
+    public Collection getConsumers() {
+        return _consumers;
+    }
+
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/RemotePartition.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,117 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import javax.jms.Destination;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.DIndexRequest;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.gridstate.Dispatcher;
+
+public class RemotePartition extends AbstractPartition {
+
+ protected transient Log _log;
+
+ protected final PartitionConfig _config;
+
+ protected Destination _location;
+
+ public RemotePartition(int key, PartitionConfig config, Destination location) {
+ super(key);
+ _config=config;
+ _location=location;
+ _log=LogFactory.getLog(getClass().getName()+"#"+_key+"@"+_config.getLocalNodeName());
+ }
+
+ public boolean isLocal() {
+ return false;
+ }
+
+ public Destination getDestination() {
+ return _location;
+ }
+
+ public void setLocation(Destination location) {
+ if (_location==null) {
+ if (location==null) {
+ // _location is already null
+ } else {
+ // they cannot be equal - update
+ if (_log.isTraceEnabled()) _log.trace("[" + _key + "] updating location from: " + _config.getNodeName(_location) + " to: " + _config.getNodeName(location));
+ _location=location;
+ }
+ } else {
+ if (_location.equals(location)) {
+ // no need to update
+ } else {
+ if (_log.isTraceEnabled()) _log.trace("[" + _key + "] updating location from: " + _config.getNodeName(_location) + " to: " + _config.getNodeName(location));
+ _location=location;
+ }
+ }
+ }
+
+ public String toString() {
+ return "<"+getClass()+":"+_key+"@"+_config.getLocalNodeName()+"->"+_config.getNodeName(_location)+">";
+ }
+
+ public void onMessage(ObjectMessage message, InsertIMToPM request) {
+ if (_log.isTraceEnabled()) _log.trace("#"+_key+" : forwarding: " + request + " from "+_config.getLocalNodeName()+" to " + _config.getNodeName(_location));
+ if (!_config.getDispatcher().forward(message, _location))
+ _log.warn("could not forward message");
+ }
+
+ public void onMessage(ObjectMessage message, DeleteIMToPM request) {
+ if (_log.isTraceEnabled()) _log.trace("indirecting: " + request + " via " + _config.getNodeName(_location));
+ if (!_config.getDispatcher().forward(message, _location))
+ _log.warn("could not forward message");
+ }
+
+ public void onMessage(ObjectMessage message, DIndexRelocationRequest request) {
+ if (_log.isTraceEnabled()) _log.trace("indirecting: " + request + " via " + _config.getNodeName(_location));
+ if (!_config.getDispatcher().forward(message, _location))
+ _log.warn("could not forward message");
+ }
+
+ public void onMessage(ObjectMessage message, DIndexForwardRequest request) {
+ if (_log.isTraceEnabled()) _log.trace("indirecting: " + request + " via " + _config.getNodeName(_location));
+ if (!_config.getDispatcher().forward(message, _location))
+ _log.warn("could not forward message");
+ }
+
+ public void onMessage(ObjectMessage message, MoveIMToPM request) {
+ if (_log.isWarnEnabled()) _log.warn(_config.getLocalNodeName()+": not Master of Partition["+_key+"] - forwarding message to "+_config.getNodeName(_location));
+ if (!_config.getDispatcher().forward(message, _location))
+ _log.warn("could not forward message");
+ }
+
+ public ObjectMessage exchange(DIndexRequest request, long timeout) throws Exception {
+ Dispatcher dispatcher=_config.getDispatcher();
+ Destination from=dispatcher.getLocalDestination();
+ Destination to=_location;
+ if (_log.isTraceEnabled()) _log.trace("exchanging message ("+request+") with node: "+_config.getNodeName(to)+" on "+Thread.currentThread().getName());
+ return dispatcher.exchangeSend(from, to, request, timeout);
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SeniorityElectionStrategy.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,46 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.Iterator;
+
+import org.activecluster.Cluster;
+import org.activecluster.Node;
+import org.activecluster.election.ElectionStrategy;
+
+/**
+ * An ElectionStrategy that picks the node with the smallest "birthTime" value in
+ * its state. The local node is the initial choice and is only displaced by a
+ * node with a strictly smaller birth time.
+ */
+public class SeniorityElectionStrategy implements ElectionStrategy {
+
+  public Node doElection(Cluster cluster) {
+    Node winner=cluster.getLocalNode();
+    long winnerBirth=getBirthTime(winner);
+
+    Iterator contenders=cluster.getNodes().values().iterator();
+    while (contenders.hasNext()) {
+      Node contender=(Node)contenders.next();
+      long contenderBirth=getBirthTime(contender);
+      if (contenderBirth>=winnerBirth)
+        continue; // not older than the current choice
+      winnerBirth=contenderBirth;
+      winner=contender;
+    }
+
+    return winner;
+  }
+
+  /**
+   * Reads the Long stored under "birthTime" in the node's state.
+   * Fails with a NullPointerException if the entry is absent.
+   */
+  protected long getBirthTime(Node node) {
+    Object birthTime=node.getState().get("birthTime"); // TODO - unify state keys somewhere
+    return ((Long)birthTime).longValue();
+  }
+
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimplePartitionManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,504 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterEvent;
+import org.activecluster.Node;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.dindex.Partition;
+import org.codehaus.wadi.dindex.PartitionConfig;
+import org.codehaus.wadi.dindex.PartitionManager;
+import org.codehaus.wadi.dindex.PartitionManagerConfig;
+import org.codehaus.wadi.dindex.messages.PartitionEvacuationRequest;
+import org.codehaus.wadi.dindex.messages.PartitionEvacuationResponse;
+import org.codehaus.wadi.dindex.messages.PartitionRepopulateRequest;
+import org.codehaus.wadi.dindex.messages.PartitionRepopulateResponse;
+import org.codehaus.wadi.dindex.messages.PartitionTransferAcknowledgement;
+import org.codehaus.wadi.dindex.messages.PartitionTransferCommand;
+import org.codehaus.wadi.dindex.messages.PartitionTransferRequest;
+import org.codehaus.wadi.dindex.messages.PartitionTransferResponse;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.gridstate.LockManager;
+import org.codehaus.wadi.gridstate.PartitionMapper;
+import org.codehaus.wadi.gridstate.activecluster.ActiveClusterDispatcher;
+import org.codehaus.wadi.gridstate.impl.StupidLockManager;
+import org.codehaus.wadi.impl.Quipu;
+
+/**
+ * A Simple PartitionManager.
+ *
+ * @author jules
+ *
+ */
+public class SimplePartitionManager implements PartitionManager, PartitionConfig {
+
+ public interface Callback {void onNodeRemoved(ClusterEvent event);}
+
+ protected final static String _partitionKeysKey="partitionKeys"; // distributed-state key under which a node's PartitionKeys are stored
+ protected final static String _timeStampKey="timeStamp"; // distributed-state key holding the millisecond timestamp written with the partition keys
+ protected final static String _correlationIDMapKey="correlationIDMap"; // distributed-state key of the Map (Destination -> correlation id) read by correlateStateUpdate()
+
+ protected final String _nodeName; // local node's name, taken from the Dispatcher in the constructor
+ protected final Log _log; // logger named after this class and the local node name
+ protected final int _numPartitions; // fixed number of partitions; also the length of _partitions
+ protected final PartitionFacade[] _partitions; // one facade per partition, indexed by partition key
+ protected final Cluster _cluster; // obtained from the Dispatcher, which the constructor casts to ActiveClusterDispatcher
+ protected final Dispatcher _dispatcher; // used for all handler registration, sending and replying
+ protected final Map _distributedState; // state map passed to the constructor and published via Dispatcher.setDistributedState()
+ protected final long _inactiveTime; // Dispatcher.getInactiveTime(); passed as the time argument of exchanges and rendezvous waits
+ protected final boolean _allowRegenerationOfMissingPartitions = true; // only consulted by an assert in regenerateMissingPartitions()
+ protected final Callback _callback; // notified when a node asks to evacuate
+ protected final PartitionMapper _mapper; // maps a key object onto an index into _partitions
+ protected final LockManager _pmSyncs; // a StupidLockManager created in the constructor; exposed via getPMSyncs()
+
+ public SimplePartitionManager(Dispatcher dispatcher, int numPartitions, Map distributedState, Callback callback, PartitionMapper mapper) {
+ _dispatcher=dispatcher;
+ _nodeName=_dispatcher.getNodeName();
+ _pmSyncs=new StupidLockManager(_nodeName);
+ _log=LogFactory.getLog(getClass().getName()+"#"+_nodeName);
+ _numPartitions=numPartitions;
+
+ _partitions=new PartitionFacade[_numPartitions];
+ long timeStamp=System.currentTimeMillis();
+ boolean queueing=true;
+ for (int i=0; i<_numPartitions; i++)
+ _partitions[i]=new PartitionFacade(i, timeStamp, new DummyPartition(i), queueing, this);
+
+ _cluster=((ActiveClusterDispatcher)_dispatcher).getCluster();
+ _distributedState=distributedState;
+ _inactiveTime=_dispatcher.getInactiveTime();
+ _callback=callback;
+ _mapper=mapper;
+ }
+
+ protected PartitionManagerConfig _config;
+
+ /**
+  * Stores the configuration and registers this manager's message handlers with the Dispatcher.
+  * Each request/command type is bound to the like-named handler method on this object;
+  * each corresponding response type is registered together with _inactiveTime.
+  */
+ public void init(PartitionManagerConfig config) {
+   _config=config;
+   _log.trace("init");
+
+   final long responseTime=_inactiveTime;
+
+   // transfer command -> acknowledgement
+   _dispatcher.register(this, "onPartitionTransferCommand", PartitionTransferCommand.class);
+   _dispatcher.register(PartitionTransferAcknowledgement.class, responseTime);
+
+   // transfer request -> response
+   _dispatcher.register(this, "onPartitionTransferRequest", PartitionTransferRequest.class);
+   _dispatcher.register(PartitionTransferResponse.class, responseTime);
+
+   // evacuation request -> response
+   _dispatcher.register(this, "onPartitionEvacuationRequest", PartitionEvacuationRequest.class);
+   _dispatcher.register(PartitionEvacuationResponse.class, responseTime);
+
+   // repopulate request -> response
+   _dispatcher.register(this, "onPartitionRepopulateRequest", PartitionRepopulateRequest.class);
+   _dispatcher.register(PartitionRepopulateResponse.class, responseTime);
+ }
+
+ /**
+  * Lifecycle hook; currently does nothing except trace that it ran.
+  */
+ public void start() throws Exception {
+   _log.trace("starting...");
+   _log.trace("...started");
+ }
+
+ public void evacuate() throws Exception {
+ _log.info("evacuating...");
+
+ PartitionEvacuationRequest request=new PartitionEvacuationRequest();
+ Node localNode=_cluster.getLocalNode();
+ String correlationId=_cluster.getLocalNode().getName();
+ if (_log.isTraceEnabled()) _log.trace("evacuating partitions...: "+_dispatcher.getNodeName(localNode.getDestination())+" -> "+_config.getCoordinatorNode().getState().get("nodeName"));
+ while (_dispatcher.exchangeSend(localNode.getDestination(), _config.getCoordinatorNode().getDestination(), correlationId, request, _inactiveTime)==null) {
+ if (_log.isWarnEnabled()) _log.warn("could not contact Coordinator - backing off for "+ _inactiveTime+" millis...");
+ Thread.sleep(_config.getInactiveTime());
+ }
+
+ _log.info("...evacuated");
+ }
+
+ /**
+  * Deregisters the four handler methods that init() registered with the Dispatcher.
+  * Every deregistration is given the same value, 5000 (presumably a timeout in millis - confirm against Dispatcher).
+  */
+ public void stop() throws Exception {
+   _log.info("stopping...");
+
+   final int wait=5000;
+   _dispatcher.deregister("onPartitionTransferCommand", PartitionTransferCommand.class, wait);
+   _dispatcher.deregister("onPartitionTransferRequest", PartitionTransferRequest.class, wait);
+   _dispatcher.deregister("onPartitionEvacuationRequest", PartitionEvacuationRequest.class, wait);
+   _dispatcher.deregister("onPartitionRepopulateRequest", PartitionRepopulateRequest.class, wait);
+
+   _log.info("...stopped");
+ }
+
+ public PartitionFacade getPartition(int partition) {
+ return _partitions[partition];
+ }
+
+ // a node wants to shutdown...
+ /**
+  * Handles a node's request to leave: identifies the sender from the message's
+  * reply-to Destination and reports it to the Callback as a REMOVE_NODE event.
+  * If the sender cannot be identified (no reply-to, unreadable message, or a
+  * Destination that is neither the local node nor a known cluster node) the
+  * request is logged and ignored, so the Callback never sees an event without a node.
+  *
+  * @param om the incoming message; its JMSReplyTo identifies the evacuating node
+  * @param request the evacuation request payload (not inspected)
+  */
+ public void onPartitionEvacuationRequest(ObjectMessage om, PartitionEvacuationRequest request) {
+   Node from=null;
+   try {
+     Destination destination=om.getJMSReplyTo();
+     if (destination!=null) {
+       Node local=_cluster.getLocalNode();
+       if (destination.equals(local.getDestination()))
+         from=local;
+       else
+         from=(Node)_cluster.getNodes().get(destination);
+     }
+   } catch (JMSException e) {
+     _log.warn("could not read src node from message", e);
+   }
+
+   if (from==null) {
+     // previously guarded only by an assert, so with assertions disabled a null node reached the callback
+     _log.warn("ignoring evacuation request - could not identify the node that sent it");
+     return;
+   }
+
+   _callback.onNodeRemoved(new ClusterEvent(_cluster, from, ClusterEvent.REMOVE_NODE));
+ }
+
+ // a node wants to rebuild a lost partition
+ /**
+  * Answers another node's request to rebuild lost partitions: asks the configuration
+  * for the session names relevant to the requested partition keys and replies with
+  * them in a PartitionRepopulateResponse. A failure while collecting names is logged
+  * and whatever was collected so far is still sent.
+  *
+  * @param om the incoming message, used to address the reply
+  * @param request carries the keys of the partitions being rebuilt
+  */
+ public void onPartitionRepopulateRequest(ObjectMessage om, PartitionRepopulateRequest request) {
+   int keys[]=request.getKeys();
+   if (_log.isTraceEnabled()) {
+     // concatenating an int[] directly prints only its identity ("[I@..."), so spell the keys out
+     StringBuffer buffer=new StringBuffer("[");
+     for (int i=0; i<keys.length; i++) {
+       if (i>0)
+         buffer.append(", ");
+       buffer.append(keys[i]);
+     }
+     buffer.append("]");
+     _log.trace("PartitionRepopulateRequest ARRIVED: " + buffer);
+   }
+   Collection[] c=createResultSet(_numPartitions, keys);
+   try {
+     _log.trace("findRelevantSessionNames - starting");
+     _config.findRelevantSessionNames(_numPartitions, c);
+     _log.trace("findRelevantSessionNames - finished");
+   } catch (Throwable t) {
+     // best effort: reply with whatever has been gathered
+     _log.warn("ERROR", t);
+   }
+   if (!_dispatcher.reply(om, new PartitionRepopulateResponse(c)))
+     _log.warn("unexpected problem responding to partition repopulation request");
+ }
+
+ // receive a command to transfer IndexPartitions to another node
+ // send them in a request, waiting for response
+ // send an acknowledgement to Coordinator who sent original command
+ public void onPartitionTransferCommand(ObjectMessage om, PartitionTransferCommand command) { // performs each requested transfer in turn, then publishes the resulting local state once
+ PartitionTransfer[] transfers=command.getTransfers();
+ for (int i=0; i<transfers.length; i++) {
+ PartitionTransfer transfer=transfers[i];
+ int amount=transfer.getAmount(); // how many partitions this transfer asks us to give up
+ Destination destination=transfer.getDestination(); // node that is to receive them
+
+ // acquire partitions for transfer... (the first 'amount' local partitions found scanning keys upwards from 0)
+ LocalPartition[] acquired=null;
+ try {
+ Collection c=new ArrayList();
+ for (int j=0; j<_numPartitions && c.size()<amount; j++) {
+ PartitionFacade facade=_partitions[j];
+ if (facade.isLocal()) {
+ Partition partition=facade.getContent();
+ c.add(partition);
+ }
+ }
+ acquired=(LocalPartition[])c.toArray(new LocalPartition[c.size()]); // NOTE(review): assumes the content of every local facade is a LocalPartition - confirm
+ assert amount==acquired.length; // only enforced with assertions enabled; otherwise fewer partitions than requested may be sent
+
+ long timeStamp=System.currentTimeMillis();
+
+ // build request...
+ if (_log.isTraceEnabled()) _log.trace("local state (before giving): " + getPartitionKeys());
+ PartitionTransferRequest request=new PartitionTransferRequest(timeStamp, acquired);
+ // send it... (a null return is treated below as an unsuccessful transfer)
+ ObjectMessage om3=_dispatcher.exchangeSend(_dispatcher.getLocalDestination(), destination, request, _inactiveTime);
+ // process response... (only on reported success are the facades repointed at the receiving node)
+ if (om3!=null && ((PartitionTransferResponse)om3.getObject()).getSuccess()) {
+ for (int j=0; j<acquired.length; j++) {
+ PartitionFacade facade=null;
+ facade=_partitions[acquired[j].getKey()];
+ facade.setContentRemote(timeStamp, _dispatcher, destination); // TODO - should we use a more recent ts ?
+ }
+ if (_log.isDebugEnabled()) _log.debug("released "+acquired.length+" partition[s] to "+_dispatcher.getNodeName(destination));
+ } else {
+ _log.warn("transfer unsuccessful");
+ }
+ } catch (Throwable t) { // a failed transfer is logged and the remaining transfers are still attempted
+ _log.warn("unexpected problem", t);
+ }
+ }
+ try { // publish the new key set and timestamp, with the command's correlation id stored under the sender's Destination
+ PartitionKeys keys=getPartitionKeys();
+ _distributedState.put(_partitionKeysKey, keys);
+ _distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
+ if (_log.isTraceEnabled()) _log.trace("local state (after giving): " + keys);
+ String correlationID=_dispatcher.getOutgoingCorrelationId(om);
+ if (_log.isTraceEnabled()) _log.trace("CORRELATIONID: " + correlationID);
+ Map correlationIDMap=(Map)_distributedState.get(_correlationIDMapKey); // if no map is stored under this key, the put below throws a NullPointerException that the catch at the end logs
+ Destination from=om.getJMSReplyTo();
+ correlationIDMap.put(from, correlationID);
+ _dispatcher.setDistributedState(_distributedState);
+ if (_log.isTraceEnabled()) _log.trace("distributed state updated: " + _dispatcher.getDistributedState());
+ correlateStateUpdate(_distributedState); // onStateUpdate() does not get called locally
+ correlationIDMap.remove(from); // the entry is only needed while this state update is published
+ // FIXME - RACE - between update of distributed state and ack - they should be one and the same thing...
+ //_dispatcher.reply(om, new PartitionTransferAcknowledgement(true)); // what if failure - TODO
+ } catch (Exception e) {
+ _log.warn("could not acknowledge safe transfer to Coordinator", e);
+ }
+ }
+
+ // receive a transfer of partitions
+ /**
+  * Accepts partitions handed over by another node: initialises each one against
+  * this manager, installs it in the matching facade, publishes the updated key set
+  * and timestamp, and finally replies to the donor with a PartitionTransferResponse.
+  * A failure to publish the distributed state is logged but does not stop the reply.
+  *
+  * @param om the incoming message, used to address the reply
+  * @param request carries the partitions and the timestamp to install them with
+  */
+ public synchronized void onPartitionTransferRequest(ObjectMessage om, PartitionTransferRequest request) {
+   final long stamp=request.getTimeStamp();
+   final LocalPartition[] incoming=request.getPartitions();
+
+   // read incoming data into our own local model
+   if (_log.isTraceEnabled()) _log.trace("local state (before receiving): " + getPartitionKeys());
+   for (int i=0; i<incoming.length; i++) {
+     LocalPartition received=incoming[i];
+     received.init(this);
+     getPartition(received.getKey()).setContent(stamp, received);
+   }
+   final boolean success=true; // reached only if every partition was installed
+
+   try {
+     PartitionKeys owned=getPartitionKeys();
+     _distributedState.put(_partitionKeysKey, owned);
+     _distributedState.put(_timeStampKey, new Long(System.currentTimeMillis()));
+     if (_log.isTraceEnabled()) _log.trace("local state (after receiving): " + owned);
+     _dispatcher.setDistributedState(_distributedState);
+     if (_log.isTraceEnabled()) _log.trace("distributed state updated: " + _dispatcher.getDistributedState());
+   } catch (Exception e) {
+     _log.error("could not update distributed state", e);
+   }
+
+   // acknowledge safe receipt to donor
+   if (!_dispatcher.reply(om, new PartitionTransferResponse(success))) {
+     _log.warn("problem acknowledging reciept of IndexPartitions - donor may have died");
+     // chuck them... - TODO
+     return;
+   }
+
+   // unlock Partitions here... - TODO
+   try {
+     if (_log.isDebugEnabled()) _log.debug("acquired "+incoming.length+" partition[s] from "+_dispatcher.getNodeName(om.getJMSReplyTo()));
+   } catch (JMSException e) {
+     _log.warn("problem reading incoming message's source", e);
+   }
+ }
+
+ // TODO - duplicate code (from DIndex)
+ /**
+  * Builds an array with one slot per partition, in which only the slots named
+  * by keys hold a (new, empty) list; every other slot is left null.
+  *
+  * @param numPartitions length of the returned array
+  * @param keys partition indices that should receive an empty list
+  * @return the sparsely populated array
+  */
+ public Collection[] createResultSet(int numPartitions, int[] keys) {
+   Collection[] slots=new Collection[numPartitions];
+   for (int k=0; k<keys.length; k++) {
+     int partition=keys[k];
+     slots[partition]=new ArrayList();
+   }
+   return slots;
+ }
+
+ // ClusterListener
+
+ /**
+  * Applies a node's published state: every partition key listed in the node's
+  * state is marked as remote, located at that node's Destination, using the
+  * timestamp the node published.
+  *
+  * @param node the node whose state map holds the timestamp and partition keys
+  */
+ public void update(Node node) {
+   Map published=node.getState();
+   long publishedAt=((Long)published.get(_timeStampKey)).longValue();
+   PartitionKeys owned=(PartitionKeys)published.get(_partitionKeysKey);
+   Destination owner=node.getDestination();
+   int[] indices=owned._keys;
+   for (int i=0; i<indices.length; i++) {
+     _partitions[indices[i]].setContentRemote(publishedAt, _dispatcher, owner);
+   }
+ }
+
+
+ /**
+  * Flags, in partitionIsPresent, every partition key published by the given nodes.
+  * Null nodes and nodes without published keys are skipped. A key that is already
+  * flagged when it is seen again is reported as an error.
+  *
+  * @param nodes nodes to inspect; entries may be null
+  * @param partitionIsPresent flag array indexed by partition key, updated in place
+  */
+ public void markExistingPartitions(Node[] nodes, boolean[] partitionIsPresent) {
+   for (int n=0; n<nodes.length; n++) {
+     Node node=nodes[n];
+     if (node==null)
+       continue;
+
+     PartitionKeys owned=DIndex.getPartitionKeys(node);
+     if (owned==null)
+       continue;
+
+     int[] indices=owned.getKeys();
+     for (int j=0; j<indices.length; j++) {
+       int index=indices[j];
+       if (!partitionIsPresent[index]) {
+         partitionIsPresent[index]=true;
+         continue;
+       }
+       if (_log.isErrorEnabled()) _log.error("partition " + index + " found on more than one node");
+     }
+   }
+ }
+
+ public void regenerateMissingPartitions(Node[] living, Node[] leaving) { // recreates locally, and refills, every partition that none of the given nodes reports owning
+ boolean[] partitionIsPresent=new boolean[_numPartitions];
+ markExistingPartitions(living, partitionIsPresent);
+ markExistingPartitions(leaving, partitionIsPresent);
+ Collection missingPartitions=new ArrayList();
+ for (int i=0; i<partitionIsPresent.length; i++) {
+ if (!partitionIsPresent[i])
+ missingPartitions.add(new Integer(i));
+ }
+
+ int numKeys=missingPartitions.size();
+ if (numKeys>0) { // nothing below happens when every partition is accounted for
+ assert _allowRegenerationOfMissingPartitions;
+ // convert to int[]
+ int[] missingKeys=new int[numKeys];
+ int key=0;
+ for (Iterator i=missingPartitions.iterator(); i.hasNext(); )
+ missingKeys[key++]=((Integer)i.next()).intValue();
+
+ if (_log.isWarnEnabled()) _log.warn("RECREATING PARTITIONS...: " + missingPartitions);
+ long time=System.currentTimeMillis();
+ for (int i=0; i<missingKeys.length; i++) { // install a fresh, empty LocalPartition behind each missing key's facade
+ int k=missingKeys[i];
+ PartitionFacade facade=_partitions[k];
+ LocalPartition local=new LocalPartition(k);
+ local.init(this);
+ facade.setContent(time, local);
+ }
+ PartitionKeys newKeys=getPartitionKeys(); // snapshot taken here, before repopulation; it is what gets published at the end
+ if (_log.isWarnEnabled()) _log.warn("REPOPULATING PARTITIONS...: " + missingPartitions);
+ String correlationId=_dispatcher.nextCorrelationId();
+ Quipu rv=_dispatcher.setRendezVous(correlationId, _dispatcher.getNumNodes()-1); // presumably expects one response from each of the other nodes - confirm against Dispatcher
+ if (!_dispatcher.send(_dispatcher.getLocalDestination(), _dispatcher.getClusterDestination(), correlationId, new PartitionRepopulateRequest(missingKeys))) {
+ _log.error("unexpected problem repopulating lost index");
+ }
+
+ // whilst we are waiting for the other nodes to get back to us, figure out which relevant sessions
+ // we are carrying ourselves...
+ Collection[] c=createResultSet(_numPartitions, missingKeys);
+ _config.findRelevantSessionNames(_numPartitions, c);
+ repopulate(_dispatcher.getLocalDestination(), c);
+
+ //boolean success=false;
+ try {
+ /*success=*/rv.waitFor(_inactiveTime); // the outcome is ignored: whatever results have arrived are processed below
+ } catch (InterruptedException e) {
+ _log.warn("unexpected interruption", e); // NOTE(review): the thread's interrupt status is not restored here
+ }
+ Collection results=rv.getResults();
+
+ for (Iterator i=results.iterator(); i.hasNext(); ) { // record each responder as the location of the session names it reported
+ ObjectMessage message=(ObjectMessage)i.next();
+ try {
+ Destination from=message.getJMSReplyTo();
+ PartitionRepopulateResponse response=(PartitionRepopulateResponse)message.getObject();
+ Collection[] relevantKeys=response.getKeys();
+
+ repopulate(from, relevantKeys);
+
+ } catch (JMSException e) {
+ _log.warn("unexpected problem interrogating response", e);
+ }
+ }
+
+ if (_log.isWarnEnabled()) _log.warn("...PARTITIONS REPOPULATED: " + missingPartitions);
+ // relayout dindex - NOTE(review): unlike the transfer handlers, the _timeStampKey entry is not refreshed here; confirm that is intended
+ _distributedState.put(_partitionKeysKey, newKeys);
+ try {
+ _dispatcher.setDistributedState(_distributedState);
+ if (_log.isTraceEnabled()) _log.trace("distributed state updated: " + _dispatcher.getDistributedState());
+ } catch (Exception e) {
+ _log.error("could not update distributed state", e);
+ }
+ }
+ }
+
+ public PartitionKeys getPartitionKeys() {
+ return new PartitionKeys(_partitions);
+ }
+
+ /**
+  * Records, in the local partitions, that the given names live at location.
+  * Slot i of keys holds the names belonging to partition i; null slots are skipped.
+  * The content of every facade with a non-null slot is cast to LocalPartition.
+  *
+  * @param location Destination to store against each name; must not be null
+  * @param keys per-partition collections of names (Strings), indexed by partition key
+  */
+ public void repopulate(Destination location, Collection[] keys) {
+   assert location!=null;
+   for (int index=0; index<_numPartitions; index++) {
+     Collection names=keys[index];
+     if (names==null)
+       continue;
+
+     LocalPartition target=(LocalPartition)_partitions[index].getContent();
+     Iterator each=names.iterator();
+     while (each.hasNext()) {
+       String name=(String)each.next();
+       target.put(name, location);
+     }
+   }
+ }
+
+ /**
+  * Makes every partition local: each facade is given a new, initialised
+  * LocalPartition, all installed with the same timestamp.
+  */
+ public void localise() {
+   if (_log.isDebugEnabled()) _log.debug("allocating " + _numPartitions + " partitions");
+   final long now=System.currentTimeMillis();
+   for (int key=0; key<_numPartitions; key++) {
+     LocalPartition fresh=new LocalPartition(key);
+     PartitionFacade facade=_partitions[key];
+     fresh.init(this);
+     facade.setContent(now, fresh);
+   }
+ }
+
+ // TODO - duplicate code - see DIndex...
+ /**
+  * Looks in the state's correlation-id map for an id stored under the local
+  * Destination and, if a rendezvous is registered for that id, hands the state
+  * to it as a result. Does nothing when no id is stored; warns when an id is
+  * found but nothing is waiting on it.
+  *
+  * @param state a state map containing the correlation-id map
+  */
+ protected void correlateStateUpdate(Map state) {
+   Map idsByDestination=(Map)state.get(_correlationIDMapKey);
+   Destination self=_dispatcher.getLocalDestination();
+   String correlationID=(String)idsByDestination.get(self);
+   if (correlationID==null)
+     return;
+
+   Quipu waiting=(Quipu)_dispatcher.getRendezVousMap().get(correlationID);
+   if (waiting!=null) {
+     if (_log.isTraceEnabled()) _log.trace("successful correlation: " + correlationID);
+     waiting.putResult(state);
+   } else {
+     if (_log.isWarnEnabled()) _log.warn("no one waiting for: " + correlationID);
+   }
+ }
+
+// public void repopulatePartitions(Destination location, Collection[] keys) {
+// for (int i=0; i<keys.length; i++) {
+// Collection c=keys[i];
+// if (c!=null) {
+// for (Iterator j=c.iterator(); j.hasNext(); ) {
+// String key=(String)j.next();
+// LocalPartition partition=(LocalPartition)_partitions[i].getContent();
+// partition._map.put(key, location);
+// }
+// }
+// }
+// }
+
+ /**
+  * @return the number of partitions this manager was constructed with
+  */
+ public int getNumPartitions() {
+   return this._numPartitions;
+ }
+
+ public PartitionFacade getPartition(Object key) {
+ return _partitions[_mapper.map(key)];
+ }
+
+ // PartitionConfig API
+
+ /**
+  * @return the Dispatcher this manager was constructed with
+  */
+ public Dispatcher getDispatcher() {
+   return this._dispatcher;
+ }
+
+ /**
+  * @return the Cluster obtained from the Dispatcher at construction time
+  */
+ public Cluster getCluster() {
+   return this._cluster;
+ }
+
+ public String getNodeName(Destination destination) {
+ return _dispatcher.getNodeName(destination);
+ }
+
+ /**
+  * @return the inactive time read from the Dispatcher at construction time
+  */
+ public long getInactiveTime() {
+   return this._inactiveTime;
+ }
+
+ // PartitionConfig API
+
+ /**
+  * @return the local node's name, as read from the Dispatcher at construction time
+  */
+ public String getLocalNodeName() {
+   return this._nodeName;
+ }
+
+ /**
+  * @return the LockManager created for this manager in the constructor
+  */
+ public LockManager getPMSyncs() {
+   return this._pmSyncs;
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/impl/SimpleStateManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,340 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.impl;
+
+import java.nio.ByteBuffer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.Immoter;
+import org.codehaus.wadi.InvocationContext;
+import org.codehaus.wadi.InvocationException;
+import org.codehaus.wadi.Location;
+import org.codehaus.wadi.Motable;
+import org.codehaus.wadi.dindex.StateManager;
+import org.codehaus.wadi.dindex.StateManagerConfig;
+import org.codehaus.wadi.dindex.messages.DIndexForwardRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationRequest;
+import org.codehaus.wadi.dindex.messages.DIndexRelocationResponse;
+import org.codehaus.wadi.dindex.newmessages.DeleteIMToPM;
+import org.codehaus.wadi.dindex.newmessages.DeletePMToIM;
+import org.codehaus.wadi.dindex.newmessages.InsertIMToPM;
+import org.codehaus.wadi.dindex.newmessages.InsertPMToIM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToPM;
+import org.codehaus.wadi.dindex.newmessages.MoveIMToSM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToIM;
+import org.codehaus.wadi.dindex.newmessages.MovePMToSM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToIM;
+import org.codehaus.wadi.dindex.newmessages.MoveSMToPM;
+import org.codehaus.wadi.dindex.newmessages.ReleaseEntryRequest;
+import org.codehaus.wadi.dindex.newmessages.ReleaseEntryResponse;
+import org.codehaus.wadi.gridstate.Dispatcher;
+import org.codehaus.wadi.impl.AbstractMotable;
+import org.codehaus.wadi.impl.RankedRWLock;
+import org.codehaus.wadi.impl.SimpleMotable;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
+
+public class SimpleStateManager implements StateManager {
+
+ protected final Log _lockLog=LogFactory.getLog("org.codehaus.wadi.LOCKS");
+ protected final Dispatcher _dispatcher;
+ protected final long _inactiveTime;
+ protected final int _resTimeout=500; // TODO - parameterise
+
+ protected StateManagerConfig _config;
+ protected Log _log=LogFactory.getLog(getClass());
+
+ /**
+  * Creates a state manager; nothing is registered with the dispatcher until <code>init</code> is called.
+  *
+  * @param dispatcher   used to register handlers and to send and reply to messages
+  * @param inactiveTime passed along when response message classes are registered in <code>init</code>
+  */
+ public SimpleStateManager(Dispatcher dispatcher, long inactiveTime) {
+     _inactiveTime=inactiveTime;
+     _dispatcher=dispatcher;
+ }
+
+ public void init(StateManagerConfig config) {
+ _config=config;
+ _log=LogFactory.getLog(getClass().getName()+"#"+_config.getLocalNodeName());
+ _dispatcher.register(this, "onDIndexInsertionRequest", InsertIMToPM.class);
+ _dispatcher.register(InsertPMToIM.class, _inactiveTime);
+ _dispatcher.register(this, "onDIndexDeletionRequest", DeleteIMToPM.class);
+ _dispatcher.register(DeletePMToIM.class, _inactiveTime);
+ _dispatcher.register(this, "onDIndexRelocationRequest", DIndexRelocationRequest.class);
+ _dispatcher.register(DIndexRelocationResponse.class, _inactiveTime);
+ _dispatcher.register(this, "onDIndexForwardRequest", DIndexForwardRequest.class);
+
+ // GridState - Relocate - 5 messages - IM->PM->SM->IM->SM->PM
+ _dispatcher.register(this, "onMessage", MoveIMToPM.class);
+ _dispatcher.register(this, "onMessage", MovePMToSM.class);
+ _dispatcher.register(MoveSMToIM.class, _inactiveTime);
+ _dispatcher.register(MoveIMToSM.class, _inactiveTime);
+ _dispatcher.register(MoveSMToPM.class, _inactiveTime);
+ // or possibly - IM->PM->IM (failure)
+ _dispatcher.register(MovePMToIM.class, _inactiveTime);
+ }
+
+ /**
+  * Currently a no-op: all dispatcher registration happens in <code>init</code>.
+  *
+  * @throws Exception never thrown by this implementation; declared by the signature
+  */
+ public void start() throws Exception {
+     // intentionally empty
+ }
+
+ /**
+  * Deregisters the four DIndex request handlers, passing the same fixed timeout argument for each.
+  * <p>
+  * NOTE(review): <code>init</code> also registers two "onMessage" handlers (MoveIMToPM, MovePMToSM);
+  * they are not deregistered here - confirm whether that is intentional.
+  *
+  * @throws Exception whatever a <code>deregister</code> call propagates
+  */
+ public void stop() throws Exception {
+     final int timeout=5000; // presumably milliseconds - TODO confirm against Dispatcher.deregister
+     _dispatcher.deregister("onDIndexInsertionRequest", InsertIMToPM.class, timeout);
+     _dispatcher.deregister("onDIndexDeletionRequest", DeleteIMToPM.class, timeout);
+     _dispatcher.deregister("onDIndexRelocationRequest", DIndexRelocationRequest.class, timeout);
+     _dispatcher.deregister("onDIndexForwardRequest", DIndexForwardRequest.class, timeout);
+ }
+
+
+ /**
+  * Hands an insertion request to the partition that the configuration selects for the request's key.
+  *
+  * @param om      the message that carried the request
+  * @param request the insertion request itself
+  */
+ public void onDIndexInsertionRequest(ObjectMessage om, InsertIMToPM request) {
+     _config.getPartition(request.getKey()).onMessage(om, request);
+ }
+
+ /**
+  * Hands a deletion request to the partition that the configuration selects for the request's key.
+  *
+  * @param om      the message that carried the request
+  * @param request the deletion request itself
+  */
+ public void onDIndexDeletionRequest(ObjectMessage om, DeleteIMToPM request) {
+     _config.getPartition(request.getKey()).onMessage(om, request);
+ }
+
+ /**
+  * Hands a forward request to the partition that the configuration selects for the request's key.
+  *
+  * @param om      the message that carried the request
+  * @param request the forward request itself
+  */
+ public void onDIndexForwardRequest(ObjectMessage om, DIndexForwardRequest request) {
+     _config.getPartition(request.getKey()).onMessage(om, request);
+ }
+
+ /**
+  * Hands a relocation request to the partition that the configuration selects for the request's key.
+  *
+  * @param om      the message that carried the request
+  * @param request the relocation request itself
+  */
+ public void onDIndexRelocationRequest(ObjectMessage om, DIndexRelocationRequest request) {
+     _config.getPartition(request.getKey()).onMessage(om, request);
+ }
+
+ /**
+  * Hands a MoveIMToPM request to the partition that the configuration selects for the request's key.
+  *
+  * @param message the message that carried the request
+  * @param request the move request itself
+  */
+ public void onMessage(ObjectMessage message, MoveIMToPM request) {
+     _config.getPartition(request.getKey()).onMessage(message, request);
+ }
+
+ //----------------------------------------------------------------------------------------------------
+
+ /**
+  * Write-only Motable created by RelocationImmoter.nextMotable(). When a body is set as a byte array,
+  * it sends those bytes on to the InvocationMaster named in the MovePMToSM request and, if the
+  * InvocationMaster answers, replies to the original message with a positive MoveSMToPM.
+  * Reading the body and the ByteBuffer variants are unsupported.
+  */
+ class PMToIMEmotable extends AbstractMotable {
+
+ // stored by the constructor but not read anywhere in this class
+ protected final String _name;
+ // stored by the constructor but not read anywhere in this class
+ protected final String _tgtNodeName;
+ // the message that is replied to once the InvocationMaster has answered
+ protected ObjectMessage _message1;
+ // supplies the key, the InvocationMaster destination and its correlation id
+ protected final MovePMToSM _get;
+
+ public PMToIMEmotable(String name, String nodeName, ObjectMessage message1, MovePMToSM get) {
+ _name=name;
+ _tgtNodeName=nodeName;
+ _message1=message1;
+ _get=get;
+ }
+ /** Not supported - always throws UnsupportedOperationException. */
+ public byte[] getBodyAsByteArray() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Forwards the given state to the InvocationMaster and, on receiving an answer, acknowledges
+  * to the sender of <code>_message1</code>.
+  *
+  * @param bytes the state to forward
+  * @throws Exception whatever SimpleMotable.setBodyAsByteArray or the dispatcher calls propagate
+  */
+ public void setBodyAsByteArray(byte[] bytes) throws Exception {
+ // NOTE(review): this Motable is populated but not used again in this method - confirm whether
+ // the call is relied upon (e.g. as validation of the bytes) before removing it
+ Motable immotable=new SimpleMotable();
+ immotable.setBodyAsByteArray(bytes);
+
+ Object key=_get.getKey();
+ Dispatcher dispatcher=_config.getDispatcher();
+ long timeout=_config.getInactiveTime();
+ Destination sm=dispatcher.getLocalDestination();
+ Destination im=(Destination)_get.getIM();
+ MoveSMToIM request=new MoveSMToIM(key, bytes);
+ // send on state from StateMaster to InvocationMaster...
+ if (_log.isTraceEnabled()) _log.trace("exchanging MoveSMToIM between: "+_config.getNodeName(sm)+"->"+_config.getNodeName(im));
+ ObjectMessage message2=(ObjectMessage)dispatcher.exchangeSend(sm, im, request, timeout, _get.getIMCorrelationId());
+ // should receive response from IM confirming safe receipt...
+ if (message2==null) {
+ // no answer arrived: only an error is logged - nothing is sent in reply to _message1 on this path
+ _log.error("NO REPLY RECEIVED FOR MESSAGE IN TIMEFRAME - PANIC!");
+ } else {
+ // the content of message2 is not inspected (the unpacking code below is disabled);
+ // any non-null answer leads to a positive acknowledgement
+// MoveIMToSM response=null;
+// try {
+// response=(MoveIMToSM)message2.getObject();
+// // acknowledge transfer completed to PartitionMaster, so it may unlock resource...
+ dispatcher.reply(_message1,new MoveSMToPM(true));
+// } catch (JMSException e) {
+// _log.error("unexpected problem", e);
+// }
+ }
+ }
+
+ /** Not supported - always throws UnsupportedOperationException. */
+ public ByteBuffer getBodyAsByteBuffer() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported - always throws UnsupportedOperationException. */
+ public void setBodyAsByteBuffer(ByteBuffer body) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+  * We receive a RelocationRequest and pass a RelocationImmoter down the Contextualiser stack. The Session is passed to us
+  * through the Immoter and we pass it back to the Request-ing node...
+  * <p>
+  * The invocation lock for the session is taken in <code>prepare</code> and given back in <code>commit</code>
+  * or <code>rollback</code> - but only if <code>prepare</code> really obtained it, so that a lock which was
+  * never acquired is never released and an acquired lock is not leaked when the motion is rolled back.
+  *
+  * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+  * @version $Revision: 1.21 $
+  */
+ class RelocationImmoter implements Immoter {
+     protected final Log _log=LogFactory.getLog(getClass());
+
+     protected final String _tgtNodeName;
+     protected ObjectMessage _message;
+     protected final MovePMToSM _request;
+
+     // set by commit(); lets the caller tell whether the state was found and moved
+     protected boolean _found=false;
+     protected Sync _invocationLock;
+     // true only between a successful acquire in prepare() and the matching release
+     protected boolean _lockHeld=false;
+
+     /**
+      * @param nodeName name of the node the state is being sent to (used in getInfo() and handed to the Motable)
+      * @param message  the message to which the outcome is eventually replied
+      * @param request  the MovePMToSM request being served
+      */
+     public RelocationImmoter(String nodeName, ObjectMessage message, MovePMToSM request) {
+         _tgtNodeName=nodeName;
+         _message=message;
+         _request=request;
+     }
+
+     /** Supplies the Motable that forwards the state when its body is set. */
+     public Motable nextMotable(String name, Motable emotable) {
+         return new PMToIMEmotable(name, _tgtNodeName, _message, _request);
+     }
+
+     /**
+      * Takes the invocation lock for <code>name</code>. A timeout is logged and the motion proceeds
+      * without the lock, which is remembered so that it is not released later.
+      *
+      * @return always true
+      */
+     public boolean prepare(String name, Motable emotable, Motable immotable) {
+         // the actual transfer is done by the Motable returned from nextMotable()...
+         _invocationLock=_config.getInvocationLock(name);
+         _lockHeld=false;
+         try {
+             Utils.acquireUninterrupted("Invocation", name, _invocationLock);
+             _lockHeld=true;
+         } catch (TimeoutException e) {
+             _log.error("unexpected timeout - proceding without lock", e);
+         }
+         return true;
+     }
+
+     /** Records that the state was found and gives back the invocation lock if it is held. */
+     public void commit(String name, Motable immotable) {
+         _found=true;
+         releaseInvocationLock(name);
+     }
+
+     /** Undoing the transfer itself is not implemented; only the invocation lock is given back. */
+     public void rollback(String name, Motable immotable) {
+         // this probably has to be NYI... - nasty...
+         releaseInvocationLock(name);
+     }
+
+     /** Releases the invocation lock exactly once, and only if prepare() acquired it. */
+     protected void releaseInvocationLock(String name) {
+         if (_lockHeld) {
+             _lockHeld=false;
+             Utils.release("Invocation", name, _invocationLock);
+         }
+     }
+
+     /** @return always false */
+     public boolean contextualise(InvocationContext invocationContext, String id, Motable immotable, Sync motionLock) throws InvocationException {
+         return false;
+     }
+
+     public String getInfo() {
+         return "emigration:"+_tgtNodeName;
+     }
+
+     /** @return true once commit() has been called */
+     public boolean getFound() {
+         return _found;
+     }
+
+ }
+
+ //--------------------------------------------------------------------------------------
+
+ // called on State Master...
+ /**
+  * Serves a MovePMToSM request on this node instead of dispatching it to a partition.
+  * A RelocationImmoter is pushed through <code>_config.contextualise</code>; if it reports that no state
+  * was found, a MoveSMToIM carrying null state is exchanged with the InvocationMaster and a negative
+  * MoveSMToPM is replied to <code>message1</code>. Any exception is logged as a warning and swallowed.
+  * The RankedRWLock priority is raised for the duration of the call and always reset afterwards.
+  *
+  * @param message1 the message that carried the request and receives the failure reply
+  * @param request  identifies the key, the InvocationMaster and its correlation id
+  */
+ public void onMessage(ObjectMessage message1, MovePMToSM request) {
+     // DO NOT Dispatch onto Partition - deal with it here...
+     Object key=request.getKey();
+     RankedRWLock.setPriority(RankedRWLock.EMIGRATION_PRIORITY);
+
+     // Tricky - we need to call a Moter at this point and start removal of State to other node...
+     try {
+         Destination im=(Destination)request.getIM();
+         String imName=_config.getNodeName(im);
+         RelocationImmoter immoter=new RelocationImmoter(imName, message1, request);
+         // if we own session, this will send the correct response...
+         _config.contextualise(null, (String)key, immoter, null, true);
+         if (immoter.getFound()) {
+             return;
+         }
+
+         _log.warn("state not found - perhaps it has just been destroyed: "+key);
+         // send on null state from StateMaster to InvocationMaster...
+         MoveSMToIM emptyState=new MoveSMToIM(key, null);
+         Destination sm=_dispatcher.getLocalDestination();
+         long timeout=_config.getInactiveTime();
+         _log.info("sending 0 bytes to : "+imName);
+         ObjectMessage ack=(ObjectMessage)_dispatcher.exchangeSend(sm, im, emptyState, timeout, request.getIMCorrelationId());
+         _log.info("received: "+ack);
+         // StateMaster replies to PartitionMaster indicating failure...
+         _log.info("reporting failure to PM");
+         _dispatcher.reply(message1,new MoveSMToPM(false));
+     } catch (Exception e) {
+         if (_log.isWarnEnabled()) _log.warn("problem handling relocation request: "+key, e);
+     } finally {
+         RankedRWLock.setPriority(RankedRWLock.NO_PRIORITY);
+     }
+ }
+
+ // evacuation protocol
+
+ public boolean offerEmigrant(String key, Motable emotable, long timeout) {
+ Destination to=((RemotePartition)_config.getPartition(key).getContent()).getDestination(); // TODO - HACK - temporary
+ Destination from=_dispatcher.getLocalDestination();
+ ReleaseEntryRequest request=new ReleaseEntryRequest(emotable);
+ ObjectMessage message=_dispatcher.exchangeSend(from, to, request, timeout);
+ ReleaseEntryResponse ack=null;
+ try {
+ ack=message==null?null:(ReleaseEntryResponse)message.getObject();
+ } catch (JMSException e) {
+ _log.error("could not unpack response", e);
+ }
+
+ if (ack==null) {
+ if (_log.isWarnEnabled()) _log.warn("no acknowledgement within timeframe ("+timeout+" millis): "+key);
+ return false;
+ } else {
+ if (_log.isTraceEnabled()) _log.trace("received acknowledgement within timeframe ("+timeout+" millis): "+key);
+ return true;
+ }
+ }
+
+ public void acceptImmigrant(ObjectMessage message, Location location, String name, Motable motable) {
+ if (!_dispatcher.reply(message, new ReleaseEntryResponse(name, location))) {
+ if (_log.isErrorEnabled()) _log.error("could not acknowledge safe receipt: "+name);
+ }
+ }
+
+ protected ImmigrationListener _listener;
+
+ /**
+  * Installs the listener that <code>onEmigrationRequest</code> forwards to and registers the
+  * ReleaseEntryRequest handler and the ReleaseEntryResponse class with the dispatcher.
+  *
+  * @param listener receives every subsequently dispatched ReleaseEntryRequest
+  */
+ public void setImmigrationListener(ImmigrationListener listener) {
+     // install the listener BEFORE the handler is registered: once registered, the dispatcher may
+     // call onEmigrationRequest straight away, and it must not find _listener still unset
+     _listener=listener;
+     _dispatcher.register(this, "onEmigrationRequest", ReleaseEntryRequest.class);
+     _dispatcher.register(ReleaseEntryResponse.class, _resTimeout);
+ }
+
+ /**
+  * Removes the listener, but only if it is the one currently installed. The dispatcher registrations
+  * made by <code>setImmigrationListener</code> are not undone yet (see TODO).
+  *
+  * @param listener the listener expected to be installed
+  */
+ public void unsetImmigrationListener(ImmigrationListener listener) {
+     if (_listener!=listener) {
+         return; // some other listener (or none) is installed - leave it untouched
+     }
+     _listener=null;
+     // TODO ...
+     //_dispatcher.deregister("onEmigrationRequest", EmigrationRequest.class, _resTimeout);
+     //_dispatcher.deregister("onEmigrationResponse", EmigrationResponse.class, _resTimeout);
+ }
+
+ public void onEmigrationRequest(ObjectMessage message, ReleaseEntryRequest request) {
+ _listener.onImmigration(message, request.getMotable());
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexForwardRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.dindex.DIndexRequest;
+
+public class DIndexForwardRequest implements DIndexRequest, Serializable {
+
+ protected DIndexRequest _request;
+
+ public DIndexForwardRequest(DIndexRequest request) {
+ _request=request;
+ }
+
+ protected DIndexForwardRequest() {
+ // for deserialisation...
+ }
+
+ public DIndexRequest getRequest() {
+ return _request;
+ }
+
+ public int getPartitionKey(int numPartitions) {
+ return _request.getPartitionKey(numPartitions);
+ }
+
+ public String getKey() {
+ return _request.getKey();
+ }
+
+ public String toString() {
+ return "["+_request.toString()+"]";
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.dindex.impl.AbstractDIndexRequest;
+
+public class DIndexRelocationRequest extends AbstractDIndexRequest implements Serializable {
+
+ public DIndexRelocationRequest(String name) {
+ super(name);
+ }
+
+ public String toString() {
+ return "<DIndexRelocationRequest: "+_key+">";
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/DIndexRelocationResponse.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.dindex.DIndexResponse;
+
+/**
+ * Relocation response message; it carries no fields of its own.
+ */
+public class DIndexRelocationResponse implements DIndexResponse, Serializable {
+
+ /** Creates an empty response. */
+ public DIndexRelocationResponse() {
+     // nothing to initialise
+ }
+
+ /** @return a fixed tag identifying the message type */
+ public String toString() {
+     final String tag="<DIndexRelocationResponse>";
+     return tag;
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.OldMessage;
+
+/**
+ * Partition evacuation request message; it carries no fields of its own.
+ */
+public class PartitionEvacuationRequest implements OldMessage, Serializable {
+
+ /** Creates an empty request. */
+ public PartitionEvacuationRequest() {
+     // nothing to initialise
+ }
+
+ /** @return a fixed tag identifying the message type */
+ public String toString() {
+     final String tag="<PartitionEvacuationRequest>";
+     return tag;
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionEvacuationResponse.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.OldMessage;
+
+/**
+ * Partition evacuation response message; it carries no fields of its own.
+ */
+public class PartitionEvacuationResponse implements OldMessage, Serializable {
+
+ /** Creates an empty response. */
+ public PartitionEvacuationResponse() {
+     // nothing to initialise
+ }
+
+ /** @return a fixed tag identifying the message type */
+ public String toString() {
+     final String tag="<PartitionEvacuationResponse>";
+     return tag;
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateRequest.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,44 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+
+import org.codehaus.wadi.OldMessage;
+
+public class PartitionRepopulateRequest implements OldMessage, Serializable {
+
+ protected int[] _keys;
+
+ public PartitionRepopulateRequest(int[] keys) {
+ super();
+ _keys=keys;
+ }
+
+ protected PartitionRepopulateRequest() {
+ // for deserialisation...
+ }
+
+ public int[] getKeys() {
+ return _keys;
+ }
+
+
+ public String toString() {
+ return "<PartitionRepopulateRequest"+_keys+">";
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java (added)
+++ incubator/wadi/trunk/modules/core/src/main/java/org/codehaus/wadi/dindex/messages/PartitionRepopulateResponse.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,52 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.dindex.messages;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.codehaus.wadi.OldMessage;
+
+public class PartitionRepopulateResponse implements OldMessage, Serializable {
+
+ protected Collection[] _keys;
+
+ public PartitionRepopulateResponse(Collection[] keys) {
+ super();
+ _keys=keys;
+ }
+
+ protected PartitionRepopulateResponse() {
+ // for deserialisation
+ }
+
+ public Collection[] getKeys() {
+ return _keys;
+ }
+
+
+ public String toString() {
+ StringBuffer buffer=new StringBuffer("<PartitionRepopulateResponse: ");
+ for (int i=0; i<_keys.length; i++) {
+ Collection c=_keys[i];
+ if (c!=null)
+ buffer.append(""+i+":"+c.toString()+", ");
+ }
+ buffer.append(">");
+ return buffer.toString();
+ }
+}