You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/07/20 02:15:07 UTC
cvs commit: incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode AbstractRemoteNode.java RemoteNode.java RemoteNodeConnection.java RemoteNodeMonitor.java NodeServer.java RemoteNodeManagerImpl.java RemoteNodeManager.java MessagingTransportFactory.java
gdamour 2004/07/19 17:15:07
Modified: sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network
RemoteNodeJoiner.java NodeServerImpl.java
RemoteNodeJoined.java
AbstractRemoteNodeConnection.java
NetworkTransportFactory.java
RemoteNodeJoinerConnection.java
RemoteNodeJoinedConnection.java
sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode
MockNodeServer.java RemoteNodeManagerImplTest.java
MockMessagingTransportFactory.java
MockRemoteNode.java
sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode
RemoteNode.java RemoteNodeConnection.java
RemoteNodeMonitor.java NodeServer.java
RemoteNodeManagerImpl.java RemoteNodeManager.java
MessagingTransportFactory.java
Added: sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network
CallbackSocketProtocol.java
sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode
AbstractRemoteNode.java
Log:
Refactoring of the remote node layer. This new version defines only one connection per RemoteNode. A
RemoteNodeConnection supports now an asynchronous callback when the underlying connection is closed. It is
used to detect and remove a RemoteNode from its RemoteNodeManager when the connection fails.
Revision Changes Path
1.4 +67 -16 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoiner.java
Index: RemoteNodeJoiner.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoiner.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RemoteNodeJoiner.java 24 Jun 2004 23:39:03 -0000 1.3
+++ RemoteNodeJoiner.java 20 Jul 2004 00:15:05 -0000 1.4
@@ -17,14 +17,23 @@
package org.apache.geronimo.messaging.remotenode.network;
-import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
-import org.apache.geronimo.messaging.CommunicationException;
+import org.apache.geronimo.messaging.Msg;
+import org.apache.geronimo.messaging.MsgBody;
+import org.apache.geronimo.messaging.MsgHeader;
+import org.apache.geronimo.messaging.MsgHeaderConstants;
+import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.NodeInfo;
+import org.apache.geronimo.messaging.RequestSender;
+import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
import org.apache.geronimo.messaging.io.IOContext;
-import org.apache.geronimo.messaging.remotenode.MessagingTransportFactory;
-import org.apache.geronimo.messaging.remotenode.RemoteNode;
+import org.apache.geronimo.messaging.remotenode.AbstractRemoteNode;
import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
+import org.apache.geronimo.network.SelectorManager;
+
+import EDU.oswego.cs.dl.util.concurrent.FutureResult;
+import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
/**
*
@@ -32,23 +41,65 @@
*/
public class RemoteNodeJoiner
extends AbstractRemoteNode
- implements RemoteNode
{
- private final MessagingTransportFactory connFactory;
+ private final SelectorManager sm;
- public RemoteNodeJoiner(NodeInfo aNodeInfo, IOContext anIOContext,
- MessagingTransportFactory aFactory) {
- super(aNodeInfo, anIOContext);
- if ( null == aFactory ) {
- throw new IllegalArgumentException("Factory is required.");
+ public RemoteNodeJoiner(NodeInfo aLocalNodeInfo, NodeInfo aRemoteNodeInfo,
+ IOContext anIOContext, SelectorManager aSelectorManager) {
+ super(aLocalNodeInfo, aRemoteNodeInfo, anIOContext);
+ if ( null == aSelectorManager ) {
+ throw new IllegalArgumentException("SelectorManager is required");
}
- connFactory = aFactory;
+ sm = aSelectorManager;
}
- public RemoteNodeConnection newConnection()
- throws IOException, CommunicationException {
- return connFactory.factoryRemoteNodeConnection(nodeInfo, ioContext);
+ public void join() throws NodeException {
+ RemoteNodeConnection connection =
+ new RemoteNodeJoinerConnection(remoteNodeInfo, ioContext, sm);
+ setConnection(connection);
+
+ Msg msg = new Msg();
+ MsgHeader header = msg.getHeader();
+ header.addHeader(MsgHeaderConstants.SRC_NODE, localNodeInfo);
+ header.addHeader(MsgHeaderConstants.DEST_NODE, remoteNodeInfo);
+
+ // Only set to comply with a valid request.
+ header.addHeader(MsgHeaderConstants.DEST_NODES, remoteNodeInfo);
+ header.addHeader(MsgHeaderConstants.SRC_ENDPOINT, "");
+ header.addHeader(MsgHeaderConstants.CORRELATION_ID,
+ new RequestSender.RequestID((byte) 0));
+ header.addHeader(MsgHeaderConstants.BODY_TYPE, MsgBody.Type.REQUEST);
+ header.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION, new Integer(0));
+
+ msg.getBody().setContent(localNodeInfo);
+
+ final FutureResult result = new FutureResult();
+ setMsgProducerOut(new MsgOutInterceptor() {
+ public void push(Msg aMsg) {
+ result.set(aMsg);
+ }
+ });
+ getMsgConsumerOut().push(msg);
+ Msg reply;
+ try {
+ // waits 3 seconds for a reply.
+ reply = (Msg) result.get();
+ reply = (Msg) result.timedGet(3000);
+ } catch (TimeoutException e) {
+ throw new NodeException("Join request submitted by " +
+ localNodeInfo + " to " + remoteNodeInfo + " has timed out.");
+ } catch (InterruptedException e) {
+ throw new NodeException(e);
+ } catch (InvocationTargetException e) {
+ throw new NodeException(e);
+ }
+ Boolean isOK = (Boolean) reply.getBody().getContent();
+ if ( Boolean.FALSE == isOK ) {
+ throw new NodeException(remoteNodeInfo + " has refused the " +
+ "join request submitted by " + localNodeInfo);
+ }
+ manager.registerRemoteNode(this);
}
}
1.6 +19 -54 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NodeServerImpl.java
Index: NodeServerImpl.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NodeServerImpl.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- NodeServerImpl.java 17 Jul 2004 03:45:41 -0000 1.5
+++ NodeServerImpl.java 20 Jul 2004 00:15:05 -0000 1.6
@@ -17,28 +17,20 @@
package org.apache.geronimo.messaging.remotenode.network;
-import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.geronimo.messaging.CommunicationException;
-import org.apache.geronimo.messaging.Msg;
-import org.apache.geronimo.messaging.MsgBody;
+import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.NodeInfo;
-import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
import org.apache.geronimo.messaging.io.IOContext;
import org.apache.geronimo.messaging.remotenode.NodeServer;
-import org.apache.geronimo.messaging.remotenode.RemoteNode;
-import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
import org.apache.geronimo.messaging.remotenode.RemoteNodeManager;
-import org.apache.geronimo.messaging.remotenode.admin.JoinReply;
import org.apache.geronimo.network.SelectorManager;
import org.apache.geronimo.network.protocol.AcceptableProtocol;
import org.apache.geronimo.network.protocol.ProtocolException;
import org.apache.geronimo.network.protocol.ProtocolFactory;
import org.apache.geronimo.network.protocol.ServerSocketAcceptor;
-import org.apache.geronimo.network.protocol.SocketProtocol;
import org.apache.geronimo.network.protocol.ProtocolFactory.AcceptedCallBack;
import org.apache.geronimo.pool.ClockPool;
@@ -86,11 +78,14 @@
serverSocketAcceptor.setSelectorManager(selectorManager);
}
- public void start() throws IOException, CommunicationException {
+ public void start() throws NodeException {
+ if ( null == manager ) {
+ throw new IllegalStateException("Manager is not set.");
+ }
log.debug("Starting NodeServer.");
- SocketProtocol spt = new SocketProtocol();
+ CallbackSocketProtocol spt = new CallbackSocketProtocol();
// TODO configurable.
- spt.setTimeout(10 * 1000);
+ spt.setTimeout(1000);
spt.setSelectorManager(selectorManager);
ProtocolFactory pf = new ProtocolFactory();
@@ -98,7 +93,7 @@
// TODO configurable.
pf.setMaxAge(Long.MAX_VALUE);
pf.setMaxInactivity(1 * 60 * 60 * 1000);
- pf.setReclaimPeriod(10 * 1000);
+ pf.setReclaimPeriod(500);
pf.setTemplate(spt);
pf.setAcceptedCallBack(this);
@@ -114,20 +109,16 @@
serverSocketAcceptor.setUri(bindURI);
serverSocketAcceptor.startup();
} catch (Exception e) {
- IOException exception = new IOException("Can not start.");
- exception.initCause(e);
- throw exception;
+ throw new NodeException("Can not start server", e);
}
}
- public void stop() throws IOException, CommunicationException {
+ public void stop() {
log.info("Stopping NodeServer.");
try {
serverSocketAcceptor.drain();
} catch (Exception e) {
- IOException exception = new IOException("Can not stop.");
- exception.initCause(e);
- throw exception;
+ log.error("Error stopping NodeServer", e);
}
}
@@ -137,40 +128,14 @@
public void accepted(AcceptableProtocol aProtocol)
throws ProtocolException {
- new RemoteNodeInitializer(aProtocol);
- }
-
- private class RemoteNodeInitializer implements MsgOutInterceptor {
- private final RemoteNodeConnection connection;
- private RemoteNodeInitializer(AcceptableProtocol aProtocol)
- throws ProtocolException {
- connection =
- new RemoteNodeJoinedConnection(ioContext, aProtocol);
- try {
- connection.open();
- } catch (IOException e) {
- throw new ProtocolException(e);
- } catch (CommunicationException e) {
- throw new ProtocolException(e);
- }
- connection.setMsgProducerOut(this);
- }
-
- public void push(Msg aMsg) {
- MsgBody body = aMsg.getBody();
- NodeInfo otherNodeInfo = (NodeInfo) body.getContent();
-
- JoinReply joinReply = new JoinReply(aMsg);
- joinReply.execute(connection);
-
- RemoteNode remoteNode = manager.findRemoteNode(otherNodeInfo);
- if ( null == remoteNode ) {
- remoteNode = new RemoteNodeJoined(otherNodeInfo, ioContext);
- }
- remoteNode.addConnection(connection);
- manager.registerRemoteNode(remoteNode);
+ RemoteNodeJoined remoteNode =
+ new RemoteNodeJoined(nodeInfo, ioContext, aProtocol);
+ remoteNode.setManager(manager);
+ try {
+ remoteNode.join();
+ } catch (NodeException e) {
+ log.error("Can not join node", e);
}
-
}
}
1.3 +47 -8 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoined.java
Index: RemoteNodeJoined.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoined.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- RemoteNodeJoined.java 3 Jun 2004 14:39:44 -0000 1.2
+++ RemoteNodeJoined.java 20 Jul 2004 00:15:05 -0000 1.3
@@ -17,10 +17,17 @@
package org.apache.geronimo.messaging.remotenode.network;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.messaging.Msg;
+import org.apache.geronimo.messaging.MsgBody;
+import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.NodeInfo;
+import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
import org.apache.geronimo.messaging.io.IOContext;
-import org.apache.geronimo.messaging.remotenode.RemoteNode;
+import org.apache.geronimo.messaging.remotenode.AbstractRemoteNode;
import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
+import org.apache.geronimo.network.protocol.Protocol;
/**
*
@@ -28,16 +35,48 @@
*/
public class RemoteNodeJoined
extends AbstractRemoteNode
- implements RemoteNode
{
+
+ private static final Log log = LogFactory.getLog(RemoteNodeJoined.class);
- public RemoteNodeJoined(NodeInfo aNodeInfo, IOContext anIOContext) {
- super(aNodeInfo, anIOContext);
+ private final Protocol protocol;
+
+
+ public RemoteNodeJoined(NodeInfo aLocalNode, IOContext anIOContext,
+ Protocol aProtocol) {
+ super(aLocalNode, anIOContext);
+ protocol = aProtocol;
}
- public RemoteNodeConnection newConnection() {
- throw new UnsupportedOperationException(
- "A joined node does not create connections");
+ public void join() throws NodeException {
+ RemoteNodeConnection connection =
+ new RemoteNodeJoinedConnection(ioContext, protocol);
+ setConnection(connection);
+
+ setMsgProducerOut(new JoinExecutor());
+ }
+
+ private class JoinExecutor implements MsgOutInterceptor {
+
+ public void push(Msg aMsg) {
+ MsgBody body = aMsg.getBody();
+ remoteNodeInfo = (NodeInfo) body.getContent();
+
+ if ( null != manager.findRemoteNode(remoteNodeInfo) ) {
+ log.error(remoteNodeInfo +
+ " tried to join twice this node; rejecting request.");
+ Msg msg = aMsg.reply();
+ msg.getBody().setContent(Boolean.FALSE);
+ getMsgConsumerOut().push(msg);
+ return;
+ }
+ Msg msg = aMsg.reply();
+ msg.getBody().setContent(Boolean.TRUE);
+ getMsgConsumerOut().push(msg);
+
+ manager.registerRemoteNode(RemoteNodeJoined.this);
+ }
+
}
}
1.2 +53 -17 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/AbstractRemoteNodeConnection.java
Index: AbstractRemoteNodeConnection.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/AbstractRemoteNodeConnection.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- AbstractRemoteNodeConnection.java 11 May 2004 12:06:42 -0000 1.1
+++ AbstractRemoteNodeConnection.java 20 Jul 2004 00:15:05 -0000 1.2
@@ -19,7 +19,9 @@
import java.io.IOException;
-import org.apache.geronimo.messaging.CommunicationException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
import org.apache.geronimo.messaging.io.IOContext;
import org.apache.geronimo.messaging.io.PopSynchronization;
@@ -27,6 +29,7 @@
import org.apache.geronimo.messaging.io.ReplacerResolver;
import org.apache.geronimo.messaging.io.StreamManager;
import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
+import org.apache.geronimo.messaging.remotenode.network.CallbackSocketProtocol.SocketProtocolListener;
import org.apache.geronimo.network.protocol.Protocol;
import org.apache.geronimo.network.protocol.ProtocolException;
@@ -39,11 +42,14 @@
implements RemoteNodeConnection
{
+ private static final Log log = LogFactory.getLog(AbstractRemoteNodeConnection.class);
+
private final IOContext ioContext;
protected Protocol protocol;
private MsgOutInterceptor msgOut;
private ProtocolInDispatcher inDispatcher;
-
+ private LifecycleListener listener;
+
public AbstractRemoteNodeConnection(IOContext anIOContext) {
if ( null == anIOContext ) {
throw new IllegalArgumentException("IOContext is required.");
@@ -67,36 +73,66 @@
return msgOut;
}
- public void open() throws IOException, CommunicationException {
+ public void open() throws NodeException {
try {
protocol = newProtocol();
} catch (ProtocolException e) {
- IOException exception = new IOException("Can not create Protocol.");
- exception.initCause(e);
- throw exception;
+ throw new NodeException("Can not create protocol", e);
+ }
+ Protocol curProtocol = protocol;
+ while ( null != curProtocol ) {
+ if ( curProtocol instanceof CallbackSocketProtocol ) {
+ ((CallbackSocketProtocol) curProtocol).setListener(
+ new SocketProtocolListener() {
+ public void onClose() {
+ msgOut = null;
+ inDispatcher = null;
+ if ( null != listener ) {
+ listener.onClose();
+ }
+ }
+ });
+ break;
+ }
+ curProtocol = curProtocol.getDownProtocol();
+ }
+ if ( false == curProtocol instanceof CallbackSocketProtocol ) {
+ throw new AssertionError("No CallbackSocketProtocol.");
}
+
StreamManager streamManager = ioContext.getStreamManager();
ReplacerResolver replacerResolver = ioContext.getReplacerResolver();
- PushSynchronization pushSynchronization = ioContext.getPushSynchronization();
- PopSynchronization popSynchronization = ioContext.getPopSynchronization();
- msgOut = new ProtocolOutInterceptor(protocol,
- streamManager, pushSynchronization, replacerResolver);
- inDispatcher = new ProtocolInDispatcher(protocol,
- streamManager, popSynchronization, replacerResolver);
+ PushSynchronization pushSynchronization =
+ ioContext.getPushSynchronization();
+ PopSynchronization popSynchronization =
+ ioContext.getPopSynchronization();
+ try {
+ msgOut = new ProtocolOutInterceptor(protocol,
+ streamManager, pushSynchronization, replacerResolver);
+ inDispatcher = new ProtocolInDispatcher(protocol,
+ streamManager, popSynchronization, replacerResolver);
+ } catch (IOException e) {
+ throw new NodeException("Can not set-up IO context.", e);
+ }
}
protected abstract Protocol newProtocol() throws ProtocolException;
- public void close() throws IOException, CommunicationException {
+ public void close() {
msgOut = null;
inDispatcher = null;
try {
protocol.drain();
} catch (ProtocolException e) {
- IOException exception = new IOException("Can not drain Protocol");
- exception.initCause(e);
- throw exception;
+ log.error("Error when closing connection", e);
+ }
+ if ( null != listener ) {
+ listener.onClose();
}
}
+ public void setLifecycleListener(LifecycleListener aListener) {
+ listener = aListener;
+ }
+
}
1.5 +6 -21 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NetworkTransportFactory.java
Index: NetworkTransportFactory.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/NetworkTransportFactory.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- NetworkTransportFactory.java 8 Jul 2004 05:13:29 -0000 1.4
+++ NetworkTransportFactory.java 20 Jul 2004 00:15:05 -0000 1.5
@@ -19,14 +19,11 @@
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
-import org.apache.geronimo.gbean.GBeanLifecycle;
-import org.apache.geronimo.gbean.WaitingException;
import org.apache.geronimo.messaging.NodeInfo;
import org.apache.geronimo.messaging.io.IOContext;
import org.apache.geronimo.messaging.remotenode.MessagingTransportFactory;
import org.apache.geronimo.messaging.remotenode.NodeServer;
import org.apache.geronimo.messaging.remotenode.RemoteNode;
-import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
import org.apache.geronimo.network.SelectorManager;
import org.apache.geronimo.pool.ClockPool;
@@ -36,7 +33,7 @@
* @version $Revision$ $Date$
*/
public class NetworkTransportFactory
- implements GBeanLifecycle, MessagingTransportFactory
+ implements MessagingTransportFactory
{
private final SelectorManager sm;
@@ -51,28 +48,16 @@
cp = aClockPool;
}
- public void doStart() throws WaitingException, Exception {
- }
-
- public void doStop() throws WaitingException, Exception {
- }
-
- public void doFail() {
- }
-
public NodeServer factoryServer(NodeInfo aNodeInfo, IOContext anIOContext) {
return new NodeServerImpl(aNodeInfo, anIOContext, sm, cp);
}
- public RemoteNode factoryRemoteNode(NodeInfo aNodeInfo, IOContext anIOContext) {
- return new RemoteNodeJoiner(aNodeInfo, anIOContext, this);
+ public RemoteNode factoryRemoteNode(NodeInfo aLocalNodeInfo,
+ NodeInfo aRemoteNodeInfo, IOContext anIOContext) {
+ return new RemoteNodeJoiner(aLocalNodeInfo, aRemoteNodeInfo,
+ anIOContext, sm);
}
- public RemoteNodeConnection factoryRemoteNodeConnection(
- NodeInfo aNodeInfo, IOContext anIOContext) {
- return new RemoteNodeJoinerConnection(aNodeInfo, anIOContext, sm);
- }
-
public static final GBeanInfo GBEAN_INFO;
static {
1.4 +3 -4 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinerConnection.java
Index: RemoteNodeJoinerConnection.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinerConnection.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RemoteNodeJoinerConnection.java 17 Jul 2004 03:45:41 -0000 1.3
+++ RemoteNodeJoinerConnection.java 20 Jul 2004 00:15:05 -0000 1.4
@@ -25,7 +25,6 @@
import org.apache.geronimo.network.SelectorManager;
import org.apache.geronimo.network.protocol.Protocol;
import org.apache.geronimo.network.protocol.ProtocolException;
-import org.apache.geronimo.network.protocol.SocketProtocol;
/**
*
@@ -60,9 +59,9 @@
String hostName = nodeInfo.getAddress().getHostName();
int port = nodeInfo.getPort();
- SocketProtocol socketProtocol = new SocketProtocol();
+ CallbackSocketProtocol socketProtocol = new CallbackSocketProtocol();
// TODO configurable.
- socketProtocol.setTimeout(10 * 1000);
+ socketProtocol.setTimeout(1000);
socketProtocol.setInterface(new InetSocketAddress(hostName, 0));
socketProtocol.setAddress(new InetSocketAddress(hostName, port));
socketProtocol.setSelectorManager(selectorManager);
1.2 +1 -3 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinedConnection.java
Index: RemoteNodeJoinedConnection.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/RemoteNodeJoinedConnection.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RemoteNodeJoinedConnection.java 11 May 2004 12:06:42 -0000 1.1
+++ RemoteNodeJoinedConnection.java 20 Jul 2004 00:15:05 -0000 1.2
@@ -18,7 +18,6 @@
package org.apache.geronimo.messaging.remotenode.network;
import org.apache.geronimo.messaging.io.IOContext;
-import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection;
import org.apache.geronimo.network.protocol.Protocol;
import org.apache.geronimo.network.protocol.ProtocolException;
@@ -28,7 +27,6 @@
*/
public class RemoteNodeJoinedConnection
extends AbstractRemoteNodeConnection
- implements RemoteNodeConnection
{
public RemoteNodeJoinedConnection(IOContext anIOContext,
1.1 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/network/CallbackSocketProtocol.java
Index: CallbackSocketProtocol.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.messaging.remotenode.network;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.network.protocol.Protocol;
import org.apache.geronimo.network.protocol.SocketProtocol;
/**
* SocketProtocol providing asynchronous callbacks upon closure.
*
* @version $Revision: 1.1 $ $Date: 2004/07/20 00:15:05 $
*/
public class CallbackSocketProtocol
extends SocketProtocol
{
private Log log = LogFactory.getLog(CallbackSocketProtocol.class);
private static int nextConnectionId = 0;
/**
* To be notified when the socket is closed.
*/
private SocketProtocolListener listener;
private synchronized static int getNextConnectionId() {
return nextConnectionId++;
}
public void close() {
super.close();
if ( null != listener ) {
listener.onClose();
}
}
/**
* Gets the listener to be notified upon closure of the underlying socket.
*
* @return Listener.
*/
public SocketProtocolListener getListener() {
return listener;
}
/**
* Sets the listener.
*
* @param aListener Listener.
*/
public void setListener(SocketProtocolListener aListener) {
listener = aListener;
}
public Protocol cloneProtocol() throws CloneNotSupportedException {
CallbackSocketProtocol p = (CallbackSocketProtocol) super.clone();
p.log = LogFactory.getLog(CallbackSocketProtocol.class.getName() + ":" + getNextConnectionId());
return p;
}
/**
* When the underlying socket is closed, this callback is called.
*
* @version $Revision: 1.1 $ $Date: 2004/07/20 00:15:05 $
*/
public interface SocketProtocolListener {
public void onClose();
}
}
1.2 +24 -4 incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockNodeServer.java
Index: MockNodeServer.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockNodeServer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MockNodeServer.java 11 May 2004 12:06:43 -0000 1.1
+++ MockNodeServer.java 20 Jul 2004 00:15:06 -0000 1.2
@@ -1,15 +1,35 @@
+/**
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.geronimo.messaging.remotenode;
-import java.io.IOException;
+import org.apache.geronimo.messaging.NodeException;
-import org.apache.geronimo.messaging.CommunicationException;
+/**
+ *
+ * @version $Revision$ $Date$
+ */
public class MockNodeServer implements NodeServer {
- public void start() throws IOException, CommunicationException {
+ public void start() throws NodeException {
}
- public void stop() throws IOException, CommunicationException {
+ public void stop() {
}
public void setRemoteNodeManager(RemoteNodeManager aManager) {
1.5 +125 -27 incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImplTest.java
Index: RemoteNodeManagerImplTest.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImplTest.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- RemoteNodeManagerImplTest.java 8 Jul 2004 05:13:29 -0000 1.4
+++ RemoteNodeManagerImplTest.java 20 Jul 2004 00:15:06 -0000 1.5
@@ -54,6 +54,29 @@
cp.setPoolName("CP");
manager = new RemoteNodeManagerImpl(nodeInfo1, ioContext, cp, factory);
+
+ NodeTopology topology = new NodeTopology() {
+ public int getVersion() {
+ return 0;
+ }
+ public Set getNeighbours(NodeInfo aRoot) {
+ return new HashSet();
+ }
+ public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
+ return null;
+ }
+ public int getIDOfNode(NodeInfo aNodeInfo) {
+ throw new UnsupportedOperationException("getVersion");
+ }
+ public NodeInfo getNodeById(int anId) {
+ throw new UnsupportedOperationException("getVersion");
+ }
+ public Set getNodes() {
+ throw new UnsupportedOperationException("getVersion");
+ }
+ };
+ manager.prepareTopology(topology);
+ manager.commitTopology();
}
public void testRegisterRemoteNode() throws Exception {
@@ -89,82 +112,148 @@
assertEquals(remoteNode, listener.event.getRemoteNode());
}
- public void testGetMsgOut() throws Exception {
+ private TestGetMsgOutInfo newGetMsgOutInfo() throws Exception {
+ final TestGetMsgOutInfo info = new TestGetMsgOutInfo();
+
InetAddress address = InetAddress.getLocalHost();
- final NodeInfo srcNode = new NodeInfo("SrcNode1", address, 8081);
- final NodeInfo node1 = new NodeInfo("Node1", address, 8081);
- final NodeInfo node2 = new NodeInfo("Node2", address, 8081);
+ info.srcNode = new NodeInfo("SrcNode1", address, 8081);
+ info.node1 = new NodeInfo("Node1", address, 8081);
+ info.node2 = new NodeInfo("Node2", address, 8081);
MockRemoteNode remoteNode1 = new MockRemoteNode();
- remoteNode1.setNodeInfo(node1);
+ remoteNode1.setNodeInfo(info.node1);
manager.registerRemoteNode(remoteNode1);
MockRemoteNode remoteNode2 = new MockRemoteNode();
- remoteNode2.setNodeInfo(node2);
+ remoteNode2.setNodeInfo(info.node2);
manager.registerRemoteNode(remoteNode2);
- NodeTopology topology = new NodeTopology() {
+ info.topology = new NodeTopology() {
public Set getNeighbours(NodeInfo aRoot) {
Set result = new HashSet();
- result.add(node1);
- result.add(node2);
+ result.add(info.node1);
+ result.add(info.node2);
return result;
}
public NodeInfo[] getPath(NodeInfo aSource, NodeInfo aTarget) {
- if ( aSource.equals(srcNode) && aTarget.equals(node1) ) {
- return new NodeInfo[] {node1};
- } else if ( aSource.equals(srcNode) && aTarget.equals(node2) ) {
- return new NodeInfo[] {node2};
+ if ( aSource.equals(info.srcNode) &&
+ aTarget.equals(info.node1) ) {
+ return new NodeInfo[] {info.node1};
+ } else if ( aSource.equals(info.srcNode) &&
+ aTarget.equals(info.node2) ) {
+ return new NodeInfo[] {info.node2};
}
return null;
}
public int getIDOfNode(NodeInfo aNodeInfo) {
- return 0;
+ throw new UnsupportedOperationException("getIDOfNode");
}
public NodeInfo getNodeById(int anId) {
- return null;
+ throw new UnsupportedOperationException("getNodeById");
}
public Set getNodes() {
- return null;
+ throw new UnsupportedOperationException("getNodes");
}
public int getVersion() {
- return 0;
+ return 1;
}
};
- manager.setTopology(topology);
+ // Test that Msg are successfully routed within the context of
+ // a prepared topology.
+ manager.prepareTopology(info.topology);
+
+ info.remoteNode1 = remoteNode1;
+ info.remoteNode2 = remoteNode2;
+ return info;
+ }
+
+ /**
+ * Test that Msg are successfully routed within the context of a prepared
+ * topology.
+ */
+ public void testPreparedGetMsgOut() throws Exception {
+ TestGetMsgOutInfo info = newGetMsgOutInfo();
+
+ MsgOutInterceptor out = manager.getMsgConsumerOut();
+ Msg msg = new Msg();
+ MsgHeader header = msg.getHeader();
+ Integer id = new Integer(1234);
+ header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
+ header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
+ header.addHeader(MsgHeaderConstants.DEST_NODES, info.node1);
+ header.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
+ new Integer(info.topology.getVersion()));
+ out.push(msg);
+
+ List receivedMsgs = info.remoteNode1.getPushedMsg();
+ assertEquals(1, receivedMsgs.size());
+ msg = (Msg) receivedMsgs.get(0);
+ assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
+ receivedMsgs.clear();
+
+ receivedMsgs = info.remoteNode2.getPushedMsg();
+ assertEquals(0, receivedMsgs.size());
+
+ msg = new Msg();
+ header = msg.getHeader();
+ header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
+ header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
+ header.addHeader(MsgHeaderConstants.DEST_NODES, info.node2);
+ header.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
+ new Integer(info.topology.getVersion()));
+ out.push(msg);
+
+ receivedMsgs = info.remoteNode2.getPushedMsg();
+ assertEquals(1, receivedMsgs.size());
+ msg = (Msg) receivedMsgs.get(0);
+ assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
+ receivedMsgs.clear();
+
+ receivedMsgs = info.remoteNode1.getPushedMsg();
+ assertEquals(0, receivedMsgs.size());
+ }
+
+ /**
+ * Test that Msg are successfully routed within the context of a committed
+ * topology.
+ */
+ public void testCommittedGetMsgOut() throws Exception {
+ TestGetMsgOutInfo info = newGetMsgOutInfo();
+
+ manager.commitTopology();
MsgOutInterceptor out = manager.getMsgConsumerOut();
Msg msg = new Msg();
MsgHeader header = msg.getHeader();
Integer id = new Integer(1234);
header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
- header.addHeader(MsgHeaderConstants.SRC_NODE, srcNode);
- header.addHeader(MsgHeaderConstants.DEST_NODES, node1);
+ header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
+ header.addHeader(MsgHeaderConstants.DEST_NODES, info.node1);
out.push(msg);
- List receivedMsgs = remoteNode1.getPushedMsg();
+ List receivedMsgs = info.remoteNode1.getPushedMsg();
assertEquals(1, receivedMsgs.size());
msg = (Msg) receivedMsgs.get(0);
assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
receivedMsgs.clear();
- receivedMsgs = remoteNode2.getPushedMsg();
+ receivedMsgs = info.remoteNode2.getPushedMsg();
assertEquals(0, receivedMsgs.size());
msg = new Msg();
header = msg.getHeader();
header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
- header.addHeader(MsgHeaderConstants.SRC_NODE, srcNode);
- header.addHeader(MsgHeaderConstants.DEST_NODES, node2);
+ header.addHeader(MsgHeaderConstants.SRC_NODE, info.srcNode);
+ header.addHeader(MsgHeaderConstants.DEST_NODES, info.node2);
out.push(msg);
- receivedMsgs = remoteNode2.getPushedMsg();
+ receivedMsgs = info.remoteNode2.getPushedMsg();
assertEquals(1, receivedMsgs.size());
msg = (Msg) receivedMsgs.get(0);
assertEquals(id, msg.getHeader().getHeader(MsgHeaderConstants.CORRELATION_ID));
receivedMsgs.clear();
- receivedMsgs = remoteNode1.getPushedMsg();
+ receivedMsgs = info.remoteNode1.getPushedMsg();
assertEquals(0, receivedMsgs.size());
}
@@ -173,6 +262,15 @@
public void fireRemoteNodeEvent(RemoteNodeEvent anEvent) {
event = anEvent;
}
+ }
+
+ private class TestGetMsgOutInfo {
+ private NodeTopology topology;
+ private MockRemoteNode remoteNode1;
+ private MockRemoteNode remoteNode2;
+ private NodeInfo srcNode;
+ private NodeInfo node1;
+ private NodeInfo node2;
}
}
1.3 +23 -7 incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockMessagingTransportFactory.java
Index: MockMessagingTransportFactory.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockMessagingTransportFactory.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MockMessagingTransportFactory.java 24 Jun 2004 23:39:03 -0000 1.2
+++ MockMessagingTransportFactory.java 20 Jul 2004 00:15:06 -0000 1.3
@@ -1,8 +1,29 @@
+/**
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.geronimo.messaging.remotenode;
import org.apache.geronimo.messaging.NodeInfo;
import org.apache.geronimo.messaging.io.IOContext;
+/**
+ *
+ * @version $Revision$ $Date$
+ */
public class MockMessagingTransportFactory
implements MessagingTransportFactory
{
@@ -19,13 +40,8 @@
return server;
}
- public RemoteNode factoryRemoteNode(NodeInfo aNodeInfo, IOContext anIOContext) {
- return null;
- }
-
- public RemoteNodeConnection factoryRemoteNodeConnection(
- NodeInfo aNodeInfo,
- IOContext anIOContext) {
+ public RemoteNode factoryRemoteNode(NodeInfo aLocalNodeInfo,
+ NodeInfo aRemoteNodeInfo, IOContext anIOContext) {
return null;
}
1.4 +7 -12 incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockRemoteNode.java
Index: MockRemoteNode.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/test/org/apache/geronimo/messaging/remotenode/MockRemoteNode.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- MockRemoteNode.java 10 Jun 2004 23:12:25 -0000 1.3
+++ MockRemoteNode.java 20 Jul 2004 00:15:06 -0000 1.4
@@ -17,12 +17,11 @@
package org.apache.geronimo.messaging.remotenode;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.geronimo.messaging.CommunicationException;
import org.apache.geronimo.messaging.Msg;
+import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.NodeInfo;
import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
@@ -49,17 +48,13 @@
return nodeInfo;
}
- public RemoteNodeConnection newConnection() throws IOException, CommunicationException {
- return null;
+ public void setManager(RemoteNodeManager aManager) {
}
-
- public void leave() throws IOException, CommunicationException {
- }
-
- public void addConnection(RemoteNodeConnection aConnection) {
+
+ public void leave() {
}
-
- public void removeConnection(RemoteNodeConnection aConnection) {
+
+ public void join() throws NodeException {
}
public void setMsgProducerOut(MsgOutInterceptor aMsgOut) {
1.3 +11 -25 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNode.java
Index: RemoteNode.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNode.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- RemoteNode.java 3 Jun 2004 14:39:44 -0000 1.2
+++ RemoteNode.java 20 Jul 2004 00:15:06 -0000 1.3
@@ -17,10 +17,9 @@
package org.apache.geronimo.messaging.remotenode;
-import java.io.IOException;
-import org.apache.geronimo.messaging.CommunicationException;
import org.apache.geronimo.messaging.MsgConsProd;
+import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.NodeInfo;
/**
@@ -40,36 +39,23 @@
public NodeInfo getNodeInfo();
/**
- * Returns a connection to the remote node.
- * <BR>
- * This connection is not opened.
+ * Sets the manager of this remote node.
*
- * @throws IOException Indicates an I/O problem.
- * @throws CommunicationException If a communication can not be established.
+ * @param aManager Manager.
*/
- public RemoteNodeConnection newConnection()
- throws IOException, CommunicationException;
-
+ public void setManager(RemoteNodeManager aManager);
+
/**
* Leaves the remote node.
- *
- * @throws IOException Indicates an I/O problem.
- * @throws CommunicationException If a communication can not be established.
*/
- public void leave() throws IOException, CommunicationException;
+ public void leave();
/**
- * Adds a connection.
- *
- * @param aConnection Connection to be added to the RemoteNode.
- */
- public void addConnection(RemoteNodeConnection aConnection);
-
- /**
- * Removes a connection.
+ * Joins the remote node.
*
- * @param aConnection Connection to be removed.
+ * @exception NodeException Indicates that the remote node can not be
+ * joined.
*/
- public void removeConnection(RemoteNodeConnection aConnection);
+ public void join() throws NodeException;
}
1.2 +20 -9 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeConnection.java
Index: RemoteNodeConnection.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeConnection.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RemoteNodeConnection.java 11 May 2004 12:06:42 -0000 1.1
+++ RemoteNodeConnection.java 20 Jul 2004 00:15:06 -0000 1.2
@@ -17,10 +17,9 @@
package org.apache.geronimo.messaging.remotenode;
-import java.io.IOException;
-import org.apache.geronimo.messaging.CommunicationException;
import org.apache.geronimo.messaging.MsgConsProd;
+import org.apache.geronimo.messaging.NodeException;
/**
* Connection to a remote node.
@@ -34,17 +33,29 @@
/**
* Opens the connection.
*
- * @throws IOException Indicates an I/O problem.
- * @throws CommunicationException If a communication can not be established.
+ * @throws NodeException Indicates that the connection can not be opened.
*/
- public void open() throws IOException, CommunicationException;
+ public void open() throws NodeException;
/**
* Closes the connection.
+ */
+ public void close();
+
+ /**
+ * Sets the listener to be notified when the connection is closed.
*
- * @throws IOException Indicates an I/O problem.
- * @throws CommunicationException If a communication can not be established.
+ * @param aListener Listener.
*/
- public void close() throws IOException, CommunicationException;
+ public void setLifecycleListener(LifecycleListener aListener);
+
+ /**
+ * Callback interface to be notified when the connection is closed.
+ */
+ public interface LifecycleListener {
+
+ public void onClose();
+
+ }
}
1.4 +2 -7 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeMonitor.java
Index: RemoteNodeMonitor.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeMonitor.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RemoteNodeMonitor.java 17 Jul 2004 03:48:57 -0000 1.3
+++ RemoteNodeMonitor.java 20 Jul 2004 00:15:06 -0000 1.4
@@ -24,7 +24,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.NodeInfo;
import org.apache.geronimo.pool.ClockPool;
@@ -144,11 +143,7 @@
Long lastActivity = (Long)entry.getValue();
if ( lastActivity.longValue() <
System.currentTimeMillis() + IDLE_TIME ) {
- try {
- manager.leaveRemoteNode(node);
- } catch (NodeException e) {
- log.error("Can not leave " + node, e);
- }
+ manager.leaveRemoteNode(node);
iter.remove();
}
}
1.2 +9 -12 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/NodeServer.java
Index: NodeServer.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/NodeServer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- NodeServer.java 11 May 2004 12:06:42 -0000 1.1
+++ NodeServer.java 20 Jul 2004 00:15:06 -0000 1.2
@@ -17,9 +17,8 @@
package org.apache.geronimo.messaging.remotenode;
-import java.io.IOException;
-import org.apache.geronimo.messaging.CommunicationException;
+import org.apache.geronimo.messaging.NodeException;
/**
* A NodeServer listens for remote nodes and delegates to a
@@ -33,22 +32,20 @@
/**
* Start the server.
*
- * @throws IOException Indicates an I/O problem.
- * @throws CommunicationException If a communication can not be established.
+ * @throws NodeException If the server can not be started.
+ * @exception IllegalStateException Indicates that no RemoteNodeManger has
+ * been set.
*/
- public void start() throws IOException, CommunicationException;
+ public void start() throws NodeException, IllegalStateException;
/**
* Stop the server.
- *
- * @throws IOException Indicates an I/O problem.
- * @throws CommunicationException If a communication can not be established.
*/
- public void stop() throws IOException, CommunicationException;
+ public void stop();
/**
- * Sets the RemoteNodeManager in charge of managing the remote node, which
- * have join this server.
+ * Sets the RemoteNodeManager in charge of managing the remote nodes, which
+ * have joined this server.
* <BR>
* A NodeServer must notify this RemoteNodeManager when a new connection
* abstracting a remote note has joined it.
1.8 +100 -113 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImpl.java
Index: RemoteNodeManagerImpl.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManagerImpl.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- RemoteNodeManagerImpl.java 17 Jul 2004 03:49:29 -0000 1.7
+++ RemoteNodeManagerImpl.java 20 Jul 2004 00:15:06 -0000 1.8
@@ -17,12 +17,10 @@
package org.apache.geronimo.messaging.remotenode;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -38,7 +36,6 @@
import org.apache.geronimo.messaging.NodeTopology;
import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
import org.apache.geronimo.messaging.io.IOContext;
-import org.apache.geronimo.messaging.remotenode.admin.JoinRequest;
import org.apache.geronimo.pool.ClockPool;
/**
@@ -67,6 +64,11 @@
*/
private NodeTopology topology;
+ /**
+ * Topology being prepared.
+ */
+ private NodeTopology preparedTopology;
+
public RemoteNodeManagerImpl(NodeInfo aNodeInfo, IOContext anIOContext,
ClockPool aClockPool, MessagingTransportFactory aFactory) {
if ( null == aNodeInfo ) {
@@ -90,100 +92,81 @@
public void start() throws NodeException {
log.info("Starting RemoteNodeManager for node {" + nodeInfo + "}");
- try {
- server = factory.factoryServer(nodeInfo, ioContext);
- server.setRemoteNodeManager(this);
- server.start();
- } catch (IOException e) {
- throw new NodeException("Can not start server.", e);
- } catch (CommunicationException e) {
- throw new NodeException("Can not start server.", e);
- }
+ server = factory.factoryServer(nodeInfo, ioContext);
+ server.setRemoteNodeManager(this);
+ server.start();
remoteNodeMonitor.start();
}
public void stop() throws NodeException {
log.info("Stopping RemoteNodeManager for node {" + nodeInfo + "}");
remoteNodeMonitor.stop();
+ server.stop();
+ Collection nodes;
synchronized(remoteNodes) {
- for (Iterator iter = remoteNodes.values().iterator(); iter.hasNext();) {
- RemoteNode node = (RemoteNode) iter.next();
- try {
- node.leave();
- } catch (IOException e) {
- log.error(e);
- } catch (CommunicationException e) {
- log.error(e);
- } finally {
- node.setMsgProducerOut(null);
- iter.remove();
- }
- }
+ nodes = new ArrayList(remoteNodes.values());
}
- try {
- server.stop();
- } catch (IOException e) {
- throw new NodeException("Can not stop NodeServer.", e);
- } catch (CommunicationException e) {
- throw new NodeException("Can not stop NodeServer.", e);
+ for (Iterator iter = nodes.iterator(); iter.hasNext();) {
+ RemoteNode node = (RemoteNode) iter.next();
+ node.leave();
+ node.setMsgProducerOut(null);
}
}
- public void setTopology(NodeTopology aTopology) {
- Set neighbours = aTopology.getNeighbours(nodeInfo);
-
- // Makes sure that one does not try to remove the new neighbours
- // during the reconfiguration.
- remoteNodeMonitor.unscheduleNodeDeletion(neighbours);
-
- Set newNeighbours = new HashSet();
+ public void prepareTopology(NodeTopology aTopology) throws NodeException {
Set oldNeighbours;
if ( null == topology ) {
oldNeighbours = Collections.EMPTY_SET;
} else {
oldNeighbours = topology.getNeighbours(nodeInfo);
}
- // Tries to join all the neighbours declared by the specified
- // topology.
- for (Iterator iter = neighbours.iterator(); iter.hasNext();) {
+ // Computes the new neighbours
+ Set newNeighbours = aTopology.getNeighbours(nodeInfo);
+ newNeighbours.removeAll(oldNeighbours);
+
+ // Makes sure that one does not drop them during the reconfiguration.
+ remoteNodeMonitor.unscheduleNodeDeletion(newNeighbours);
+
+ Exception exception = null;
+ // Joins all the new neighbours
+ for (Iterator iter = newNeighbours.iterator(); iter.hasNext();) {
NodeInfo node = (NodeInfo) iter.next();
- if ( !oldNeighbours.contains(node) ) {
- try {
- findOrJoinRemoteNode(node);
- newNeighbours.add(node);
- } catch (NodeException e) {
- log.error("Can not apply topology change", e);
- break;
- } catch (CommunicationException e) {
- log.error("Can not apply topology change", e);
- break;
- }
+ try {
+ findOrJoinRemoteNode(node);
+ } catch (NodeException e) {
+ exception = e;
+ break;
+ } catch (CommunicationException e) {
+ exception = e;
+ break;
}
- iter.remove();
- oldNeighbours.remove(node);
}
- // One neighbour has not been joined successfully. Rolls-back the
- // physical connections created until now.
- if ( 0 < neighbours.size() ) {
+ // One new neighbour has not been joined successfully. Rolls-back.
+ if ( null != exception ) {
for (Iterator iter = newNeighbours.iterator(); iter.hasNext();) {
NodeInfo node = (NodeInfo) iter.next();
- try {
- leaveRemoteNode(node);
- } catch (NodeException e) {
- log.error("Error rolling-back topology change", e);
- } catch (CommunicationException e) {
- log.error("Error rolling-back topology change", e);
- }
+ leaveRemoteNode(node);
}
- throw new CommunicationException("Can not apply topology.");
+ throw new NodeException("Can not apply topology.", exception);
+ }
+ preparedTopology = aTopology;
+ }
+
+ public void commitTopology() {
+ Set oldNeighbours;
+ if ( null == topology ) {
+ oldNeighbours = Collections.EMPTY_SET;
+ } else {
+ oldNeighbours = topology.getNeighbours(nodeInfo);
}
+ // Computes the old neighbours
+ Set newNeighbours = preparedTopology.getNeighbours(nodeInfo);
+ oldNeighbours.removeAll(newNeighbours);
// Schedules the deletion of the old neighbours.
remoteNodeMonitor.scheduleNodeDeletion(oldNeighbours);
- // Ensures that the new neighbours will not be leaved.
- remoteNodeMonitor.unscheduleNodeDeletion(newNeighbours);
- topology = aTopology;
+ topology = preparedTopology;
}
public void addListener(RemoteNodeEventListener aListener) {
@@ -198,22 +181,14 @@
}
}
- public void leaveRemoteNode(NodeInfo aNodeInfo)
- throws NodeException {
+ public void leaveRemoteNode(NodeInfo aNodeInfo) {
synchronized(remoteNodes) {
- RemoteNode remoteNode = findRemoteNode(aNodeInfo);
+ RemoteNode remoteNode = (RemoteNode) remoteNodes.get(aNodeInfo);
if ( null == remoteNode ) {
return;
}
- try {
- remoteNode.leave();
- } catch (IOException e) {
- throw new NodeException("Can not leave " + aNodeInfo, e);
- } catch (CommunicationException e) {
- throw new NodeException("Can not leave " + aNodeInfo, e);
- } finally {
- unregisterRemoteNode(remoteNode);
- }
+ remoteNode.leave();
+ unregisterRemoteNode(remoteNode);
}
}
@@ -221,26 +196,14 @@
throws NodeException {
RemoteNode remoteNode;
synchronized(remoteNodes) {
- remoteNode = findRemoteNode(aNodeInfo);
+ remoteNode = (RemoteNode) remoteNodes.get(aNodeInfo);
if ( null != remoteNode ) {
return remoteNode;
}
- log.debug("Joining node {" + aNodeInfo + "}");
- remoteNode = factory.factoryRemoteNode(aNodeInfo, ioContext);
- RemoteNodeConnection connection;
- try {
- connection = remoteNode.newConnection();
- connection.open();
- } catch (IOException e) {
- throw new NodeException("Can not reach " + aNodeInfo, e);
- } catch (CommunicationException e) {
- throw new NodeException("Can not reach " + aNodeInfo, e);
- }
- JoinRequest joinRequest = new JoinRequest(nodeInfo, aNodeInfo);
- joinRequest.execute(connection);
-
- remoteNode.addConnection(connection);
- registerRemoteNode(remoteNode);
+ remoteNode =
+ factory.factoryRemoteNode(nodeInfo, aNodeInfo, ioContext);
+ remoteNode.setManager(this);
+ remoteNode.join();
}
return remoteNode;
}
@@ -295,10 +258,6 @@
private class RemoteNodeRouter implements MsgOutInterceptor {
public void push(Msg aMsg) {
- if ( null == topology ) {
- throw new RuntimeException("No topology is set.");
- }
-
MsgHeader header = aMsg.getHeader();
Object destNode = header.getHeader(MsgHeaderConstants.DEST_NODES);
if (destNode instanceof NodeInfo) {
@@ -323,31 +282,59 @@
MsgHeaderConstants.DEST_NODE_PATH,
NodeInfo.pop(path));
RemoteNode remoteNode = findRemoteNode(target);
+ if ( null == remoteNode ) {
+ throw new CommunicationException(target +
+ " has failed during a topology reconfiguration.");
+ }
out = remoteNode.getMsgConsumerOut();
} else {
// A path has not already been computed. Computes one.
NodeInfo src = (NodeInfo)
header2.getHeader(MsgHeaderConstants.SRC_NODE);
- path = topology.getPath(src, target);
- if ( null == path ) {
- throw new CommunicationException("{" + target +
- "} is not reachable by {" + src + "}");
+ NodeTopology topo = markTopology(header2);
+ path = topo.getPath(src, target);
+ if (null == path) {
+ throw new CommunicationException("{" + target
+ + "} is not reachable by {" + src +
+ "} in the topology " + topo);
}
- NodeInfo tmpNode = path[0];
- RemoteNode remoteNode = findRemoteNode(tmpNode);
+ RemoteNode remoteNode = findRemoteNode(path[0]);
if ( null == remoteNode ) {
- throw new CommunicationException("{" + target +
- "} is not reachable by {" + src + "}");
+ throw new CommunicationException(path[0] +
+ " has failed during a topology reconfiguration.");
}
out = remoteNode.getMsgConsumerOut();
- NodeInfo[] newPath = NodeInfo.pop(path);
// Inserts the computed path and the new dests.
- header2.addHeader(MsgHeaderConstants.DEST_NODE_PATH, newPath);
+ header2.addHeader(MsgHeaderConstants.DEST_NODE_PATH, NodeInfo.pop(path));
header2.addHeader(MsgHeaderConstants.DEST_NODES, target);
}
out.push(msg2);
}
+ }
+
+ /**
+ * If the topology version is not set, then the Msg is sent in the
+ * current topology.
+ * <BR>
+ * If it is set, then one checks that the associated topology is
+ * still defined. It must be either the currently installed or the
+ * one being prepared.
+ */
+ private NodeTopology markTopology(MsgHeader aHeader) {
+ NodeTopology topo = topology;
+ Integer version = (Integer)
+ aHeader.getOptionalHeader(MsgHeaderConstants.TOPOLOGY_VERSION);
+ if ( null == version ) {
+ aHeader.addHeader(MsgHeaderConstants.TOPOLOGY_VERSION,
+ new Integer(topo.getVersion()));
+ } else if ( version.intValue() == preparedTopology.getVersion() ) {
+ topo = preparedTopology;
+ } else if ( version.intValue() != topo.getVersion() ) {
+ throw new CommunicationException("Topology version " +
+ version + " too old.");
+ }
+ return topo;
}
}
1.4 +18 -9 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManager.java
Index: RemoteNodeManager.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/RemoteNodeManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RemoteNodeManager.java 17 Jul 2004 03:49:29 -0000 1.3
+++ RemoteNodeManager.java 20 Jul 2004 00:15:06 -0000 1.4
@@ -48,16 +48,26 @@
public void stop() throws NodeException;
/**
- * Sets the Topology to be used to derive the path between two nodes.
+ * Prepares the Topology to be used to derive the path between two nodes.
* <BR>
- * When the topology is set, the manager tries to "apply" it: it creates
- * physical connections with all of its neighbours as defined by the
- * specified topology and drops the physical connections no more
- * required by the topology change.
+ * When the topology is prepared, the manager tries to "apply" it: it
+ * creates physical connections to all of its neighbours as defined by the
+ * specified topology. Physical connections no more required by the
+ * topology should not be dropped. These latter should be dropped only
+ * if the topology is committed.
*
* @param aTopology Topology.
+ * @exception NodeException Indicates that the topology can not be prepared.
*/
- public void setTopology(NodeTopology aTopology);
+ public void prepareTopology(NodeTopology aTopology) throws NodeException;
+
+ /**
+ * Commits the Topology which has been previously prepared.
+ * <BR>
+ * When a topology is committed, the physical connections defined
+ * by the previous topology and not by the one to be committed are dropped.
+ */
+ public void commitTopology();
/**
* Adds a listener for RemoteNode event.
@@ -77,9 +87,8 @@
* Leaves a remote node.
*
* @param aNodeInfo Meta-data of the node to be left.
- * @throws NodeException
*/
- public void leaveRemoteNode(NodeInfo aNodeInfo) throws NodeException;
+ public void leaveRemoteNode(NodeInfo aNodeInfo);
/**
* Finds or joins a remote node.
1.3 +5 -15 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/MessagingTransportFactory.java
Index: MessagingTransportFactory.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/MessagingTransportFactory.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MessagingTransportFactory.java 24 Jun 2004 23:39:03 -0000 1.2
+++ MessagingTransportFactory.java 20 Jul 2004 00:15:06 -0000 1.3
@@ -42,23 +42,13 @@
/**
* Creates a RemoteNode providing a local view of the remote node aNodeInfo.
*
- * @param aNodeInfo Remote node meta-data.
+ * @param aLocalNodeInfo Local node meta-data.
+ * @param aRemoteNodeInfo Remote node meta-data.
* @param anIOContext Used to retrieve the IOContext to be used to
* communicate with the remote node.
* @return RemoteNode.
*/
- public RemoteNode factoryRemoteNode(
- NodeInfo aNodeInfo, IOContext anIOContext);
+ public RemoteNode factoryRemoteNode(NodeInfo aLocalNodeInfo,
+ NodeInfo aRemoteNodeInfo, IOContext anIOContext);
- /**
- * Creates a RemoteNodeConnection to the remote node aNodeInfo.
- *
- * @param aNodeInfo Remote node meta-data.
- * @param anIOContext Used to retrieve the IOContext to be used to
- * communicate with the remote node.
- * @return RemoteNodeConnection.
- */
- public RemoteNodeConnection factoryRemoteNodeConnection(
- NodeInfo aNodeInfo, IOContext anIOContext);
-
}
1.1 incubator-geronimo/sandbox/messaging/src/java/org/apache/geronimo/messaging/remotenode/AbstractRemoteNode.java
Index: AbstractRemoteNode.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.messaging.remotenode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.messaging.MsgHeaderConstants;
import org.apache.geronimo.messaging.NodeException;
import org.apache.geronimo.messaging.NodeInfo;
import org.apache.geronimo.messaging.interceptors.HeaderOutInterceptor;
import org.apache.geronimo.messaging.interceptors.MsgOutInterceptor;
import org.apache.geronimo.messaging.io.IOContext;
import org.apache.geronimo.messaging.remotenode.RemoteNodeConnection.LifecycleListener;
/**
* Abstract implememtation for the RemoteNode contracts.
*
* @version $Revision: 1.1 $ $Date: 2004/07/20 00:15:06 $
*/
public abstract class AbstractRemoteNode
implements RemoteNode
{
private static final Log log = LogFactory.getLog(AbstractRemoteNode.class);
/**
* Local node meta-data.
*/
protected final NodeInfo localNodeInfo;
/**
* Manager of this remote node.
*/
protected RemoteNodeManager manager;
/**
* Remote node meta-data.
*/
protected NodeInfo remoteNodeInfo;
protected final IOContext ioContext;
/**
* Connection opened to this remote node.
*/
private RemoteNodeConnection connection;
/**
* Incoming Msgs (coming from remote nodes) are pushed to this output.
*/
protected MsgOutInterceptor producerOut;
public AbstractRemoteNode(NodeInfo aLocalNode, IOContext anIOContext) {
if ( null == aLocalNode ) {
throw new IllegalArgumentException("Local NodeInfo is required.");
} else if ( null == anIOContext ) {
throw new IllegalArgumentException("IOContext is required.");
}
localNodeInfo = aLocalNode;
ioContext = anIOContext;
}
public AbstractRemoteNode(NodeInfo aLocalNodeInfo, NodeInfo aRemoteNodeInfo,
IOContext anIOContext) {
this(aLocalNodeInfo, anIOContext);
if ( null == aRemoteNodeInfo ) {
throw new IllegalArgumentException("Remote NodeInfo is required.");
}
remoteNodeInfo = aRemoteNodeInfo;
}
public void setManager(RemoteNodeManager aManager) {
manager = aManager;
}
public void setMsgProducerOut(MsgOutInterceptor aMsgOut) {
producerOut = aMsgOut;
if ( null == connection ) {
return;
}
connection.setMsgProducerOut(aMsgOut);
}
public MsgOutInterceptor getMsgConsumerOut() {
return new HeaderOutInterceptor(
MsgHeaderConstants.DEST_NODE,
remoteNodeInfo,
connection.getMsgConsumerOut());
}
public NodeInfo getNodeInfo() {
return remoteNodeInfo;
}
protected void setConnection(RemoteNodeConnection aConnection)
throws NodeException {
if ( null != connection && null != aConnection ) {
throw new IllegalArgumentException("Connection already defined.");
} else if ( null != connection ) {
connection.close();
connection = null;
return;
}
connection = aConnection;
connection.open();
connection.setMsgProducerOut(producerOut);
connection.setLifecycleListener(new LifecycleListener() {
public void onClose() {
manager.unregisterRemoteNode(AbstractRemoteNode.this);
}
});
}
public void leave() {
connection.close();
connection.setMsgProducerOut(null);
connection = null;
}
}