You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/03/03 16:27:33 UTC

cvs commit: incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging MsgQueue.java ServerProcessors.java

gdamour     2004/03/03 07:27:33

  Modified:    sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
                        MsgQueue.java ServerProcessors.java
  Added:       sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication
                        ReplicationTest.java
               sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication
                        UpdateEvent.java UpdateListener.java
                        SimpleReplicatedMap.java ReplicationException.java
                        ReplicationCapable.java ReplicationMember.java
  Log:
  A simple replication framework based on the messaging networking
  infrastructure.
  
  A basic Map has been implemented in order to depict how this framework
  could be leveraged, and a use-case highlights how the classes fit
  together.
  
  Revision  Changes    Path
  1.1                  incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication/ReplicationTest.java
  
  Index: ReplicationTest.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  import java.net.InetAddress;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;
  
  import junit.framework.TestCase;
  
  import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
  import org.apache.geronimo.datastore.impl.remote.messaging.ServerNode;
  
  /**
   * Use-case test of the replication framework: two ServerNodes, each one
   * hosting a ReplicationMember of the same replication group, are started
   * and a SimpleReplicatedMap is replicated between them.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
   */
  public class ReplicationTest extends TestCase {
  
      SimpleReplicatedMap replicant1;
      ReplicationMember replication1;
      ReplicationMember replication2;
  
      /**
       * Servers started by setUp. They are kept as fields such that tearDown
       * can stop them.
       */
      private ServerNode server1;
      private ServerNode server2;
      
      /**
       * Starts two nodes ("Node1" and "Node2"), each one hosting a member of
       * the replication group "Replication1", and joins them.
       */
      protected void setUp() throws Exception {
          replicant1 = new SimpleReplicatedMap();
          replication1 = new ReplicationMember("Replication1", new String[] {"Node2"});
          InetAddress address = InetAddress.getLocalHost();
          NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8080);
          server1 = new ServerNode(nodeInfo1,
              Collections.singleton(replication1));
          server1.doStart();
          replication1.doStart();
  
          replication2 = new ReplicationMember("Replication1", new String[] {"Node1"});
          NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
          server2 = new ServerNode(nodeInfo2,
              Collections.singleton(replication2));
          server2.doStart();
          replication2.doStart();
          
          server2.join(nodeInfo1);
      }
  
      /**
       * Stops the members and the servers started by setUp such that the
       * resources they hold are released once the test is over.
       * <BR>
       * NOTE(review): assumes that ServerNode exposes the GBean doStop()
       * life-cycle method as the counterpart of doStart() -- to be confirmed.
       */
      protected void tearDown() throws Exception {
          try {
              replication2.doStop();
              replication1.doStop();
          } finally {
              // The servers are stopped even if a member fails to stop.
              try {
                  server2.doStop();
              } finally {
                  server1.doStop();
              }
          }
      }
  
      /**
       * Registers a Map with the first member and checks that the updates
       * performed on one side are visible on the other side.
       */
      public void testUseCase() {
          replicant1.put("test1", "value1");
          replication1.registerReplicantCapable(replicant1);
          Object id = replicant1.getID();
          SimpleReplicatedMap replicant2 =
              (SimpleReplicatedMap) replication2.retrieveReplicantCapable(id);
          assertNotNull("Not been registered", replicant2);
          assertEquals("value1", replicant2.get("test1"));
          replicant1.put("test2", "value2");
          assertEquals("value2", replicant2.get("test2"));
          replicant1.remove("test1");
          assertNull(replicant2.get("test1"));
          Map tmp = new HashMap();
          tmp.put("test3", "value3");
          replicant1.putAll(tmp);
          assertEquals("value3", replicant2.get("test3"));
          // Updates performed on the second side flow back to the first one.
          replicant2.remove("test3");
          assertNull(replicant1.get("test3"));
      }
      
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/UpdateEvent.java
  
  Index: UpdateEvent.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  import java.io.Serializable;
  
  /**
   * Notification multicasted by a ReplicationCapable each time it is
   * modified.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
   */
  public interface UpdateEvent extends Serializable {
  
      /**
       * Returns the identifier of this event.
       *
       * @return Event identifier.
       */
      int getId();
  
      /**
       * Returns the target of this event, i.e. the ReplicationCapable which
       * has been updated.
       *
       * @return Instance which has been updated.
       */
      Object getTarget();
  
      /**
       * Replaces the target of this event.
       *
       * @param aTarget Instance which has been updated.
       */
      void setTarget(Object aTarget);
  
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/UpdateListener.java
  
  Index: UpdateListener.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  /**
   * Listener of UpdateEvents.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
   */
  public interface UpdateListener {
  
      /**
       * Fires the provided UpdateEvent on this listener.
       *
       * @param anEvent UpdateEvent to be fired.
       */
      void fireUpdateEvent(UpdateEvent anEvent);
  
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/SimpleReplicatedMap.java
  
  Index: SimpleReplicatedMap.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Iterator;
  import java.util.Map;
  import java.util.Set;
  
  /**
   * A simple Map, which is ReplicationCapable aware.
   * <BR>
   * The Map operations are delegated to an internal HashMap. The mutators
   * (clear, put, putAll and remove) multicast a MapUpdateEvent to the
   * registered UpdateListeners before the local state is updated.
   * <BR>
   * The collections returned by entrySet, keySet and values are the ones of
   * the internal Map: modifications performed through them are not
   * multicasted.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
   */
  public class SimpleReplicatedMap
      implements Map, ReplicationCapable
  {
  
      /**
       * Map holding the actual entries.
       */
      private final Map delegate;
  
      /**
       * Registered UpdateListeners. Accesses are synchronized on this Set.
       */
      private final Set listeners;
  
      /**
       * Identifier of this instance within its replication group.
       */
      private Object objectID;
      
      public SimpleReplicatedMap() {
          delegate = new HashMap();
          listeners = new HashSet();
      }
  
      public void setID(Object anID) {
          objectID = anID;
      }
      
      public Object getID() {
          return objectID; 
      }
      
      public void addUpdateListener(UpdateListener aListener) {
          synchronized(listeners) {
              listeners.add(aListener);
          }
      }
      
      public void removeUpdateListener(UpdateListener aListener) {
          synchronized(listeners) {
              listeners.remove(aListener);
          }
      }
      
      /**
       * Fires the provided event on all the registered listeners.
       * <BR>
       * The listeners are copied under the lock and notified outside of it:
       * the lock is not held while the listeners execute and a listener can
       * add or remove listeners without breaking the iteration.
       * 
       * @param anEvent Event to be multicasted.
       */
      private void multicastEvent(MapUpdateEvent anEvent) {
          Set snapshot;
          synchronized(listeners) {
              snapshot = new HashSet(listeners);
          }
          for (Iterator iter = snapshot.iterator(); iter.hasNext();) {
              UpdateListener listener = (UpdateListener) iter.next();
              listener.fireUpdateEvent(anEvent);
          }
      }
      
      /**
       * Applies the provided event to the internal Map. No event is
       * multicasted by this operation.
       * 
       * @param anEvent Event to be merged. It must be a MapUpdateEvent.
       * @throws ReplicationException Indicates that the event is not a
       * MapUpdateEvent, that its identifier is unknown or that a PUTALL
       * event does not carry a Map.
       */
      public void mergeWithUpdate(UpdateEvent anEvent)
          throws ReplicationException {
          // Also rejects null, as instanceof is false for a null reference.
          if ( !(anEvent instanceof MapUpdateEvent) ) {
              throw new ReplicationException(
                  "Unsupported event {" + anEvent + "}");
          }
          MapUpdateEvent event = (MapUpdateEvent) anEvent;
          switch (event.getId()) {
              case MapUpdateEvent.CLEAR:
                  delegate.clear();
                  break;
              case MapUpdateEvent.PUT:
                  delegate.put(event.key, event.value);
                  break;
              case MapUpdateEvent.PUTALL:
                  if ( null == event.map ) {
                      throw new ReplicationException(
                          "PUTALL event without a Map.");
                  }
                  delegate.putAll(event.map);
                  break;
              case MapUpdateEvent.REMOVE:
                  delegate.remove(event.key);
                  break;
              default:
                  throw new ReplicationException("Undefined event id.");
          }
      }
  
      public void clear() {
          multicastEvent(new MapUpdateEvent(MapUpdateEvent.CLEAR, this));
          delegate.clear();
      }
  
      public boolean containsKey(Object key) {
          return delegate.containsKey(key);
      }
  
      public boolean containsValue(Object value) {
          return delegate.containsValue(value);
      }
  
      public Set entrySet() {
          return delegate.entrySet();
      }
  
      public boolean equals(Object obj) {
          return delegate.equals(obj);
      }
  
      public Object get(Object key) {
          return delegate.get(key);
      }
  
      public int hashCode() {
          return delegate.hashCode();
      }
  
      public boolean isEmpty() {
          return delegate.isEmpty();
      }
  
      public Set keySet() {
          return delegate.keySet();
      }
  
      public Object put(Object key, Object value) {
          multicastEvent(
              new MapUpdateEvent(MapUpdateEvent.PUT, this, key, value));
          return delegate.put(key, value);
      }
  
      /**
       * Multicasts a PUTALL event and copies the provided mappings.
       * 
       * @param t Mappings to be added.
       * @throws NullPointerException If the provided Map is null. It is
       * raised before anything is multicasted such that a broken event is
       * not pushed to the listeners.
       */
      public void putAll(Map t) {
          if ( null == t ) {
              throw new NullPointerException("Map is required");
          }
          multicastEvent(new MapUpdateEvent(t, this));
          delegate.putAll(t);
      }
  
      public Object remove(Object key) {
          multicastEvent(
              new MapUpdateEvent(MapUpdateEvent.REMOVE, this, key, null));
          return delegate.remove(key);
      }
  
      public int size() {
          return delegate.size();
      }
  
      public Collection values() {
          return delegate.values();
      }
  
      /**
       * UpdateEvent describing a modification of a SimpleReplicatedMap. The
       * event identifier is one of CLEAR, PUT, PUTALL or REMOVE.
       */
      public static class MapUpdateEvent implements UpdateEvent {
          private static final int BASE = 1;
          public static final int CLEAR = 1 + BASE;
          public static final int PUT = 2 + BASE;
          public static final int PUTALL = 3 + BASE;
          public static final int REMOVE = 4 + BASE;
          
          private final int id;
          private Object target;
          private final Object key;
          private final Object value;
          private Map map;
  
          /**
           * Creates an event without key and value.
           */
          public MapUpdateEvent(int anId, Object aTarget) {
              this(anId, aTarget, null, null);
          }
  
          /**
           * Creates an event carrying a key and a value.
           */
          public MapUpdateEvent(int anId, Object aTarget,
              Object aKey,
              Object aValue) {
              id = anId;
              target = aTarget;
              key = aKey;
              value = aValue;
          }
  
          /**
           * Creates a PUTALL event. The mappings are copied into a HashMap:
           * the event does not depend on the Map implementation provided by
           * the caller nor on modifications applied to it afterwards.
           */
          public MapUpdateEvent(Map aMap, Object aTarget) {
              this(PUTALL, aTarget, null, null);
              map = (null == aMap) ? null : new HashMap(aMap);
          }
          public int getId() {
              return id;
          }
          public Object getTarget() {
              return target;
          }
          public void setTarget(Object aTarget) {
              target = aTarget;
          }
      }
      
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationException.java
  
  Index: ReplicationException.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  /**
   * Exception raised when a replication is not possible.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
   */
  public class ReplicationException extends Exception
  {
  
      /**
       * Creates an exception having only a message.
       *
       * @param aMessage Description of the failure.
       */
      public ReplicationException(final String aMessage) {
          super(aMessage);
      }
  
      /**
       * Creates an exception wrapping the failure which has caused it.
       *
       * @param aMessage Description of the failure.
       * @param aNested Nested failure.
       */
      public ReplicationException(final String aMessage,
          final Throwable aNested) {
          super(aMessage, aNested);
      }
  
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationCapable.java
  
  Index: ReplicationCapable.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  import java.io.Serializable;
  
  /**
   * Contract of the instances whose state can be replicated: they carry an
   * identifier, notify UpdateListeners and merge UpdateEvents with their own
   * state.
   *
   * TODO introduce versioning.
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
   */
  public interface ReplicationCapable extends Serializable {
  
      /**
       * Returns the identifier of this ReplicationCapable. This identifier
       * is unique in the scope of a replication group.
       * <BR>
       * Identifiers are automatically created by ReplicationMembers.
       *
       * @return Identifier of this instance.
       */
      Object getID();
  
      /**
       * Assigns the identifier of this instance in the scope of the
       * replication group managing it.
       *
       * @param anID Identifier.
       */
      void setID(Object anID);
  
      /**
       * Registers an UpdateEvent listener.
       *
       * @param aListener Listener to be notified when an update is performed
       * on this instance.
       */
      void addUpdateListener(UpdateListener aListener);
  
      /**
       * Unregisters the specified UpdateListener.
       *
       * @param aListener Listener to be removed.
       */
      void removeUpdateListener(UpdateListener aListener);
  
      /**
       * Merges an UpdateEvent with the state of this instance.
       *
       * @param anEvent UpdateEvent to be merged with this instance.
       * @throws ReplicationException Indicates that the merge can not be
       * performed.
       */
      void mergeWithUpdate(UpdateEvent anEvent) throws ReplicationException;
  
  }
  
  
  
  1.1                  incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMember.java
  
  Index: ReplicationMember.java
  ===================================================================
  /**
   *
   * Copyright 2004 The Apache Software Foundation
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   */
  
  package org.apache.geronimo.datastore.impl.remote.replication;
  
  import java.io.Serializable;
  import java.util.HashMap;
  import java.util.Map;
  
  import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest;
  import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult;
  import org.apache.geronimo.datastore.impl.remote.messaging.Connector;
  import org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor;
  import org.apache.geronimo.datastore.impl.remote.messaging.Msg;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants;
  import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor;
  import org.apache.geronimo.datastore.impl.remote.messaging.RequestSender;
  import org.apache.geronimo.gbean.GBean;
  import org.apache.geronimo.gbean.GBeanContext;
  import org.apache.geronimo.gbean.WaitingException;
  
  /**
   * A replication group member.
   * <BR>
   * This is a Connector in charge of replicating the state of registered
   * ReplicantCapables across N-nodes, which constitute a replication group.
   * <BR>
   * Replication members are organized as follow:
   * <pre>
   * ReplicationMember -- MTO -- ServerNode -- MTM -- ServerNode -- OTM -- ReplicationMember
   * </pre>
   *
   * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
   */
  public class ReplicationMember
      implements UpdateListener, Connector, GBean
  {
  
      /**
       * Name of the replication group.
       */
      private final String name;
      
      /**
       * ReplicantID to ReplicantCapable Map. Accesses are synchronized on
       * this Map.
       */
      private final Map idToReplicant;
      
      /**
       * Names of the nodes hosting the other members of the replication group
       * of this member.
       */
      private final String[] targetNodes;
      
      /**
       * Output to be used to send requests.
       */
      private MsgOutInterceptor requestOut;
      
      /**
       * Output to be used to send results.
       */
      private MsgOutInterceptor resultOut;
      
      /**
       * Requests sender.
       */
      private final RequestSender sender;
      
      /**
       * Creates a replication group member.
       * 
       * @param aName Name of the replication group owning this member.
       * @param aTargetNodes Names of the nodes hosting the other members of the
       * replication group containing this member. The array is copied.
       * @throws IllegalArgumentException If one of the parameters is null.
       */
      public ReplicationMember(String aName, String[] aTargetNodes) {
          if ( null == aName ) {
              throw new IllegalArgumentException("Name is required");
          } else if ( null == aTargetNodes ) {
              throw new IllegalArgumentException("Node names is required");
          }
          name = aName;
          // Defensive copy: the caller can no more alter the group topology
          // by updating its array afterwards.
          targetNodes = (String[]) aTargetNodes.clone();
          idToReplicant = new HashMap();
          sender = new RequestSender();
      }
      
      public String getName() {
          return name;
      }
  
      /**
       * Pushes the provided event to the replication group as a
       * "mergeWithUpdate" request.
       * 
       * @param anEvent Event whose target is the ReplicationCapable which
       * has been updated.
       */
      public void fireUpdateEvent(UpdateEvent anEvent) {
          // One does not send the actual ReplicantCapable in the case of an
          // update. Instead, one sends only its identifier.
          ReplicationCapable target = (ReplicationCapable) anEvent.getTarget();
          anEvent.setTarget(target.getID());
          try {
              sender.sendSyncRequest(
                  new CommandRequest("mergeWithUpdate", new Object[] {anEvent}),
                  requestOut);
          } finally {
              // The same event instance is fired on all the listeners of the
              // ReplicationCapable: the original target is restored such that
              // the next listener still sees the ReplicationCapable.
              // NOTE(review): assumes that the request has been marshalled
              // when sendSyncRequest returns -- to be confirmed.
              anEvent.setTarget(target);
          }
      }
  
      /**
       * Merges an UpdateEvent with a registered ReplicationCapable.
       * 
       * @param anEvent Update event to be merged. Its target must be the
       * identifier of a registered ReplicationCapable.
       * @throws ReplicationException Indicates that the merge can not be
       * performed.
       */
      public void mergeWithUpdate(UpdateEvent anEvent)
          throws ReplicationException {
          Object target = anEvent.getTarget();
          if ( !(target instanceof ReplicantID) ) {
              throw new ReplicationException(
                  "Event target {" + target + "} is not a replicant id");
          }
          ReplicantID id = (ReplicantID) target;
          ReplicationCapable replicationCapable;
          synchronized(idToReplicant) {
              replicationCapable = (ReplicationCapable) idToReplicant.get(id);
          }
          if ( null == replicationCapable ) {
              throw new ReplicationException(
                  "No ReplicantCapable with the id {" + id + "}");
          }
          replicationCapable.mergeWithUpdate(anEvent);
      }
      
      /**
       * Registers a ReplicantCapable. From now, UpdateEvents multicasted
       * by the provided ReplicantCapable are also pushed to the replication
       * group.
       * 
       * @param aReplicant ReplicantCapable to be controlled by this group.
       */
      public void registerReplicantCapable(ReplicationCapable aReplicant) {
          ReplicantID id = new ReplicantID();
          aReplicant.setID(id);
          // The replicant is sent before this member is added to its
          // listeners.
          sender.sendSyncRequest(
              new CommandRequest("registerLocalReplicantCapable",
                  new Object[] {aReplicant}),
              requestOut);
          synchronized(idToReplicant) {
              idToReplicant.put(id, aReplicant);
              aReplicant.addUpdateListener(this);
          }
      }
      
      /**
       * This method is for internal use only.
       * <BR>
       * It registers with this member a ReplicationCapable, which has been
       * registered by a remote member.  
       * 
       * @param aReplicant ReplicantCapable to be locally registered.
       */
      public void registerLocalReplicantCapable(ReplicationCapable aReplicant) {
          synchronized(idToReplicant) {
              aReplicant.addUpdateListener(this);
              idToReplicant.put(aReplicant.getID(), aReplicant);
          }
      }
      
      /**
       * Retrieves the ReplicationCapable having the specified id.
       * 
       * @param anID Replicant identifier.
       * @return ReplicantCapable having the specified id or null if such an
       * identifier is not known.
       */
      public ReplicationCapable retrieveReplicantCapable(Object anID) {
          synchronized(idToReplicant) {
              return (ReplicationCapable) idToReplicant.get(anID);
          }
      }
      
      /**
       * Sets the output used by this member to send its Msgs.
       * <BR>
       * The provided output is wrapped by HeaderOutInterceptors configured
       * with the name of the group as DEST_CONNECTOR, the target nodes as
       * DEST_NODE and REQUEST or RESPONSE as BODY_TYPE.
       * 
       * @param anOut Raw output. If null, the request and result outputs of
       * this member are reset.
       */
      public void setOutput(MsgOutInterceptor anOut) {
          if ( null != anOut ) {
              MsgOutInterceptor out =
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.DEST_CONNECTOR,
                      name,
                      new HeaderOutInterceptor(
                          MsgHeaderConstants.DEST_NODE,
                          targetNodes,
                          anOut));
              requestOut =
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.BODY_TYPE,
                      MsgBody.Type.REQUEST,
                      out);
              resultOut = 
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.BODY_TYPE,
                      MsgBody.Type.RESPONSE,
                      out);
          } else {
              requestOut = null;
              resultOut = null;
          }
      }
  
      /**
       * Dispatches the provided Msg based on its BODY_TYPE header: requests
       * are executed and responses are handed over to the requests sender.
       * Msgs having another body type are ignored.
       * 
       * @param aMsg Msg to be delivered to this member.
       */
      public void deliver(Msg aMsg) {
          MsgHeader header = aMsg.getHeader();
          MsgBody.Type bodyType =
              (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
          if ( bodyType.equals(MsgBody.Type.REQUEST) ) {
              handleRequest(aMsg);
          } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) {
              handleResponse(aMsg);
          }
      }
      
      /**
       * Handles a request Msg: the CommandRequest it contains is executed
       * against this member and its result is sent back with the
       * correlation identifier of the request.
       * 
       * @param aMsg Request Msg to be handled.
       */
      protected void handleRequest(Msg aMsg) {
          MsgBody body = aMsg.getBody();
          MsgHeader header = aMsg.getHeader();
          // NOTE(review): the source node is read but not used; the result
          // is sent to targetNodes. The read is kept as is in case getHeader
          // validates the presence of the header -- to be confirmed.
          Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
          Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);
          CommandRequest command = (CommandRequest) body.getContent();
          command.setTarget(this);
          CommandResult result = command.execute();
          Msg msg = new Msg();
          body = msg.getBody();
          body.setContent(result);
          MsgOutInterceptor reqOut =
              new HeaderOutInterceptor(
                  MsgHeaderConstants.CORRELATION_ID,
                  id,
                  new HeaderOutInterceptor(
                      MsgHeaderConstants.DEST_NODE,
                      targetNodes,
                      new HeaderOutInterceptor(
                          MsgHeaderConstants.DEST_CONNECTOR,
                          name,
                          resultOut)));
          reqOut.push(msg);
      }
  
      /**
       * Handles a response Msg: the CommandResult it contains is passed to
       * the requests sender along with its correlation identifier.
       * 
       * @param aMsg Response to be handled.
       */
      protected void handleResponse(Msg aMsg) {
          MsgBody body = aMsg.getBody();
          MsgHeader header = aMsg.getHeader();
          CommandResult result = (CommandResult) body.getContent();
          sender.setResponse(
              (Integer) header.getHeader(MsgHeaderConstants.CORRELATION_ID),
              result);
      }
      
      public void setGBeanContext(GBeanContext context) {
      }
  
      public void doStart() throws WaitingException, Exception {
      }
  
      public void doStop() throws WaitingException, Exception {
      }
  
      public void doFail() {
      }
  
      /**
       * ReplicantCapable identifier. 
       * <BR>
       * Identifiers are drawn from a counter local to the class: they are
       * unique among the identifiers created through this class only.
       */
      private static class ReplicantID implements Serializable {
          private static int seqId = 0;
          private final int id;
          private ReplicantID() {
              id = nextSeqId();
          }
          /**
           * Returns the next sequence number. The read-and-increment is
           * performed under the class lock as an increment is not atomic,
           * even on a volatile field.
           */
          private static synchronized int nextSeqId() {
              return seqId++;
          }
          public int hashCode() {
              // TODO improve me.
              return id;
          }
          public boolean equals(Object obj) {
              if ( false == obj instanceof ReplicantID ) {
                 return false;
              }
              ReplicantID replicantID = (ReplicantID) obj;
              return id == replicantID.id;
          }
          public String toString() {
              return "ReplicantID {" + id + "}";
          }
      }
  
  }
  
  
  
  1.2       +5 -1      incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java
  
  Index: MsgQueue.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MsgQueue.java	25 Feb 2004 13:36:15 -0000	1.1
  +++ MsgQueue.java	3 Mar 2004 15:27:33 -0000	1.2
  @@ -93,4 +93,8 @@
           return message;
       }
       
  +    public String toString() {
  +        return "MsgQueue {" + name + "}";
  +    }
  +    
   }
  
  
  
  1.4       +13 -7     incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java
  
  Index: ServerProcessors.java
  ===================================================================
  RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ServerProcessors.java	3 Mar 2004 13:10:07 -0000	1.3
  +++ ServerProcessors.java	3 Mar 2004 15:27:33 -0000	1.4
  @@ -101,13 +101,19 @@
                   Msg msg = in.pop();
                   Object destNode = in.getHeader();
                   MsgOutInterceptor out;
  -                try {
  -                    out = server.getOutForNode((String) destNode);
  -                } catch (CommunicationException e) {
  -                    log.error(e);
  -                    continue;
  +                if ( destNode instanceof String ) {
  +                    destNode = new String[] {(String) destNode};
  +                }
  +                String[] dests = (String[]) destNode;
  +                for (int i = 0; i < dests.length; i++) {
  +                    try {
  +                        out = server.getOutForNode(dests[i]);
  +                    } catch (CommunicationException e) {
  +                        log.error(e);
  +                        continue;
  +                    }
  +                    out.push(msg);
                   }
  -                out.push(msg);
               }
           }