You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/03/03 16:27:33 UTC
cvs commit: incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging MsgQueue.java ServerProcessors.java
gdamour 2004/03/03 07:27:33
Modified: sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
MsgQueue.java ServerProcessors.java
Added: sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication
ReplicationTest.java
sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication
UpdateEvent.java UpdateListener.java
SimpleReplicatedMap.java ReplicationException.java
ReplicationCapable.java ReplicationMember.java
Log:
A simple replication framework based on the messaging networking
infrastructure.
A basic Map has been implemented in order to depict how this framework
could be leveraged, and a use-case highlights how the classes fit
together.
Revision Changes Path
1.1 incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/replication/ReplicationTest.java
Index: ReplicationTest.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
import org.apache.geronimo.datastore.impl.remote.messaging.ServerNode;
/**
 * Use-case test of the replication framework.
 * <BR>
 * Two ReplicationMembers sharing the same group name are each hosted by a
 * ServerNode bound to the local host; a SimpleReplicatedMap registered with
 * the first member must be visible, and kept up-to-date, via the second one.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
 */
public class ReplicationTest extends TestCase {

    SimpleReplicatedMap replicant1;
    ReplicationMember replication1;
    ReplicationMember replication2;

    /**
     * Nodes started by setUp. They are kept as fields so that tearDown can
     * stop them; otherwise the nodes started on ports 8080 and 8082 would
     * outlive the test.
     */
    private ServerNode server1;
    private ServerNode server2;

    /**
     * Starts two nodes ("Node1" on port 8080, "Node2" on port 8082), each
     * hosting one member of the group "Replication1", and joins the second
     * node to the first one.
     */
    protected void setUp() throws Exception {
        replicant1 = new SimpleReplicatedMap();
        InetAddress address = InetAddress.getLocalHost();

        replication1 =
            new ReplicationMember("Replication1", new String[] {"Node2"});
        NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8080);
        server1 = new ServerNode(nodeInfo1,
            Collections.singleton(replication1));
        server1.doStart();
        replication1.doStart();

        replication2 =
            new ReplicationMember("Replication1", new String[] {"Node1"});
        NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
        server2 = new ServerNode(nodeInfo2,
            Collections.singleton(replication2));
        server2.doStart();
        replication2.doStart();

        server2.join(nodeInfo1);
    }

    /**
     * Stops what setUp has started, in the reverse order.
     * <BR>
     * NOTE(review): assumes that ServerNode exposes doStop() as the
     * counterpart of the doStart() used by setUp -- to be confirmed.
     */
    protected void tearDown() throws Exception {
        try {
            replication2.doStop();
            server2.doStop();
        } finally {
            // The first node is stopped even if the second one fails to stop.
            replication1.doStop();
            server1.doStop();
        }
    }

    /**
     * Registers a Map with the first member and checks that put, remove and
     * putAll are propagated to the copy held by the second member, and that
     * an update of the copy is propagated back.
     */
    public void testUseCase() {
        // State held before the registration is shipped with the replicant.
        replicant1.put("test1", "value1");
        replication1.registerReplicantCapable(replicant1);

        Object id = replicant1.getID();
        SimpleReplicatedMap replicant2 =
            (SimpleReplicatedMap) replication2.retrieveReplicantCapable(id);
        assertNotNull("Not been registered", replicant2);
        assertEquals("value1", replicant2.get("test1"));

        replicant1.put("test2", "value2");
        assertEquals("value2", replicant2.get("test2"));

        replicant1.remove("test1");
        assertNull(replicant2.get("test1"));

        Map tmp = new HashMap();
        tmp.put("test3", "value3");
        replicant1.putAll(tmp);
        assertEquals("value3", replicant2.get("test3"));

        // Updates also flow from the second member back to the first one.
        replicant2.remove("test3");
        assertNull(replicant1.get("test3"));
    }

}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/UpdateEvent.java
Index: UpdateEvent.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
import java.io.Serializable;
/**
 * Serializable notification describing one modification of a
 * ReplicationCapable; it is what a ReplicationCapable multicasts to its
 * UpdateListeners.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
 */
public interface UpdateEvent extends Serializable {

    /**
     * Identifier of this event.
     *
     * @return Event identifier.
     */
    int getId();

    /**
     * Target of this event, i.e. the ReplicationCapable which has been
     * updated.
     *
     * @return Instance which has been updated.
     */
    Object getTarget();

    /**
     * Replaces the target of this event.
     *
     * @param aTarget Instance which has been updated.
     */
    void setTarget(Object aTarget);

}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/UpdateListener.java
Index: UpdateListener.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
/**
 * Callback interface for the parties interested in UpdateEvents.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
 */
public interface UpdateListener {

    /**
     * Notifies this listener of an UpdateEvent.
     *
     * @param anEvent UpdateEvent to be fired.
     */
    void fireUpdateEvent(UpdateEvent anEvent);

}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/SimpleReplicatedMap.java
Index: SimpleReplicatedMap.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
 * A simple Map, which is ReplicationCapable aware.
 * <BR>
 * All the Map operations are delegated to a wrapped HashMap. The mutating
 * ones (clear, put, putAll and remove) first multicast a MapUpdateEvent to
 * the registered UpdateListeners and then update the wrapped Map. Updates
 * received via mergeWithUpdate are applied to the wrapped Map only: they are
 * not multicasted again.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
 */
public class SimpleReplicatedMap
    implements Map, ReplicationCapable
{

    /**
     * Map holding the actual entries.
     */
    private Map delegate;

    /**
     * Registered UpdateListeners. It is also the monitor guarding itself.
     */
    private Set listeners;

    /**
     * Identifier of this instance, as provided via setID.
     */
    private Object objectID;

    /**
     * Creates an empty Map having no listener and no identifier.
     */
    public SimpleReplicatedMap() {
        delegate = new HashMap();
        listeners = new HashSet();
    }

    public void setID(Object anID) {
        objectID = anID;
    }

    public Object getID() {
        return objectID;
    }

    public void addUpdateListener(UpdateListener aListener) {
        synchronized(listeners) {
            listeners.add(aListener);
        }
    }

    public void removeUpdateListener(UpdateListener aListener) {
        synchronized(listeners) {
            listeners.remove(aListener);
        }
    }

    /**
     * Notifies the registered listeners of the specified event.
     * <BR>
     * The listeners are copied while holding the monitor and notified once
     * it has been released: a listener may block (ReplicationMember sends a
     * synchronous request), and holding the monitor for that long would
     * stall addUpdateListener/removeUpdateListener and invite deadlocks.
     *
     * @param anEvent Event to be multicasted.
     */
    private void multicastEvent(MapUpdateEvent anEvent) {
        UpdateListener[] snapshot;
        synchronized(listeners) {
            snapshot = (UpdateListener[])
                listeners.toArray(new UpdateListener[listeners.size()]);
        }
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].fireUpdateEvent(anEvent);
        }
    }

    /**
     * Applies the specified event to the wrapped Map, without notifying the
     * listeners.
     *
     * @param anEvent Event to be merged. It must be a MapUpdateEvent.
     * @throws ReplicationException Indicates that the event is not a
     * MapUpdateEvent, that its identifier is unknown or that a PUTALL event
     * does not carry a Map.
     */
    public void mergeWithUpdate(UpdateEvent anEvent)
        throws ReplicationException {
        // A foreign (or null) event is reported through the declared
        // exception instead of a ClassCastException/NullPointerException.
        if ( false == anEvent instanceof MapUpdateEvent ) {
            throw new ReplicationException(
                "Unsupported event {" + anEvent + "}");
        }
        MapUpdateEvent event = (MapUpdateEvent) anEvent;
        switch (event.getId()) {
            case MapUpdateEvent.CLEAR:
                delegate.clear();
                break;
            case MapUpdateEvent.PUT:
                delegate.put(event.key, event.value);
                break;
            case MapUpdateEvent.PUTALL:
                // The public (id, target) constructors allow a PUTALL event
                // having no Map.
                if ( null == event.map ) {
                    throw new ReplicationException(
                        "PUTALL event without a Map.");
                }
                delegate.putAll(event.map);
                break;
            case MapUpdateEvent.REMOVE:
                delegate.remove(event.key);
                break;
            default:
                throw new ReplicationException("Undefined event id.");
        }
    }

    public void clear() {
        multicastEvent(new MapUpdateEvent(MapUpdateEvent.CLEAR, this));
        delegate.clear();
    }

    public boolean containsKey(Object key) {
        return delegate.containsKey(key);
    }

    public boolean containsValue(Object value) {
        return delegate.containsValue(value);
    }

    public Set entrySet() {
        return delegate.entrySet();
    }

    /**
     * Equality is the one of the wrapped Map: neither the identifier nor the
     * listeners are taken into account.
     */
    public boolean equals(Object obj) {
        return delegate.equals(obj);
    }

    public Object get(Object key) {
        return delegate.get(key);
    }

    public int hashCode() {
        return delegate.hashCode();
    }

    public boolean isEmpty() {
        return delegate.isEmpty();
    }

    public Set keySet() {
        return delegate.keySet();
    }

    public Object put(Object key, Object value) {
        multicastEvent(
            new MapUpdateEvent(MapUpdateEvent.PUT, this, key, value));
        return delegate.put(key, value);
    }

    public void putAll(Map t) {
        multicastEvent(new MapUpdateEvent(t, this));
        delegate.putAll(t);
    }

    public Object remove(Object key) {
        multicastEvent(
            new MapUpdateEvent(MapUpdateEvent.REMOVE, this, key, null));
        return delegate.remove(key);
    }

    public int size() {
        return delegate.size();
    }

    public Collection values() {
        return delegate.values();
    }

    /**
     * UpdateEvent describing one mutation of a SimpleReplicatedMap.
     * <BR>
     * Depending on the identifier, the payload is either nothing (CLEAR),
     * a key (REMOVE), a key and a value (PUT) or a Map (PUTALL).
     */
    public static class MapUpdateEvent implements UpdateEvent {

        private static final int BASE = 1;
        public static final int CLEAR = 1 + BASE;
        public static final int PUT = 2 + BASE;
        public static final int PUTALL = 3 + BASE;
        public static final int REMOVE = 4 + BASE;

        private final int id;
        private Object target;
        private final Object key;
        private final Object value;
        private Map map;

        /**
         * Creates an event without payload.
         *
         * @param anId Event identifier.
         * @param aTarget Instance which has been updated.
         */
        public MapUpdateEvent(int anId, Object aTarget) {
            this(anId, aTarget, null, null);
        }

        /**
         * Creates an event carrying a key and a value.
         *
         * @param anId Event identifier.
         * @param aTarget Instance which has been updated.
         * @param aKey Key which has been put or removed.
         * @param aValue Value which has been put.
         */
        public MapUpdateEvent(int anId, Object aTarget,
            Object aKey,
            Object aValue) {
            id = anId;
            target = aTarget;
            key = aKey;
            value = aValue;
        }

        /**
         * Creates a PUTALL event.
         *
         * @param aMap Entries which have been put.
         * @param aTarget Instance which has been updated.
         */
        public MapUpdateEvent(Map aMap, Object aTarget) {
            this(PUTALL, aTarget, null, null);
            map = aMap;
        }

        public int getId() {
            return id;
        }

        public Object getTarget() {
            return target;
        }

        public void setTarget(Object aTarget) {
            target = aTarget;
        }

    }

}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationException.java
Index: ReplicationException.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
/**
 * Checked exception signalling that a replication can not be performed.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
 */
public class ReplicationException extends Exception {

    /**
     * Creates an exception having only a message.
     *
     * @param aMessage Description of the failure.
     */
    public ReplicationException(String aMessage) {
        super(aMessage);
    }

    /**
     * Creates an exception wrapping the failure which has caused it.
     *
     * @param aMessage Description of the failure.
     * @param aNested Underlying cause.
     */
    public ReplicationException(String aMessage, Throwable aNested) {
        super(aMessage, aNested);
    }

}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationCapable.java
Index: ReplicationCapable.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
import java.io.Serializable;
/**
 * Contract of the Serializable instances whose state can be replicated:
 * they carry an identifier, notify UpdateListeners of their modifications
 * and are able to merge the UpdateEvents describing such modifications.
 * <BR>
 * TODO introduce versioning.
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
 */
public interface ReplicationCapable extends Serializable {

    /**
     * Gets the identifier of this ReplicationCapable. It uniquely identifies
     * the instance in the scope of a replication group.
     * <BR>
     * Identifiers are automatically created by ReplicationMembers.
     *
     * @return Identifier of this instance.
     */
    Object getID();

    /**
     * Sets the identifier of this instance in the scope of the replication
     * group managing it.
     *
     * @param anID Identifier.
     */
    void setID(Object anID);

    /**
     * Registers an UpdateEvent listener.
     *
     * @param aListener Listener to be notified when an update is performed
     * on this instance.
     */
    void addUpdateListener(UpdateListener aListener);

    /**
     * Unregisters the specified UpdateListener.
     *
     * @param aListener Listener to be removed.
     */
    void removeUpdateListener(UpdateListener aListener);

    /**
     * Merges an UpdateEvent with the state of this instance.
     *
     * @param anEvent UpdateEvent to be merged with this instance.
     * @throws ReplicationException Indicates that the merge can not be
     * performed.
     */
    void mergeWithUpdate(UpdateEvent anEvent) throws ReplicationException;

}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/replication/ReplicationMember.java
Index: ReplicationMember.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest;
import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult;
import org.apache.geronimo.datastore.impl.remote.messaging.Connector;
import org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor;
import org.apache.geronimo.datastore.impl.remote.messaging.Msg;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor;
import org.apache.geronimo.datastore.impl.remote.messaging.RequestSender;
import org.apache.geronimo.gbean.GBean;
import org.apache.geronimo.gbean.GBeanContext;
import org.apache.geronimo.gbean.WaitingException;
/**
 * A replication group member.
 * <BR>
 * This is a Connector in charge of replicating the state of registered
 * ReplicantCapables across N-nodes, which constitute a replication group.
 * <BR>
 * Replication members are organized as follows:
 * <pre>
 * ReplicationMember -- MTO -- ServerNode -- MTM -- ServerNode -- OTM -- ReplicationMember
 * </pre>
 *
 * @version $Revision: 1.1 $ $Date: 2004/03/03 15:27:32 $
 */
public class ReplicationMember
    implements UpdateListener, Connector, GBean
{

    /**
     * Name of the replication group.
     */
    private final String name;

    /**
     * ReplicantID to ReplicantCapable Map. It is also the monitor guarding
     * itself.
     */
    private final Map idToReplicant;

    /**
     * Names of the nodes hosting the other members of the replication group
     * of this member.
     */
    private final String[] targetNodes;

    /**
     * Output to be used to send requests.
     */
    private MsgOutInterceptor requestOut;

    /**
     * Output to be used to send results.
     */
    private MsgOutInterceptor resultOut;

    /**
     * Requests sender.
     */
    private final RequestSender sender;

    /**
     * Creates a replication group member.
     *
     * @param aName Name of the replication group owning this member.
     * @param aTargetNodes Names of the nodes hosting the other members of the
     * replication group containing this member.
     * @throws IllegalArgumentException Indicates that one of the parameters
     * is null.
     */
    public ReplicationMember(String aName, String[] aTargetNodes) {
        if ( null == aName ) {
            throw new IllegalArgumentException("Name is required");
        } else if ( null == aTargetNodes ) {
            throw new IllegalArgumentException("Node names is required");
        }
        name = aName;
        targetNodes = aTargetNodes;
        idToReplicant = new HashMap();
        sender = new RequestSender();
    }

    /**
     * Gets the name of the replication group of this member. It is also the
     * name used to address this Connector.
     *
     * @return Replication group name.
     */
    public String getName() {
        return name;
    }

    /**
     * Pushes the provided event to the other members of the group, which
     * merge it via their mergeWithUpdate operation.
     * <BR>
     * As a side effect, the target of the event is replaced by the
     * identifier of the updated ReplicantCapable.
     *
     * @param anEvent Event multicasted by a registered ReplicantCapable.
     */
    public void fireUpdateEvent(UpdateEvent anEvent) {
        // One does not send the actual ReplicantCapable in the case of an
        // update. Instead, one sends only its identifier.
        Object target = anEvent.getTarget();
        // The very same event is handed to each listener of a replicant. If
        // a previous listener has already swapped the target for its
        // identifier, then it must not be cast to a ReplicationCapable.
        if ( false == target instanceof ReplicantID ) {
            anEvent.setTarget(((ReplicationCapable) target).getID());
        }
        sender.sendSyncRequest(
            new CommandRequest("mergeWithUpdate", new Object[] {anEvent}),
            requestOut);
    }

    /**
     * Merges an UpdateEvent with a registered ReplicationCapable.
     *
     * @param anEvent Update event to be merged. Its target must be the
     * identifier of a ReplicantCapable registered with this member.
     * @throws ReplicationException Indicates that the merge can not be
     * performed.
     */
    public void mergeWithUpdate(UpdateEvent anEvent)
        throws ReplicationException {
        Object target = anEvent.getTarget();
        // A malformed event is reported via the declared exception rather
        // than a ClassCastException.
        if ( false == target instanceof ReplicantID ) {
            throw new ReplicationException(
                "Event target {" + target + "} is not a replicant id");
        }
        ReplicantID id = (ReplicantID) target;
        ReplicationCapable replicationCapable;
        synchronized(idToReplicant) {
            replicationCapable = (ReplicationCapable) idToReplicant.get(id);
        }
        if ( null == replicationCapable ) {
            throw new ReplicationException(
                "No ReplicantCapable with the id {" + id + "}");
        }
        replicationCapable.mergeWithUpdate(anEvent);
    }

    /**
     * Registers a ReplicantCapable. From now, UpdateEvents multicasted
     * by the provided ReplicantCapable are also pushed to the replication
     * group.
     * <BR>
     * A new identifier is assigned to the ReplicantCapable, which is then
     * sent to the other members before being registered locally.
     *
     * @param aReplicant ReplicantCapable to be controlled by this group.
     */
    public void registerReplicantCapable(ReplicationCapable aReplicant) {
        ReplicantID id = new ReplicantID();
        aReplicant.setID(id);
        // This member starts listening only after the replicant has been
        // sent to the other members.
        sender.sendSyncRequest(
            new CommandRequest("registerLocalReplicantCapable",
                new Object[] {aReplicant}),
            requestOut);
        synchronized(idToReplicant) {
            idToReplicant.put(id, aReplicant);
            aReplicant.addUpdateListener(this);
        }
    }

    /**
     * This method is for internal use only.
     * <BR>
     * It registers with this member a ReplicationCapable, which has been
     * registered by a remote member.
     *
     * @param aReplicant ReplicantCapable to be locally registered.
     */
    public void registerLocalReplicantCapable(ReplicationCapable aReplicant) {
        synchronized(idToReplicant) {
            aReplicant.addUpdateListener(this);
            idToReplicant.put(aReplicant.getID(), aReplicant);
        }
    }

    /**
     * Retrieves the ReplicationCapable having the specified id.
     *
     * @param anID Replicant identifier.
     * @return ReplicantCapable having the specified id or null if such an
     * identifier is not known.
     */
    public ReplicationCapable retrieveReplicantCapable(Object anID) {
        synchronized(idToReplicant) {
            return (ReplicationCapable) idToReplicant.get(anID);
        }
    }

    /**
     * Sets the output to be used to send Msgs. Requests and results are
     * tagged with their body type, the name of this member as destination
     * connector and the target nodes as destination nodes.
     *
     * @param anOut Raw output, or null to reset the outputs of this member.
     */
    public void setOutput(MsgOutInterceptor anOut) {
        if ( null != anOut ) {
            MsgOutInterceptor out =
                new HeaderOutInterceptor(
                    MsgHeaderConstants.DEST_CONNECTOR,
                    name,
                    new HeaderOutInterceptor(
                        MsgHeaderConstants.DEST_NODE,
                        targetNodes,
                        anOut));
            requestOut =
                new HeaderOutInterceptor(
                    MsgHeaderConstants.BODY_TYPE,
                    MsgBody.Type.REQUEST,
                    out);
            resultOut =
                new HeaderOutInterceptor(
                    MsgHeaderConstants.BODY_TYPE,
                    MsgBody.Type.RESPONSE,
                    out);
        } else {
            requestOut = null;
            resultOut = null;
        }
    }

    /**
     * Dispatches an incoming Msg according to its body type. Msgs which are
     * neither requests nor responses are ignored.
     *
     * @param aMsg Msg to be handled.
     */
    public void deliver(Msg aMsg) {
        MsgHeader header = aMsg.getHeader();
        MsgBody.Type bodyType =
            (MsgBody.Type) header.getHeader(MsgHeaderConstants.BODY_TYPE);
        if ( bodyType.equals(MsgBody.Type.REQUEST) ) {
            handleRequest(aMsg);
        } else if ( bodyType.equals(MsgBody.Type.RESPONSE) ) {
            handleResponse(aMsg);
        }
    }

    /**
     * Handles a request Msg: the carried command is executed against this
     * member and its result is sent back with the correlation identifier of
     * the request.
     *
     * @param aMsg Request Msg to be handled.
     */
    protected void handleRequest(Msg aMsg) {
        MsgBody body = aMsg.getBody();
        MsgHeader header = aMsg.getHeader();
        // NOTE(review): the source node is read but not used; the result is
        // addressed to targetNodes. This looks equivalent for a group of two
        // nodes only -- to be confirmed for larger groups.
        Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
        Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);

        CommandRequest command = (CommandRequest) body.getContent();
        command.setTarget(this);
        CommandResult result = command.execute();

        Msg msg = new Msg();
        body = msg.getBody();
        body.setContent(result);
        MsgOutInterceptor replyOut =
            new HeaderOutInterceptor(
                MsgHeaderConstants.CORRELATION_ID,
                id,
                new HeaderOutInterceptor(
                    MsgHeaderConstants.DEST_NODE,
                    targetNodes,
                    new HeaderOutInterceptor(
                        MsgHeaderConstants.DEST_CONNECTOR,
                        name,
                        resultOut)));
        replyOut.push(msg);
    }

    /**
     * Handles a response Msg: the carried result is handed to the requests
     * sender under the correlation identifier of the Msg.
     *
     * @param aMsg Response to be handled.
     */
    protected void handleResponse(Msg aMsg) {
        MsgBody body = aMsg.getBody();
        MsgHeader header = aMsg.getHeader();
        CommandResult result = (CommandResult) body.getContent();
        sender.setResponse(
            (Integer) header.getHeader(MsgHeaderConstants.CORRELATION_ID),
            result);
    }

    public void setGBeanContext(GBeanContext context) {
    }

    public void doStart() throws WaitingException, Exception {
    }

    public void doStop() throws WaitingException, Exception {
    }

    public void doFail() {
    }

    /**
     * ReplicantCapable identifier.
     * <BR>
     * Identifiers are drawn from a sequence local to the class: they are
     * unique within one JVM only.
     */
    private static class ReplicantID implements Serializable {

        /**
         * Next identifier. Guarded by the monitor of this class.
         */
        private static int seqId = 0;

        private final int id;

        private ReplicantID() {
            id = nextSeqId();
        }

        /**
         * Draws the next identifier. An increment is a read followed by a
         * write, hence it is not atomic, even on a volatile field; the
         * class monitor ensures that two threads never draw the same value.
         *
         * @return Next identifier of the sequence.
         */
        private static synchronized int nextSeqId() {
            return seqId++;
        }

        public int hashCode() {
            // TODO improve me.
            return id;
        }

        public boolean equals(Object obj) {
            if ( false == obj instanceof ReplicantID ) {
                return false;
            }
            ReplicantID replicantID = (ReplicantID) obj;
            return id == replicantID.id;
        }

    }

}
1.2 +5 -1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java
Index: MsgQueue.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgQueue.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MsgQueue.java 25 Feb 2004 13:36:15 -0000 1.1
+++ MsgQueue.java 3 Mar 2004 15:27:33 -0000 1.2
@@ -93,4 +93,8 @@
return message;
}
+ public String toString() {
+ return "MsgQueue {" + name + "}";
+ }
+
}
1.4 +13 -7 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java
Index: ServerProcessors.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ServerProcessors.java 3 Mar 2004 13:10:07 -0000 1.3
+++ ServerProcessors.java 3 Mar 2004 15:27:33 -0000 1.4
@@ -101,13 +101,19 @@
Msg msg = in.pop();
Object destNode = in.getHeader();
MsgOutInterceptor out;
- try {
- out = server.getOutForNode((String) destNode);
- } catch (CommunicationException e) {
- log.error(e);
- continue;
+ if ( destNode instanceof String ) {
+ destNode = new String[] {(String) destNode};
+ }
+ String[] dests = (String[]) destNode;
+ for (int i = 0; i < dests.length; i++) {
+ try {
+ out = server.getOutForNode(dests[i]);
+ } catch (CommunicationException e) {
+ log.error(e);
+ continue;
+ }
+ out.push(msg);
}
- out.push(msg);
}
}