You are viewing a plain text version of this content. The canonical link for it is here.
Posted to wadi-commits@incubator.apache.org by bd...@apache.org on 2005/12/14 23:36:16 UTC
svn commit: r356933 [15/35] - in /incubator/wadi/trunk: ./ etc/ modules/
modules/assembly/ modules/assembly/src/ modules/assembly/src/bin/
modules/assembly/src/conf/ modules/assembly/src/main/
modules/assembly/src/main/assembly/ modules/core/ modules/c...
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributes.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributes.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributes.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributes.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,205 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.http.HttpSessionActivationListener;
+import javax.servlet.http.HttpSessionEvent;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.Dirtier;
+
+// TODO - consider mode where rep is shifted from byte->Object->byte for the duration of any change
+// this would be useful for testing that apps were distributable
+
+public class WholeAttributes implements Attributes {
+ protected static final Log _log = LogFactory.getLog(WholeAttributes.class);
+
+ protected final Dirtier _dirtier;
+ protected final Streamer _streamer;
+ protected final boolean _evictObjectRepASAP;
+ protected final boolean _evictByteRepASAP;
+
+ protected final Map _objectRep;
+ protected byte[] _byteRep;
+
+ protected boolean _objectRepValid;
+ protected boolean _hasListeners;
+
+ public WholeAttributes(Dirtier dirtier, Streamer streamer, boolean evictObjectRepASAP, boolean evictByteRepASAP) {
+ _dirtier=dirtier;
+ _streamer=streamer;
+ _evictObjectRepASAP=evictObjectRepASAP;
+ _evictByteRepASAP=evictByteRepASAP;
+
+ _objectRep=new HashMap();
+ _objectRepValid=true;
+ _byteRep=null;
+ }
+
+ protected synchronized Map getObjectRep() {
+ if (!_objectRepValid) {
+ List activationListeners=new ArrayList();
+ // convert byte[] to Object rep
+ try {
+ ByteArrayInputStream bais=new ByteArrayInputStream(_byteRep);
+ ObjectInput oi=_streamer.getInputStream(bais); // TODO - ClassLoading ?
+ // read number of attributes - there doesn't seem to be a way to presize an existing Map :-(
+ int size=oi.readInt();
+ // for each attribute:
+ for (int i=0; i<size; i++) {
+ // read key
+ Object key=oi.readObject();
+ // read val
+ Object val=oi.readObject();
+ // if it is an activation listener, call didActivate() on it
+ if (val instanceof HttpSessionActivationListener) {
+ activationListeners.add(val);
+ }
+ // if it is the wrapper class - use this to get the real object
+ // TODO - use wrapper to reincarnate val
+ _objectRep.put(key, val);
+ }
+ oi.close();
+ _objectRepValid=true;
+ if (_evictByteRepASAP) _byteRep=null;
+ // call activationListeners, now that we have a complete session...
+ int l=activationListeners.size();
+ for (int i=0; i<l; i++)
+ ((HttpSessionActivationListener)activationListeners.get(i)).sessionDidActivate(_event);
+ } catch (Exception e) {
+ _log.error("unexpected problem converting byte[] to Attributes", e);
+ }
+ }
+ return _objectRep;
+ }
+
+ synchronized byte[] getByteRep() {
+ if (null==_byteRep) {
+ // convert Object to byte[] rep
+ try {
+ ByteArrayOutputStream baos=new ByteArrayOutputStream();
+ ObjectOutput oo=_streamer.getOutputStream(baos);
+ // write the number of attributes
+ oo.writeInt(_objectRep.size());
+ // for each attribute :
+ for (Iterator i=_objectRep.entrySet().iterator(); i.hasNext();) {
+ Map.Entry e=(Map.Entry)i.next();
+ // write it's key
+ oo.writeObject(e.getKey());
+ // if it is an activation listener, call willPassivate()
+ Object val=e.getValue();
+ // if it is either an Activation or Binding listener, set a flag
+ if (val instanceof HttpSessionActivationListener) {
+ ((HttpSessionActivationListener)val).sessionWillPassivate(_event);
+ _hasListeners=true; // the whole session will need deserialising if it times out on e.g. disc...
+ }
+ // if it is a known non-serialisable type, do the right thing with a serialisable wrapper class
+ // TODO - use a configurable table of wrapper types...
+ // write it out
+ oo.writeObject(val);
+ }
+ // don't write the flag - this will form part of our containing session's metadata...
+ // extract resulting byte[]...
+ oo.close();
+ _byteRep=baos.toByteArray();
+ if (_evictObjectRepASAP) {
+ _objectRep.clear();
+ _objectRepValid=false;
+ }
+ } catch (Exception e) {
+ _log.error("unexpected problem converting Attributes to byte[]", e);
+ }
+ }
+ return _byteRep;
+ }
+
+ public Object get(Object key) {
+ Object tmp=getObjectRep().get(key);
+ if (tmp!=null && _dirtier.readAccess()) _byteRep=null;
+ return tmp;
+ }
+
+ public Object remove(Object key) {
+ Object tmp=getObjectRep().remove(key);
+ if (tmp!=null && _dirtier.writeAccess()) _byteRep=null;
+ return tmp;
+ }
+
+ public Object put(Object key, Object value) {
+ Object tmp=getObjectRep().put(key, value);
+ if (_dirtier.writeAccess()) _byteRep=null; // no need to check for null value - this would become a remove()
+ return tmp;
+ }
+
+ public int size() {
+ return getObjectRep().size();
+ }
+
+ public Set keySet() {
+ return getObjectRep().keySet();
+ }
+
+ public byte[] getBytes() {
+ return getByteRep();
+ }
+
+ public synchronized void setBytes(byte[] bytes) {
+ _objectRep.clear();
+ _objectRepValid=false;
+ _byteRep=bytes;
+ }
+
+ public synchronized void clear() {
+ _objectRep.clear();
+ _objectRepValid=false;
+ _byteRep=null;
+ }
+
+ protected HttpSessionEvent _event;
+ public void setHttpSessionEvent(HttpSessionEvent event) {_event=event;}
+
+ public Set getBindingListenerNames() {return null;} // NYI
+ public Set getActivationListenerNames() {return null;} // NYI
+
+ public HttpSessionEvent getHttpSessionEvent() {return null;} // NYI
+ public Streamer getStreamer() {return _streamer;}
+ public Dirtier getDirtier() {return _dirtier;}
+
+ // NYI
+ public void readContent(ObjectInput oi) throws IOException, ClassNotFoundException {
+ throw new NotSerializableException();
+ }
+
+ public void writeContent(ObjectOutput oo) throws IOException {
+ throw new NotSerializableException();
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributesFactory.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributesFactory.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributesFactory.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WholeAttributesFactory.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,46 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi;
+
+import org.codehaus.wadi.Dirtier;
+
+/**
+ * TODO - JavaDoc this type
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.1 $
+ */
+
+public class WholeAttributesFactory implements AttributesFactory {
+
+ protected final Dirtier _dirtier;
+ protected final Streamer _streamer;
+ protected final boolean _evictObjectRepASAP;
+ protected final boolean _evictByteRepASAP;
+
+ public WholeAttributesFactory(Dirtier dirtier, Streamer streamer, boolean evictObjectRepASAP, boolean evictByteRepASAP) {
+ _dirtier=dirtier;
+ _streamer=streamer;
+ _evictObjectRepASAP=evictObjectRepASAP;
+ _evictByteRepASAP=evictByteRepASAP;
+ }
+
+ public Attributes create(AttributesConfig config) {
+ return new WholeAttributes(_dirtier, _streamer, _evictObjectRepASAP, _evictByteRepASAP); // FIXME
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WriteDirtier.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WriteDirtier.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WriteDirtier.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/WriteDirtier.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi;
+
+import org.codehaus.wadi.Dirtier;
+
+/**
+ * TODO - JavaDoc this type
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.1 $
+ */
+
+public class WriteDirtier implements Dirtier {
+
+ public boolean readAccess() {return false;}
+ public boolean writeAccess() {return true;}
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/gridstate/TestLockManager.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/gridstate/TestLockManager.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/gridstate/TestLockManager.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/gridstate/TestLockManager.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,86 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.gridstate;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.gridstate.LockManager;
+import org.codehaus.wadi.gridstate.impl.SmartLockManager;
+import org.codehaus.wadi.gridstate.impl.StupidLockManager;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import junit.framework.TestCase;
+
+public class TestLockManager extends TestCase {
+
+ protected final Log _log=LogFactory.getLog(getClass());
+
+ protected final int _numThreads=1000;
+ protected final int _numIterations=100;
+ protected final Thread[] _threads=new Thread[_numThreads];
+
+ public static void main(String[] args) {
+ }
+
+ public TestLockManager(String arg0) {
+ super(arg0);
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ class TestThread implements Runnable {
+
+ protected final Object _key;
+ protected final LockManager _manager;
+
+ TestThread(String key, LockManager manager) {
+ _key=key;
+ _manager=manager;
+ }
+
+ public void run() {
+ for (int i=0; i<_numIterations; i++) {
+ Sync sync=_manager.acquire(_key);
+ sync.release();
+ }
+ }
+ }
+
+ public void testLockManagers() throws Exception {
+ run(new SmartLockManager(""));
+ run(new StupidLockManager(""));
+ }
+
+ protected void run(LockManager lm) throws Exception {
+ _log.info("starting: " + lm);
+ String key="abc";
+ for (int i=0; i<_numThreads; i++)
+ (_threads[i]=new Thread(new TestThread(key, lm), "TestThread-"+i)).start();
+ for (int i=0; i<_numThreads; i++) {
+ _threads[i].join();
+ _threads[i]=null;
+ }
+ if (_log.isInfoEnabled()) _log.info("finished: " + lm);
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/ClusterDemo.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/ClusterDemo.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/ClusterDemo.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/ClusterDemo.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,125 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.old.test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterException;
+import org.activecluster.ClusterFactory;
+import org.activecluster.impl.DefaultClusterFactory;
+import org.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.impl.Utils;
+import org.codehaus.wadi.sandbox.cluster.NChooseKTopologyStrategy;
+import org.codehaus.wadi.sandbox.cluster.TopologyStrategy;
+
+// originally based on James' ClusterDemo from activecluster...
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class
+ ClusterDemo
+{
+ protected Cluster _cluster;
+ protected ActiveMQConnectionFactory _connFactory=Utils.getConnectionFactory();
+ protected String _nodeId;
+ protected TopologyStrategy _topology;
+ protected int _cellSize=2;
+
+ public
+ ClusterDemo(String id, int cellSize)
+ {
+ _nodeId=id;
+ _cellSize=cellSize;
+ }
+
+ protected void
+ start()
+ throws JMSException, ClusterException
+ {
+ String clusterId="ORG.CODEHAUS.WADI.TEST.CLUSTER";
+ ClusterFactory factory = new DefaultClusterFactory(_connFactory);
+ // factory.setInactiveTime(20000); // 20 secs ?
+ _cluster= factory.createCluster(clusterId);
+ Map state=new HashMap();
+ state.put("id", _nodeId);
+ _cluster.getLocalNode().setState(state);
+ _topology=new NChooseKTopologyStrategy(_nodeId, clusterId, _cluster, factory, _cellSize);
+ //_topology=new RingTopologyStrategy(_nodeId, clusterId, _cluster, factory, _cellSize);
+ _topology.start();
+ _cluster.addClusterListener(_topology);
+ _cluster.start();
+ }
+
+ protected void
+ stop()
+ throws JMSException
+ {
+ _cluster.stop();
+ _topology.stop();
+ _connFactory.stop();
+ }
+
+ //----------------------------------------
+
+
+ public static void
+ main(String[] args)
+ {
+ Log log=LogFactory.getLog(ClusterDemo.class);
+
+ int nPeers=Integer.parseInt(args[0]);
+ int cellSize=Integer.parseInt(args[1]);
+
+ for (int i=0; i<nPeers; i++)
+ {
+ try
+ {
+ String pid=System.getProperty("pid");
+ ClusterDemo test = new ClusterDemo("node"+pid+"."+i, cellSize);
+ test.start();
+ }
+ catch (JMSException e)
+ {
+ if ( log.isWarnEnabled() ) {
+
+ log.warn("unexpected problem", e);
+ }
+ Exception c = e.getLinkedException();
+ if (c != null)
+ if ( log.isWarnEnabled() ) {
+
+ log.warn("unexpected problem", c);
+ }
+ }
+ catch (Throwable e)
+ {
+ if ( log.isWarnEnabled() ) {
+
+ log.warn("unexpected problem", e);
+ }
+ }
+ }
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestCluster.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestCluster.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestCluster.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestCluster.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,292 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.old.test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+
+import junit.framework.TestCase;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterEvent;
+import org.activecluster.ClusterListener;
+import org.activecluster.impl.DefaultClusterFactory;
+import org.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.impl.Utils;
+
+/**
+ * Test ActiveCluster, ActiveMQ, with an eye to putting WADI on top of
+ * them.
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.4 $
+ */
+public class
+ TestCluster
+ extends TestCase
+{
+ protected Log _log=LogFactory.getLog(TestCluster.class);
+
+ public TestCluster(String name)
+ {
+ super(name);
+ }
+
+ protected ActiveMQConnectionFactory _connectionFactory;
+ protected Connection _connection;
+ protected DefaultClusterFactory _clusterFactory;
+ protected Cluster _cluster0;
+ protected Cluster _cluster1;
+
+ protected void
+ setUp()
+ throws Exception
+ {
+ testResponsePassed=false;
+
+ _connectionFactory = Utils.getConnectionFactory();
+ _clusterFactory = new DefaultClusterFactory(_connectionFactory);
+ _cluster0 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
+ _cluster1 = _clusterFactory.createCluster("ORG.CODEHAUS.WADI.TEST.CLUSTER");
+
+ _cluster0.start();
+ if (_log.isInfoEnabled()) _log.info("started node0: " + _cluster0.getLocalNode().getDestination());
+ _cluster1.start();
+ if (_log.isInfoEnabled()) _log.info("started node1: " + _cluster1.getLocalNode().getDestination());
+ }
+
+ protected void
+ tearDown()
+ throws JMSException
+ {
+ // _cluster1.stop();
+ _cluster1=null;
+ // _cluster0.stop();
+ _cluster0=null;
+ _clusterFactory=null;
+ // _connection.stop();
+ _connection=null;
+ // _connectionFactory.stop();
+ }
+
+ //----------------------------------------
+
+ class MyClusterListener
+ implements ClusterListener
+ {
+ public void
+ onNodeAdd(ClusterEvent ce)
+ {
+ if (_log.isInfoEnabled()) _log.info("node added: " + ce.getNode());
+ }
+
+ public void
+ onNodeFailed(ClusterEvent ce)
+ {
+ if (_log.isInfoEnabled()) _log.info("node failed: " + ce.getNode());
+ }
+
+ public void
+ onNodeRemoved(ClusterEvent ce)
+ {
+ if (_log.isInfoEnabled()) _log.info("node removed: " + ce.getNode());
+ }
+
+ public void
+ onNodeUpdate(ClusterEvent ce)
+ {
+ if (_log.isInfoEnabled()) _log.info("node updated: " + ce.getNode());
+ }
+
+ public void
+ onCoordinatorChanged(ClusterEvent ce)
+ {
+ if (_log.isInfoEnabled()) _log.info("coordinator changed: " + ce.getNode());
+ }
+ }
+
+ public void
+ testCluster()
+ throws Exception
+ {
+ _cluster0.addClusterListener(new MyClusterListener());
+
+ Map map = new HashMap();
+ map.put("text", "testing123");
+ _cluster0.getLocalNode().setState(map);
+
+ if (_log.isInfoEnabled()) _log.info("nodes: " + _cluster0.getNodes());
+ Thread.sleep(10000);
+ assertTrue(true);
+ }
+
+ /**
+ * An invokable piece of work.
+ *
+ */
+ static interface Invocation extends java.io.Serializable
+ {
+ public void invoke(Cluster cluster, ObjectMessage om);
+ }
+
+ /**
+ * Listen for messages, if they contain Invocations, invoke() them.
+ *
+ */
+ class
+ InvocationListener
+ implements MessageListener
+ {
+ protected Cluster _cluster;
+
+ public
+ InvocationListener(Cluster cluster)
+ {
+ _cluster=cluster;
+ }
+
+ public void
+ onMessage(Message message)
+ {
+ if (_log.isInfoEnabled()) _log.info("message received: " + message);
+
+ ObjectMessage om=null;
+ Object tmp=null;
+ Invocation invocation=null;
+
+ try
+ {
+ if (message instanceof ObjectMessage &&
+ (om=(ObjectMessage)message)!=null &&
+ (tmp=om.getObject())!=null &&
+ tmp instanceof Invocation &&
+ (invocation=(Invocation)tmp)!=null)
+ {
+ if (_log.isInfoEnabled()) _log.info("invoking message on: " + _cluster.getLocalNode());
+ invocation.invoke(_cluster, om);
+ if (_log.isInfoEnabled()) _log.info("message successfully invoked on: " + _cluster.getLocalNode());
+ }
+ else
+ {
+ if (_log.isWarnEnabled()) _log.warn("bad message: " + message);
+ }
+ }
+ catch (JMSException e)
+ {
+ _log.warn("unexpected problem", e);
+ }
+ }
+ }
+
+ /**
+ * A request for a piece of work which involves sending a response
+ * back to the original requester.
+ *
+ */
+ static class Request
+ implements Invocation
+ {
+ public void
+ invoke(Cluster cluster, ObjectMessage om2)
+ {
+ try
+ {
+ System.out.println("request received");
+ ObjectMessage om = cluster.createObjectMessage();
+ om.setJMSReplyTo(cluster.getLocalNode().getDestination());
+ om.setObject(new Response());
+ System.out.println("sending response");
+ cluster.send(om2.getJMSReplyTo(), om);
+ System.out.println("request processed");
+ }
+ catch (JMSException e)
+ {
+ System.err.println("problem sending response");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ static boolean testResponsePassed=false;
+
+ /**
+ * A response containing a piece of work.
+ *
+ */
+ static class Response
+ implements Invocation
+ {
+ public void
+ invoke(Cluster cluster, ObjectMessage om)
+ {
+ try
+ {
+ System.out.println("response arrived from: "+om.getJMSReplyTo());
+ // set a flag to test later
+ TestCluster.testResponsePassed=true;
+ System.out.println("response processed on: "+cluster.getLocalNode().getDestination());
+ }
+ catch (JMSException e)
+ {
+ System.err.println("problem processing response");
+ }
+ }
+ }
+
+ public void
+ testResponse()
+ throws Exception
+ {
+
+ MessageListener listener0=new InvocationListener(_cluster0);
+ MessageListener listener1=new InvocationListener(_cluster1);
+
+ // 1->(n-1) messages (excludes self)
+ _cluster0.createConsumer(_cluster0.getDestination(), null, true).setMessageListener(listener0);
+ // 1->1 messages
+ _cluster0.createConsumer(_cluster0.getLocalNode().getDestination()).setMessageListener(listener0);
+ // 1->(n-1) messages (excludes self)
+ _cluster1.createConsumer(_cluster1.getDestination(), null, true).setMessageListener(listener1);
+ // 1->1 messages
+ _cluster1.createConsumer(_cluster1.getLocalNode().getDestination()).setMessageListener(listener1);
+
+ ObjectMessage om = _cluster0.createObjectMessage();
+ om.setJMSReplyTo(_cluster0.getLocalNode().getDestination());
+ om.setObject(new Request());
+
+ testResponsePassed=false;
+ _cluster0.send(_cluster0.getLocalNode().getDestination(), om);
+ Thread.sleep(3000);
+ assertTrue(testResponsePassed);
+ _log.info("request/response between same node OK");
+
+ testResponsePassed=false;
+ _cluster0.send(_cluster1.getLocalNode().getDestination(), om);
+ Thread.sleep(3000);
+ assertTrue(testResponsePassed);
+ _log.info("request/response between two different nodes OK");
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestConcurrency.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestConcurrency.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestConcurrency.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestConcurrency.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,230 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.old.test;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+import EDU.oswego.cs.dl.util.concurrent.SyncMap;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.impl.RWLock;
+
+
+/**
+ * Test concurrency related issues
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.5 $
+ */
+public class
+ TestConcurrency
+ extends TestCase
+{
+ protected Log _log=LogFactory.getLog(getClass());
+
+ public
+ TestConcurrency(String name)
+ {
+ super(name);
+ }
+
+ //----------------------------------------
+
+ protected long
+ testMap(Map map)
+ {
+ long start=System.currentTimeMillis();
+ int iters=100;
+
+ for (int i=iters;i>0;i--)
+ {
+ String s=""+i;
+ map.put(s,s);
+ }
+ for (int i=iters;i>0;i--)
+ {
+ String s=""+i;
+ assertTrue(map.get(s).equals(s));
+ }
+ for (int i=iters;i>0;i--)
+ {
+ String s=""+i;
+ map.remove(s);
+ }
+ assertTrue(map.size()==0);
+
+ long end=System.currentTimeMillis();
+
+ return end-start;
+ }
+
+ public void
+ testMaps()
+ throws Exception
+ {
+
+ System.out.println("HashMap: "+testMap(new HashMap()));
+ System.out.println("ConcurrentReaderHashMap: "+testMap(new ConcurrentReaderHashMap()));
+ System.out.println("ConcurrentHashMap: "+testMap(new ConcurrentHashMap()));
+ System.out.println("HashMap: "+testMap(new HashMap()));
+ System.out.println("Mutex(HashMap): "+testMap(new SyncMap(new HashMap(), new Mutex())));
+ System.out.println("WriterPreferenceReadWriteLock(HashMap): "+testMap(new SyncMap(new HashMap(), new WriterPreferenceReadWriteLock())));
+
+ assertTrue(true);
+ }
+
+ public void
+ testIterator()
+ {
+ Map map=new ConcurrentReaderHashMap();
+
+ map.put("a","1");
+ map.put("b","2");
+ map.put("c","3");
+
+ for (Iterator i=map.entrySet().iterator(); i.hasNext();)
+ {
+ System.out.println("removing element...");
+ i.next();
+ i.remove();
+ }
+
+ assertTrue(map.size()==0);
+ }
+
+ protected int _priority=Thread.MAX_PRIORITY+1;
+
+ final int INVALIDATION_PRIORITY=3;
+ final int TIMEOUT_PRIORITY=2;
+ final int EMMIGRATION_PRIORITY=1;
+ final int EVICTION_PRIORITY=0;
+
+ final int MAX_PRIORITY=INVALIDATION_PRIORITY;
+
+ public void
+ priority(final boolean acquire)
+ throws Exception
+ {
+
+ final RWLock lock=new RWLock(MAX_PRIORITY);
+
+ Thread[] threads=new Thread[MAX_PRIORITY+1];
+
+ RWLock.setPriority(EVICTION_PRIORITY);
+
+ lock.readLock().attempt(60000);
+
+ for (int i=0;i<=MAX_PRIORITY;i++)
+ {
+ final int p=i;
+ if (_log.isInfoEnabled()) _log.info("starting: "+p);
+ Thread t=new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ RWLock.setPriority(p);
+ if (acquire)
+ lock.writeLock().acquire();
+ else
+ lock.writeLock().attempt(60000);
+ int priority=RWLock.getPriority();
+ if (_log.isInfoEnabled()) _log.info("priority: "+priority);
+ assertTrue(priority<_priority);
+ _priority=priority;
+ lock.writeLock().release();
+ }
+ catch (Exception e)
+ {
+ _log.warn("oops", e);
+ }
+ }
+ };
+ threads[i]=t;
+ t.start();
+ }
+
+ Thread.yield();
+ _log.info("releasing read lock");
+ lock.readLock().release();
+
+ for (int i=0;i<=MAX_PRIORITY;i++)
+ {
+ Thread t=threads[i];
+ t.join();
+ if (_log.isInfoEnabled()) _log.info("joining: "+i);
+ }
+ }
+
+ public void
+ testPriority()
+ throws Exception
+ {
+ _priority=MAX_PRIORITY+1;
+ priority(true);
+ _priority=MAX_PRIORITY+1;
+ priority(false);
+ }
+
+ protected boolean _first=true;
+
+ public void
+ testOverlap()
+ throws Exception
+ {
+ final RWLock lock=new RWLock(MAX_PRIORITY);
+
+ {
+ lock.readLock().acquire();
+
+ Thread t1=new Thread() {public void run()
+ {
+ try
+ {
+ RWLock.setPriority(EVICTION_PRIORITY);
+ lock.writeLock().acquire();
+ _log.info("I lost");
+ assertTrue(_first==false);
+ lock.writeLock().release();
+ }
+ catch (Exception e)
+ {
+ _log.warn(e);
+ }
+ }
+ };
+ t1.start();
+
+ RWLock.setPriority(INVALIDATION_PRIORITY);
+ lock.overlap();
+ _log.info("I won");
+ assertTrue(_first==true);
+ _first=false;
+ lock.writeLock().release();
+
+ t1.join();
+ }
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestDistributedOwnership.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestDistributedOwnership.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestDistributedOwnership.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestDistributedOwnership.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,143 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// this test should create a lot of threads, each representing a node.
+
+// each node will own a set of 'sessions'
+
+// the threads will continually negotiate new ownership of the sessions and migrate them between each other, until i am satisfied that I have a bulletproof strategy...
+
+package org.codehaus.wadi.old.test;
+
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+import org.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.activecluster.Cluster;
+import org.activecluster.ClusterException;
+import org.activecluster.ClusterFactory;
+import org.activecluster.impl.DefaultClusterFactory;
+import org.codehaus.wadi.impl.Utils;
+
+/**
+ * @author jules
+ *
+ * TODO To change the template for this generated type comment go to Window -
+ * Preferences - Java - Code Style - Code Templates
+ */
+public class TestDistributedOwnership extends TestCase {
+
+ public TestDistributedOwnership(String name) {
+ super(name);
+ }
+
+ protected Log _log = LogFactory.getLog(TestCluster.class);
+ protected ActiveMQConnectionFactory _connectionFactory;
+ protected DefaultClusterFactory _clusterFactory;
+
+ class Node extends Thread {
+ protected String _name;
+ protected ClusterFactory _factory;
+ protected String _id;
+ protected Cluster _cluster;
+ protected Log _log;
+
+ Node(ClusterFactory factory, String name, String id)
+ throws ClusterException, JMSException
+ {
+ _factory = factory;
+ _name = name;
+ _cluster = _clusterFactory.createCluster(_name);
+ _id = id;
+ _log = LogFactory.getLog(getClass().getName()+"#"+_id);
+ }
+
+ public void run() {
+ try {
+ _cluster.start();
+ } catch (JMSException e) {
+ _log.error("could not start node", e);
+ }
+
+ try
+ {
+ _log.info("running...");
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ _log.error("interrupted", e);
+ }
+ try {
+ _cluster.stop();
+ } catch (JMSException e) {
+ _log.error("could not stop node", e);
+ }
+ _cluster = null;
+ }
+
+ }
+
+ protected int _numNodes=100;
+ protected Node[] _nodes=new Node[_numNodes];
+
+ protected void setUp() throws Exception {
+ _connectionFactory = Utils.getConnectionFactory();
+ _clusterFactory = new DefaultClusterFactory(_connectionFactory);
+
+ for (int i=0; i<_numNodes; i++)
+ _nodes[i]=new Node(_clusterFactory, "ORG.CODEHAUS.WADI.TEST.CLUSTER", ""+i);
+ }
+
+ protected void tearDown() throws JMSException {
+ _nodes=null;
+ }
+
+ protected void
+ startNodes()
+ {
+ for (int i=0; i<_numNodes; i++)
+ _nodes[i].start();
+ }
+
+ protected void
+ stopNodes()
+ {
+// for (int i=0; i<_numNodes; i++)
+// _nodes[i].stop();
+
+ for (int i=0; i<_numNodes; i++)
+ {
+ try
+ {
+ _nodes[i].join();
+ }
+ catch (InterruptedException e)
+ {
+ _log.warn("interrupted whilst stopping thread", e);
+ }
+ }
+ }
+
+ public void
+ testThreads()
+ {
+ startNodes();
+ stopNodes();
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationClient.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationClient.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationClient.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationClient.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.old.test;
+
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.io.IOException;
+
+/**
+ * Test the Location client
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.1 $
+ */
+public class
+ TestLocationClient
+{
+
+ public static class Client
+ implements Runnable
+ {
+ protected final int _iters;
+ protected final int _port;
+ protected final InetAddress _address;
+ protected final MulticastSocket _socket;
+ protected final String _message;
+
+
+ public Client(int iters, InetAddress address, int port, MulticastSocket socket, String message)
+ {
+ _iters=iters;
+ _address=address;
+ _port=port;
+ _socket=socket;
+ _message=message;
+ }
+
+ public void run()
+ {
+ for (int i=0;i<_iters;i++)
+ {
+ try
+ {
+ _socket.send(new DatagramPacket(_message.getBytes(),
+ _message.length(),
+ _address,
+ _port));
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+
+ public static void
+ main(String[] args)
+ {
+ try
+ {
+ int port=6789;
+ MulticastSocket socket= new MulticastSocket(port);
+ InetAddress address=InetAddress.getByName("228.5.6.7");
+
+ socket.setLoopbackMode(false);
+ socket.joinGroup(address);
+
+ String message=args[0];
+
+ int numThreads=100;
+ int numIters=100;
+ Thread[] threads=new Thread[numThreads];
+ for (int i=0;i<numThreads;i++)
+ threads[i]=new Thread(new Client(numIters, address, port, socket, message));
+
+ for (int i=0;i<numThreads;i++)
+ threads[i].start();
+
+ for (int i=0;i<numThreads;i++)
+ threads[i].join();
+
+ socket.leaveGroup(address);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.err);
+ }
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestLocationServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,82 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.old.test;
+
+/**
+ * Test the Location server
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.1 $
+ */
+public class
+ TestLocationServer
+// extends LocationService
+// implements Runnable
+{
+// protected long _timeout=2000; // TODO - 0does not quit properly
+// protected String _httpIpAddress;
+// protected String _httpPort;
+
+// public
+// TestLocationServer(InetAddress serverIpAddress, int serverPort,
+// long timeout, String httpIpAddress, String httpPort)
+// {
+// super(serverIpAddress, serverPort);
+// _timeout=timeout;
+// _httpIpAddress=httpIpAddress;
+// _httpPort=httpPort;
+// }
+
+// public void
+// run()
+// {
+// while (true)
+// {
+// if (_timeout==0) Thread.yield(); // prevent a tight loop...
+// processMulticast(receiveMulticast(_timeout), _timeout, _httpIpAddress, _httpPort);
+// }
+// }
+
+// public boolean
+// isOwner(String id)
+// {
+// System.out.println("am I the owner: "+id);
+// return true;
+// }
+
+// public static void
+// main(String args[])
+// {
+// try
+// {
+// TestLocationServer server= new TestLocationServer(InetAddress.getByName("228.5.6.7"),
+// 6789,
+// 2000,
+// args[0],
+// args[1]);
+
+// server.start();
+// server.run();
+// server.stop();
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace(System.err);
+// }
+// }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestMulticastServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestMulticastServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestMulticastServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestMulticastServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,77 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.codehaus.wadi.old.test;
+
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+
+/**
+ * Multicast related tests
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.1 $
+ */
+public class
+ TestMulticastServer
+ implements Runnable
+{
+ protected MulticastSocket _socket;
+
+ public
+ TestMulticastServer(String ip, int port)
+ throws Exception
+ {
+ InetAddress address = InetAddress.getByName(ip);
+ _socket = new MulticastSocket(port);
+ _socket.joinGroup(address);
+ }
+
+ public void
+ run()
+ {
+ try
+ {
+ System.out.println("starting: "+_socket);
+ byte[] buffer=new byte[1024];
+ while (true)
+ {
+ DatagramPacket packet=new DatagramPacket(buffer, buffer.length);
+ _socket.receive(packet);
+ System.out.println("received: "+new String(packet.getData(), packet.getOffset(), packet.getLength()));
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public static void
+ main(String args[])
+ throws Exception
+ {
+ String address="228.5.6.7";
+ int port=6789;
+
+ TestMulticastServer server= new TestMulticastServer(address, port);
+ server.run();
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestSerialisation.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestSerialisation.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestSerialisation.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestSerialisation.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,162 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.old.test;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Serialisation related tests
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.3 $
+ */
+class Shared
+ implements Serializable
+{
+ protected transient Log _log=LogFactory.getLog(getClass());
+
+ public Shared(){}
+ public Shared(Shared s){_payload=s._payload;}
+ protected int _payload;
+ public void setPayload(int t){_payload=t;}
+ public int getPayload(){return _payload;}
+
+ public String toString(){return "<"+getClass().getName()+":"+_payload+">";}
+
+ protected Object
+ writeReplace()
+ throws ObjectStreamException
+ {
+ _log.info("writing porter");
+ Porter p=new Porter(this);
+ if (_log.isInfoEnabled()) _log.info(""+p);
+ return p;
+ }
+}
+
+class Porter
+ extends Shared
+ implements Serializable
+{
+
+ public static Class _target;
+ public static java.lang.reflect.Constructor _ctor;
+
+ static void
+ setUp(Class target)
+ throws NoSuchMethodException
+ {
+ _target=target;
+ _ctor=_target.getConstructor(new Class[]{Shared.class});
+ }
+
+ Porter(Shared s){super(s);}
+
+ protected Object
+ readResolve()
+ throws ObjectStreamException
+ {
+ _log=LogFactory.getLog(getClass()); // why do transient fields not get properly initialised ?
+ if (_log.isInfoEnabled()) _log.info(""+_payload);
+ try
+ {
+ return _ctor.newInstance(new Shared[]{this});
+ }
+ catch (Exception any)
+ {
+ throw new ObjectStreamException(){};
+ }
+ }
+
+ protected Object
+ writeReplace()
+ throws ObjectStreamException
+ {
+ return this; // need to override super to prevent double replacement
+ }
+}
+
+class Tomcat
+ extends Shared
+ implements Serializable
+{
+ public Tomcat() {_payload=20;}
+ public Tomcat(Shared s) {super(s);}
+}
+
+class Jetty
+ extends Shared
+ implements Serializable
+{
+ public Jetty() {_payload=10;}
+ public Jetty(Shared s) {super(s);}
+}
+
+
+
+public class
+ TestSerialisation
+ extends TestCase
+{
+ protected Log _log=LogFactory.getLog(TestSerialisation.class);
+
+ public TestSerialisation(String name) {super(name);}
+
+ protected void
+ setUp()
+ throws Exception
+ {
+ _log.info("starting test");
+ }
+
+ protected void
+ tearDown()
+ throws InterruptedException
+ {
+ _log.info("stopping test");
+ }
+
+ public void
+ testRoundTrip()
+ throws Exception
+ {
+// byte[] buffer;
+// Object o1;
+// Object o2;
+//
+// Tomcat tc=new Tomcat();
+// tc.setPayload(100);
+//
+// o1=tc;
+// if (_log.isInfoEnabled()) _log.info("outbound instance is: "+o1);
+// buffer=ObjectInputStream.marshall(o1);
+// Porter.setUp(Jetty.class);
+// o2=ObjectInputStream.demarshall(buffer);
+// if (_log.isInfoEnabled()) _log.info("inbound instance is: "+o2);
+//
+// if (_log.isInfoEnabled()) _log.info("outbound instance is: "+o2);
+// buffer=ObjectInputStream.marshall(o2);
+// Porter.setUp(Tomcat.class);
+// o1=ObjectInputStream.demarshall(buffer);
+// if (_log.isInfoEnabled()) _log.info("inbound instance is: "+o1);
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestTopologies.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestTopologies.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestTopologies.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestTopologies.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,301 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.old.test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.cluster.Abstract2TopologyStrategy;
+import org.codehaus.wadi.sandbox.cluster.Cell;
+import org.codehaus.wadi.sandbox.cluster.Peer;
+import org.codehaus.wadi.sandbox.cluster.RingTopologyStrategy;
+
+public class
+ TestTopologies
+ extends TestCase
+{
+ protected Log _log=LogFactory.getLog(getClass());
+
+ public TestTopologies(String name)
+ {
+ super(name);
+ }
+
+ protected void
+ setUp()
+ throws Exception
+ {}
+
+ protected void
+ tearDown()
+ throws Exception
+ {}
+
+ //----------------------------------------
+
+ public void
+ testRing()
+ {
+ int k=2;
+
+ Peer p0=new Peer("0");
+ Peer p1=new Peer("1");
+ Peer p2=new Peer("2");
+ Peer p3=new Peer("3");
+ Peer local=p0;
+
+ Collection e=new TreeSet();
+ e.add(p0);
+ e.add(p1);
+ e.add(p2);
+ e.add(p3);
+
+ if (_log.isInfoEnabled()) _log.info("in :" + e);
+
+ Abstract2TopologyStrategy ts=new RingTopologyStrategy(local.getId(), "test", null, null, 2);
+ Map result;
+ Map control=new TreeMap();
+ {
+ Collection c;
+ c=new ArrayList();
+ c.add(p0);
+ c.add(p1);
+ control.put(Cell.id(c), c);
+ c=new ArrayList();
+ c.add(p1);
+ c.add(p2);
+ control.put(Cell.id(c), c);
+ c=new ArrayList();
+ c.add(p2);
+ c.add(p3);
+ control.put(Cell.id(c), c);
+ c=new ArrayList();
+ c.add(p3);
+ c.add(p0);
+ control.put(Cell.id(c), c);
+ }
+
+ Collection control2;
+ Collection result2;
+
+ result=ts.combineMap(null, e, k);
+ if (_log.isInfoEnabled()) {
+ _log.info("out :" + result);
+ _log.info("control:" + control);
+ }
+
+ assertTrue(control.equals(result));
+
+ result2=ts.combineCollection(null, e, k);
+ control2=new ArrayList(control.values());
+ if (_log.isInfoEnabled()) {
+ _log.info("out2 :" + result2);
+ _log.info("control2:" + control2);
+ }
+
+ assertTrue(control2.equals(result2));
+
+ control.remove("1-2");
+ control.remove("2-3");
+
+ result=ts.combineMap(local, e, k);
+ if (_log.isInfoEnabled()) {
+ _log.info("out :" + result);
+ _log.info("control:" + control);
+ }
+
+ assertTrue(control.equals(result));
+
+ result2=ts.combineCollection(local, e, k);
+ control2=new ArrayList(control.values());
+ if (_log.isInfoEnabled()) {
+ _log.info("out2 :" + result2);
+ _log.info("control2:" + control2);
+ }
+
+ assertTrue(control2.equals(result2));
+
+ // now some timings...
+ e=new TreeSet();
+ k=2;
+ for (int i=0; i<500; i++)
+ e.add(new Peer(""+i));
+
+ long start;
+ long end;
+
+ start=System.currentTimeMillis();
+ ts.combineMap(null, e, k);
+ end=System.currentTimeMillis();
+ if (_log.isInfoEnabled()) _log.info("combineMap Ring x500 :" + ( end - start ) + " milis");
+
+ start=System.currentTimeMillis();
+ ts.combineMap(null, e, k);
+ end=System.currentTimeMillis();
+ if (_log.isInfoEnabled()) _log.info("combineMap Ring x500 :" + ( end - start ) + " milis");
+
+ start=System.currentTimeMillis();
+ ts.combineMap(null, e, k);
+ end=System.currentTimeMillis();
+ if (_log.isInfoEnabled()) _log.info("combineMap Ring x500 :" + ( end - start ) + " milis");
+
+ start=System.currentTimeMillis();
+ ts.combineCollection(null, e, k);
+ end=System.currentTimeMillis();
+ if (_log.isInfoEnabled()) _log.info("combineCollection Ring x500 :" + ( end - start ) + " milis");
+
+ start=System.currentTimeMillis();
+ ts.combineCollection(null, e, k);
+ end=System.currentTimeMillis();
+ if (_log.isInfoEnabled()) _log.info("combineCollection Ring x500 :" + ( end - start ) + " milis");
+
+ start=System.currentTimeMillis();
+ ts.combineCollection(null, e, k);
+ end=System.currentTimeMillis();
+ if (_log.isInfoEnabled()) _log.info("combineCollection Ring x500 :" + ( end - start ) + " milis");
+ }
+
+// public void
+// testNChooseK()
+// {
+// int k=2;
+//
+// Peer p0=new Peer("0");
+// Peer p1=new Peer("1");
+// Peer p2=new Peer("2");
+// Peer p3=new Peer("3");
+// Peer local=p0;
+//
+// Collection e=new TreeSet();
+// e.add(p0);
+// e.add(p1);
+// e.add(p2);
+// e.add(p3);
+//
+// _log.info("in :"+e);
+//
+// Abstract2TopologyStrategy ts=new NChooseKTopologyStrategy(local.getId(), "test", null, null, 2);
+// Map result;
+// Collection result2;
+//
+// Map control=new TreeMap();
+// {
+// Collection c;
+// c=new TreeSet();
+// c.add(p0);
+// c.add(p1);
+// control.put(Cell.id(c), c);
+// c=new TreeSet();
+// c.add(p0);
+// c.add(p2);
+// control.put(Cell.id(c), c);
+// c=new TreeSet();
+// c.add(p0);
+// c.add(p3);
+// control.put(Cell.id(c), c);
+// c=new TreeSet();
+// c.add(p1);
+// c.add(p2);
+// control.put(Cell.id(c), c);
+// c=new TreeSet();
+// c.add(p1);
+// c.add(p3);
+// control.put(Cell.id(c), c);
+// c=new TreeSet();
+// c.add(p2);
+// c.add(p3);
+// control.put(Cell.id(c), c);
+// }
+// Collection control2;
+//
+// result=ts.combineMap(null, e, k);
+// _log.info("control:"+control);
+// _log.info("out :"+result);
+// assertTrue(control.equals(result));
+//
+// control2=new TreeSet(new CollectionComparator());
+// control2.addAll(control.values());
+//
+// result2=ts.combineCollection(null, e, k);
+// _log.info("control2:"+control2);
+// _log.info("out2 :"+result2);
+// assertTrue(control2.equals(result2));
+//
+// result=ts.combineMap(local, e, k);
+// control.remove("1-2");
+// control.remove("1-3");
+// control.remove("2-3");
+//
+// _log.info("control:"+control);
+// _log.info("out :"+result);
+// assertTrue(control.equals(result));
+//
+// result2=ts.combineCollection(local, e, k);
+// control2=new TreeSet(new CollectionComparator());
+// control2.addAll(control.values());
+//
+// _log.info("control2:"+control2);
+// _log.info("out2 :"+result2);
+// assertTrue(control2.equals(result2));
+//
+//
+// // now some timings...
+// e=new TreeSet();
+// k=2;
+// for (int i=0; i<5; i++)
+// e.add(new Peer(""+i));
+//
+// long start;
+// long end;
+//
+// start=System.currentTimeMillis();
+// ts.combineMap(null, e, k);
+// end=System.currentTimeMillis();
+// _log.info("combineMap NChooseK x500 :"+(end-start)+" milis");
+//
+// start=System.currentTimeMillis();
+// ts.combineMap(null, e, k);
+// end=System.currentTimeMillis();
+// _log.info("combineMap NChooseK x500 :"+(end-start)+" milis");
+//
+// start=System.currentTimeMillis();
+// ts.combineMap(null, e, k);
+// end=System.currentTimeMillis();
+// _log.info("combineMap NChooseK x500 :"+(end-start)+" milis");
+//
+// start=System.currentTimeMillis();
+// ts.combineCollection(null, e, k);
+// end=System.currentTimeMillis();
+// _log.info("combineCollection NChooseK x500 :"+(end-start)+" milis");
+//
+// start=System.currentTimeMillis();
+// ts.combineCollection(null, e, k);
+// end=System.currentTimeMillis();
+// _log.info("combineCollection NChooseK x500 :"+(end-start)+" milis");
+//
+// start=System.currentTimeMillis();
+// ts.combineCollection(null, e, k);
+// end=System.currentTimeMillis();
+// _log.info("combineCollection NChooseK x500 :"+(end-start)+" milis");
+// }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestUnicastClient.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestUnicastClient.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestUnicastClient.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/old/test/TestUnicastClient.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,75 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.codehaus.wadi.old.test;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+
+/**
+ * Test unicast client to multicast server
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.1 $
+ */
+public class
+ TestUnicastClient
+ implements Runnable
+{
+ protected DatagramSocket _socket;
+ protected DatagramPacket _packet;
+
+ public
+ TestUnicastClient(String ip, int port, String message)
+ throws Exception
+ {
+ byte[] bytes=message.getBytes();
+ _socket = new DatagramSocket();
+ _packet = new DatagramPacket(bytes, 0, bytes.length);
+ _packet.setAddress(InetAddress.getByName(ip));
+ _packet.setPort(port);
+ }
+
+ public void
+ run()
+ {
+ try
+ {
+ _socket.send(_packet);
+ System.out.println("sent: "+new String(_packet.getData(), _packet.getOffset(), _packet.getLength()));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public static void
+ main(String args[])
+ throws Exception
+ {
+ String address="228.5.6.7";
+ int port=6789;
+ String message=args[0];
+
+ TestUnicastClient client= new TestUnicastClient(address, port, message);
+ client.run();
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Abstract2TopologyStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Abstract2TopologyStrategy.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Abstract2TopologyStrategy.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Abstract2TopologyStrategy.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,124 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterFactory;
+
+public abstract class
+ Abstract2TopologyStrategy
+ extends AbstractTopologyStrategy
+{
+ protected Map _cells=new TreeMap(); // would a HashMap be faster?
+ protected String _clusterId;
+ protected ClusterFactory _factory;
+ protected int _k=1;
+
+ public
+ Abstract2TopologyStrategy(String nodeId, String clusterId, Cluster cluster, ClusterFactory factory, int k)
+ {
+ super(nodeId, cluster);
+ _clusterId=clusterId;
+ _factory=factory;
+ _k=k;
+ }
+
+ protected Collection _oldPeers=new TreeSet(new CollectionComparator());
+ protected Map _oldCells=new TreeMap();
+ protected Collection _oldCells2=new TreeSet(new CollectionComparator());
+
+ public Object[]
+ combineMap(Peer p)
+ {
+ Peer localPeer=getLocalPeer();
+ localPeer=localPeer!=null?localPeer:p; // TODO - hack
+ Map newCells=combineMap(localPeer, _peers.values(), Math.min(_k,_peers.size()));
+
+ if (_log.isInfoEnabled()) {
+ _log.info("old peers=" + _oldPeers);
+ _log.info("old rel cells=" + _oldCells.keySet());
+ _log.info("new peers=" + _peers.keySet());
+ _log.info("new rel cells=" + newCells.keySet());
+ }
+
+
+ Map joiningCells=new TreeMap(newCells);
+ joiningCells.keySet().removeAll(_oldCells.keySet());
+ if (_log.isInfoEnabled()) _log.info("joining cells=" + joiningCells.keySet());
+
+ Map leavingCells=new TreeMap(_oldCells);
+ leavingCells.keySet().removeAll(newCells.keySet());
+ if (_log.isInfoEnabled()) _log.info("leaving cells=" + leavingCells.keySet());
+
+ _oldPeers=new TreeSet(_peers.values());
+ _oldCells=newCells;
+
+ return new Object[]{joiningCells, leavingCells};
+ }
+
+ public Object[]
+ combineCollection(Peer p)
+ {
+ Peer localPeer=getLocalPeer();
+ localPeer=localPeer!=null?localPeer:p; // TODO - hack
+ Collection newCells=combineCollection(localPeer, _peers.values(), Math.min(_k,_peers.size()));
+
+ if (_log.isInfoEnabled()) {
+ _log.info("old peers=" + _oldPeers);
+ _log.info("old rel cells=" + _oldCells2);
+ _log.info("new peers=" + _peers.keySet());
+ _log.info("new rel cells=" + newCells);
+ }
+
+ Collection joiningCells=new TreeSet(new CollectionComparator());
+ joiningCells.addAll(newCells);
+ joiningCells.removeAll(_oldCells2);
+ if (_log.isInfoEnabled()) _log.info("joining cells=" + joiningCells);
+
+ Collection leavingCells=new TreeSet(new CollectionComparator());
+ leavingCells.addAll(_oldCells2);
+ leavingCells.removeAll(newCells);
+ if (_log.isInfoEnabled()) _log.info("leaving cells=" + leavingCells);
+
+ _oldPeers=new TreeSet(_peers.values());
+ _oldCells2=newCells;
+
+ return new Object[]{joiningCells, leavingCells};
+ }
+
+ /**
+ * returns a Map of String:Collection (id:set-of-peers) representing
+ * all cells into which the passed list of Peers should be
+ * organised. K is the number of Peers in each Cell.
+ *
+ * @param localPeer a <code>Peer</code> value
+ * @param peers a <code>Collection</code> value
+ * @param peersPerCell an <code>int</code> value
+ * @return a <code>Map</code> value
+ */
+ public abstract Map combineMap(Peer localPeer, Collection peers, int peersPerCell);
+ public abstract Collection combineCollection(Peer localPeer, Collection peers, int peersPerCell);
+
+ public Cell getCell(String id) {return (Cell)_cells.get(id);}
+ public void putCell(String id, Cell cell) {_cells.put(id, cell);}
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/AbstractTopologyStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/AbstractTopologyStrategy.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/AbstractTopologyStrategy.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/AbstractTopologyStrategy.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,143 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterEvent;
+import org.activecluster.LocalNode;
+import org.activecluster.Node;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+// not sure how many layers to this API yet:
+
+// 1 converts nodes to Peers
+
+// 1 calls combine and then works out relevant joining/leaving cells -
+// and later creates/destroys them...
+
+// it should be possible to override the above with a more
+// efficient/specific algorithm which calculates which cells should be
+// created/destroyed directly....
+
+// and more...
+
+public abstract class
+ AbstractTopologyStrategy
+ implements TopologyStrategy
+{
+ protected Log _log=LogFactory.getLog(getClass().getName());
+ protected Map _peers=new TreeMap();
+ protected String _id;
+ protected Cluster _cluster;
+ protected Peer _localPeer;
+ protected LocalNode _localNode;
+
+ public
+ AbstractTopologyStrategy(String id, Cluster cluster)
+ {
+ _id=id;
+ _log=LogFactory.getLog(getClass().getName()+"#"+_id);
+ _cluster=cluster;
+ }
+
+ public Peer getLocalPeer(){return _localPeer;}
+
+ public void
+ start()
+ {
+ _localNode=_cluster.getLocalNode();
+ onNodeAdd(new ClusterEvent(_cluster, _localNode, ClusterEvent.ADD_NODE));
+ _localPeer=(Peer)_peers.get(_id);
+ }
+
+ public void
+ stop()
+ {
+ onNodeRemoved(new ClusterEvent(_cluster, _localNode, ClusterEvent.REMOVE_NODE));
+ _localNode=null;
+ _localPeer=null;
+ }
+
+ public void
+ onNodeAdd(ClusterEvent event)
+ {
+ Peer p=new Peer(event.getNode());
+// Collection peers=null;
+
+ synchronized (_peers)
+ {
+ _peers.put(p.getId(), p);
+ // peers=_peers.values();
+ }
+
+ _log.info("adding: " + p);
+ // _log.info("nodes : " + peers);
+
+ // Object[] diffs=combineCollection(p);
+ }
+
+ public void onCoordinatorChanged(ClusterEvent ce){} // TODO - what does thi mean ?
+ public void onNodeFailed(ClusterEvent event){onNodeRemoved(event);}
+ public void
+ onNodeRemoved(ClusterEvent event)
+ {
+ Node node=event.getNode();
+ String id=(String)node.getState().get("id");
+ Peer p=null;
+// Collection peers=null;
+ synchronized (_peers)
+ {
+ p=(Peer)_peers.remove(id);
+// peers=_peers.values();
+ }
+
+ if (_log.isInfoEnabled()) _log.info("removing: " + p);
+ // _log.info("nodes : " + peers);
+
+// Object[] diffs=combineCollection(p);
+ }
+
+ // do we need this yet ?
+ public void
+ onNodeUpdate(ClusterEvent event)
+ {
+ Node node=event.getNode();
+ String id=(String)(node.getState().get("id"));
+ Peer p=null;
+ Collection peers=null;
+ synchronized (_peers)
+ {
+ p=(Peer)_peers.get(id);
+ peers=_peers.values();
+ }
+ p.setNode(node); // important - this is the update...
+
+ _log.info("updating: " + p);
+ _log.info("nodes : " + peers);
+
+ }
+
+ public abstract Object[] combineMap(Peer p);
+ public abstract Object[] combineCollection(Peer p);
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Cell.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Cell.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Cell.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Cell.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,94 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.activecluster.Cluster;
+import org.activecluster.ClusterFactory;
+
+public class
+ Cell
+{
+ protected Log _log=LogFactory.getLog(getClass());
+ protected ClusterFactory _factory;
+ protected String _id;
+ protected String _clusterId;
+ protected Collection _peers;
+ protected Cluster _cluster;
+
+ public
+ Cell(String id, String clusterId, Collection peers, ClusterFactory factory)
+ {
+ _id=id;
+ _clusterId=clusterId;
+ _peers=peers;
+ _factory=factory;
+ _cluster=null;
+ }
+
+ public void
+ start()
+ {
+ if (_log.isInfoEnabled()) _log.info("starting: " + _id);
+
+ try
+ {
+ _cluster=_factory.createCluster(_clusterId+"."+_id);
+ }
+ catch (Exception e)
+ {
+ if (_log.isWarnEnabled()) _log.warn("unexpected problem starting Cell: " + _id, e);
+ }
+ }
+
+ public void
+ stop()
+ {
+ if (_log.isInfoEnabled()) _log.info("stopping: " + _id);
+
+ try
+ {
+ _cluster.stop();
+ _cluster=null;
+ }
+ catch (Exception e)
+ {
+ if (_log.isWarnEnabled()) _log.warn("unexpected problem stopping Cell: " + _id, e);
+ }
+ }
+
+ public String
+ toString()
+ {
+ return "<Cell:"+_id+">";
+ }
+
+ public String getId(){return _id;}
+
+ public static String
+ id(Collection peers)
+ {
+ String id="";
+ for (Iterator i=peers.iterator(); i.hasNext(); )
+ id+=((id.length()==0)?"":"-")+((Peer)i.next()).getId();
+ return id;
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/CollectionComparator.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/CollectionComparator.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/CollectionComparator.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/CollectionComparator.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,57 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+public class
+ CollectionComparator
+ implements java.util.Comparator
+{
+ public int
+ compare(Object o1, Object o2)
+ {
+ Collection ts1=(Collection)o1;
+ Collection ts2=(Collection)o2;
+
+ Iterator i=ts1.iterator();
+ Iterator j=ts2.iterator();
+
+ int result=0;
+ while (i.hasNext() && j.hasNext() && result==0)
+ {
+ result=((Comparable)i.next()).compareTo(j.next());
+ }
+
+ if (result!=0)
+ return result;
+ else if (i.hasNext())
+ return -1;
+ else if (j.hasNext())
+ return 1;
+ else
+ return 0;
+ }
+
+ public boolean
+ equals(Object o)
+ {
+ return this==o;
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/NChooseKTopologyStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/NChooseKTopologyStrategy.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/NChooseKTopologyStrategy.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/NChooseKTopologyStrategy.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,166 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class
+ NChooseKTopologyStrategy
+ extends Abstract2TopologyStrategy
+{
+ protected Log _log=LogFactory.getLog(getClass().getName());
+
+ public
+ NChooseKTopologyStrategy(String nodeId, String clusterId, Cluster cluster, ClusterFactory factory, int k)
+ {
+ super(nodeId, clusterId, cluster, factory, k);
+ }
+
+ public Map
+ combineMap(Peer local, Collection e, int k)
+ {
+ Map combs=null;
+ boolean filter=(local!=null);
+
+ if (filter)
+ {
+ // calculate all subcombinations...
+ e=new TreeSet(e);
+ e.remove(local);
+ k--;
+ combs=combineMap(e, k);
+
+ // combine them with local node...
+ Map tmp=new TreeMap();
+ for (Iterator i=combs.entrySet().iterator(); i.hasNext(); )
+ {
+ Map.Entry entry=(Map.Entry)i.next();
+ String key=(String)entry.getKey();
+ Collection value=(Collection)entry.getValue();
+ value.add(local);
+ key=Cell.id(value);
+ tmp.put(key, value);
+ }
+ combs=tmp;
+ }
+ else
+ {
+ combs=combineMap(e, k);
+ }
+
+ return combs;
+ }
+
+ protected Map
+ combineMap(Collection e, int k)
+ {
+ Map combsOut=new TreeMap();
+
+ if (k==0)
+ combsOut.put("", new TreeSet());
+ else
+ {
+ Map combsIn=combineMap(e, k-1);
+
+ for (Iterator i=combsIn.values().iterator(); i.hasNext(); )
+ {
+ Collection comb=((Collection)i.next());
+ for (Iterator j=e.iterator(); j.hasNext(); )
+ {
+ Object peer=j.next();
+ if (!comb.contains(peer))
+ {
+ Set newComb=new TreeSet(comb);
+ newComb.add(peer);
+ String id=Cell.id(newComb);
+ combsOut.put(id, newComb);
+ }
+ }
+ }
+ }
+
+ return combsOut;
+ }
+
+ public Collection
+ combineCollection(Peer local, Collection e, int k)
+ {
+ Collection combs=null;
+ boolean filter=(local!=null);
+
+ if (filter)
+ {
+ if (_log.isInfoEnabled()) _log.info("[1] START : " + e);
+ // calculate all subcombinations...
+ e=new TreeSet(e);
+ e.remove(local);
+ k--;
+ combs=combineCollection(e.toArray(), -1, k);
+
+ // combine them with local node...
+ for (Iterator i=combs.iterator(); i.hasNext(); )
+ ((Collection)i.next()).add(local);
+ }
+ else
+ {
+ if (_log.isInfoEnabled()) _log.info("[2] START : " + e);
+ if (_log.isWarnEnabled()) _log.warn("", new Exception());
+ combs=combineCollection(e.toArray(), -1, k);
+ }
+
+ return combs;
+ }
+
+ protected Collection
+ combineCollection(Object[] e, int offset, int k)
+ {
+ Collection combsOut=new TreeSet(new CollectionComparator());
+
+ if (k==0)
+ combsOut.add(new TreeSet());
+ else
+ {
+ Collection combsIn=combineCollection(e, ++offset, k-1);
+
+ for (Iterator i=combsIn.iterator(); i.hasNext(); )
+ {
+ Collection comb=((Collection)i.next());
+ for (int j=offset; j<e.length; j++)
+ {
+ Peer peer=(Peer)e[j];
+ Set newComb=new TreeSet(comb);
+ newComb.add(peer);
+ combsOut.add(newComb);
+ if (_log.isInfoEnabled()) _log.info("newComb=" + newComb + ", " + offset);
+ }
+ }
+ }
+
+ return combsOut;
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Peer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Peer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Peer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/Peer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,68 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import javax.jms.Destination;
+
+import org.activecluster.Node;
+
+public class
+ Peer
+ implements Comparable
+{
+ protected String _id;
+ protected Destination _dest;
+ protected Node _node;
+
+ public int
+ compareTo(Object o)
+ {
+ assert o.getClass()==Peer.class;
+ return _id.compareTo(((Peer)o)._id);
+ }
+
+ public
+ Peer(Node node)
+ {
+ _id=(String)(node.getState().get("id"));
+ _dest=null;
+ _node=node;
+ }
+
+
+ /**
+ * This ctor is for testing only...
+ *
+ * @param id a <code>String</code> value
+ */
+ public
+ Peer(String id)
+ {
+ _id=id;
+ }
+
+ public void setNode(Node node){_node=node;}
+
+ public String
+ toString()
+ {
+ return "<Peer:"+_id+">";
+ }
+
+ public String getId(){return _id;}
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/RingTopologyStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/RingTopologyStrategy.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/RingTopologyStrategy.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/RingTopologyStrategy.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,97 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.activecluster.Cluster;
+import org.activecluster.ClusterFactory;
+
+public class
+ RingTopologyStrategy
+ extends Abstract2TopologyStrategy
+{
+ public
+ RingTopologyStrategy(String nodeId, String clusterId, Cluster cluster, ClusterFactory factory, int k)
+ {
+ super(nodeId, clusterId, cluster, factory, k);
+ }
+
+ public Map
+ combineMap(Peer local, Collection e, int k)
+ {
+ int l=e.size();
+
+ Map combs=new TreeMap();
+
+ if (k>0)
+ {
+ Object[] array=e.toArray();
+ for (int i=0; i<l; i++)
+ {
+ // use an ArrayList because:
+ // the algorithm does not produce duplicates
+ // we want cell peers to use ordering produced by algorithm - 3-0, not 0-3
+ Collection comb=new ArrayList(k);
+
+ for (int j=0; j<k; j++)
+ comb.add(array[(i+j)%l]);
+
+ boolean filter=(local!=null);
+
+ if (!filter || comb.contains(local)) // TODO - could be more efficient...
+ combs.put(Cell.id(comb), comb);
+ }
+ }
+
+ return combs;
+ }
+
+ public Collection
+ combineCollection(Peer local, Collection e, int k)
+ {
+ int l=e.size();
+
+ Collection combs=new ArrayList();
+
+ if (k>0)
+ {
+ Object[] array=e.toArray();
+ for (int i=0; i<l; i++)
+ {
+ // use an ArrayList because:
+ // the algorithm does not produce duplicates
+ // we want cell peers to use ordering produced by algorithm - 3-0, not 0-3
+ Collection comb=new ArrayList(k);
+
+ for (int j=0; j<k; j++)
+ comb.add(array[(i+j)%l]);
+
+ boolean filter=(local!=null);
+
+ if (!filter || comb.contains(local)) // TODO - could be more efficient...
+ combs.add(comb);
+ }
+ }
+
+ return combs;
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/TopologyStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/TopologyStrategy.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/TopologyStrategy.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/cluster/TopologyStrategy.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.codehaus.wadi.sandbox.cluster;
+
+import org.activecluster.ClusterListener;
+
+public interface
+ TopologyStrategy
+ extends ClusterListener
+{
+ // should extend some LifeCycle i/f...
+ void start();
+ void stop();
+}