You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/03/01 14:16:36 UTC
cvs commit: incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote RemoteUseCaseTest.java
gdamour 2004/03/01 05:16:36
Modified: sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
ServerNode.java ServerProcessors.java
sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote
RemoteUseCaseTest.java
Added: sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
CommunicationException.java NodeInfo.java
MetaConnection.java
Log:
Remove the ServantNode concept.
Each node is a ServerNode, which can join another node uniquely
identified on the network by its NodeInfo property.
Revision Changes Path
1.2 +76 -132 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerNode.java
Index: ServerNode.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerNode.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ServerNode.java 25 Feb 2004 13:36:15 -0000 1.1
+++ ServerNode.java 1 Mar 2004 13:16:35 -0000 1.2
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -43,13 +42,10 @@
* It is also in charge of dispatching the incoming Msgs to the registered
* Connectors.
* <BR>
- * A ServantNode is the counterpart of a ServerNode: it allows to access a
- * ServerNode remotely.
- * <BR>
- * The following diagram shows how ServerNode, ServantNode and Connector are
- * combined together:
+ * The following diagram shows how ServantNode and Connectors are combined
+ * together:
*
- * Connector -- MTO -- ServantNode -- MTO -- ServerNode -- OTM -- Connector
+ * Connector -- MTO -- ServerNode -- MTO -- ServerNode -- OTM -- Connector
*
* Connector communicates with each other by sending Msgs.
*
@@ -63,9 +59,9 @@
private static final Log log = LogFactory.getLog(ServerNode.class);
/**
- * Server name.
+ * Node meta-data.
*/
- private final String name;
+ private final NodeInfo nodeInfo;
/**
* Connectors registered by this server.
@@ -104,7 +100,12 @@
/**
* Processors of this server.
*/
- private final ServerProcessors processors;
+ final ServerProcessors processors;
+
+ /**
+ * MetaConnection to other nodes.
+ */
+ final MetaConnection metaConnection;
private GBeanContext context;
@@ -119,24 +120,22 @@
* @param aMaxRequest Maximum number of concurrent requests, which can be
* processed by this server.
*/
- public ServerNode(String aName, Collection aCollOfConnectors,
- InetAddress anAddress, int aPort, int aMaxRequest) {
- super(anAddress, aPort);
- if ( null == aName ) {
- throw new IllegalArgumentException("Name is required.");
- }
+ public ServerNode(NodeInfo aNodeInfo, Collection aCollOfConnectors,
+ int aMaxRequest) {
+ super(aNodeInfo.getAddress(), aNodeInfo.getPort());
- name = aName;
+ nodeInfo = aNodeInfo;
+ metaConnection = new MetaConnection(this);
// No socket timeout.
setMaxIdleTimeMs(0);
- streamManager = new StreamManagerImpl(name);
+ streamManager = new StreamManagerImpl(getName());
processors = new ServerProcessors(this);
- queueIn = new MsgQueue(aName + " Inbound");
- queueOut = new MsgQueue(aName + " Outbound");
+ queueIn = new MsgQueue(getName() + " Inbound");
+ queueOut = new MsgQueue(getName() + " Outbound");
connections = new HashMap();
@@ -166,6 +165,48 @@
}
/**
+ * Gets the name of this node.
+ */
+ public String getName() {
+ return nodeInfo.getName();
+ }
+
+ /**
+ * Gets the NodeInfo of this node.
+ *
+ * @return NodeInfo.
+ */
+ public NodeInfo getNodeInfo() {
+ return nodeInfo;
+ }
+
+ /**
+ * Joins the node uniquely identified on the network by aNodeInfo.
+ *
+ * @param aNodeInfo NodeInfo of a remote node to join.
+ * @throws IOException Indicates that an I/O error has occured.
+ * @throws CommunicationException Indicates that the node can not be
+ * registered by the remote node identified by aNodeInfo.
+ */
+ public void join(NodeInfo aNodeInfo)
+ throws IOException, CommunicationException {
+ metaConnection.join(aNodeInfo);
+ }
+
+ /**
+ * Leaves the node uniquely identified on the network by aNodeInfo.
+ *
+ * @param aNodeInfo NodeInfo of the remote node to leave.
+ * @throws IOException Indicates that an I/O error has occured.
+ * @throws CommunicationException Indicates that the node has not leaved
+ * successfully the remote node.
+ */
+ public void leave(NodeInfo aNodeInfo)
+ throws IOException, CommunicationException {
+ metaConnection.leave(aNodeInfo);
+ }
+
+ /**
* Gets the StreamManager of this server.
*
* @return StreamManager used by this server to resolve/encode InputStreams.
@@ -175,17 +216,14 @@
}
/**
- * Gets the Output to be used to communicate with the specified servant.
+ * Gets the Output to be used to communicate with the specified node.
*
- * @param aServantName Servant name.
- * @return Output to be used to communicate with the specified servant.
+ * @param aServantName Node name.
+ * @return Output to be used to communicate with the specified node.
*/
- public MsgOutInterceptor getOutForServant(Object aServantName) {
- ConnectionWrapper connection;
- synchronized (connections) {
- connection = (ConnectionWrapper) connections.get(aServantName);
- }
- return connection.out;
+ public MsgOutInterceptor getOutForNode(String aNodeName)
+ throws CommunicationException {
+ return metaConnection.getOutForNode(aNodeName);
}
/**
@@ -226,19 +264,13 @@
* Handles a new connection.
*/
protected void handleConnection(InputStream anIn,OutputStream anOut) {
- ConnectionWrapper connection = initConnection(anIn, anOut);
-
- // Wait until the end of the connection.
- Object releaser = connection.endReleaser;
- synchronized (releaser) {
- try {
- releaser.wait();
- } catch (InterruptedException e) {
- log.error(e);
- }
+ try {
+ metaConnection.joined(anIn, anOut);
+ } catch (IOException e) {
+ log.error(e);
+ } catch (CommunicationException e) {
+ log.error(e);
}
-
- removeConnection(connection);
}
public void setGBeanContext(GBeanContext aContext) {
@@ -267,97 +299,9 @@
processors.stop();
}
-
- /**
- * Initializes a connection. Checks that a connection with the same name
- * is not already registered by the server. If a connection with the same
- * name exists, then the server refuses the connection and exits. Otherwise,
- * a connection is registered with the provided name.
- *
- * @param anIn Raw input of the connection.
- * @param anOut Raw output of the connection.
- * @return Connection.
- */
- private ConnectionWrapper initConnection(
- InputStream anIn, OutputStream anOut) {
- ConnectionWrapper connection = new ConnectionWrapper(anIn, anOut);
-
- Msg msg = connection.in.pop();
- MsgBody body = msg.getBody();
- String cName = (String) body.getContent();
-
- msg = new Msg();
- body = msg.getBody();
- synchronized (connections) {
- if ( connections.containsKey(cName) ) {
- body.setContent(Boolean.FALSE);
- connection.out.push(msg);
- throw new RuntimeException(cName + " already registered");
- }
- connection.nodeName = cName;
- addConnection(connection);
- }
- body.setContent(Boolean.TRUE);
- connection.out.push(msg);
- return connection;
- }
-
- /**
- * Releases a connection.
- *
- * @param aConnection Connection to be released.
- */
- private void removeConnection(ConnectionWrapper aConnection) {
- synchronized(connections) {
- connections.remove(aConnection.nodeName);
- }
- processors.stopConnection(aConnection);
- aConnection.close();
- }
-
- /**
- * Registers a connection.
- *
- * @param aConnection Connection to be registered.
- */
- private void addConnection(ConnectionWrapper aConnection) {
- synchronized(connections) {
- connections.put(aConnection.nodeName, aConnection);
- processors.startConnection(aConnection);
- }
- }
- class ConnectionWrapper {
- final MsgInInterceptor in;
- final InputStream rawIn;
- final MsgOutInterceptor out;
- final OutputStream rawOut;
- String nodeName;
- final Object endReleaser;
- private ConnectionWrapper(InputStream anIn, OutputStream anOut) {
- rawIn = anIn;
- in = new StreamInInterceptor(rawIn, streamManager);
- rawOut = anOut;
- out =
- new HeaderOutInterceptor(
- MsgHeaderConstants.SRC_NODE,
- name,
- new StreamOutInterceptor(anOut, streamManager));
- endReleaser = new Object();
- }
-
- private void close() {
- try {
- rawIn.close();
- } catch (IOException e) {
- log.error("Can not close input", e);
- }
- try {
- rawOut.close();
- } catch (IOException e) {
- log.error("Can not close output", e);
- }
- }
+ public String toString() {
+ return "Node {" + nodeInfo + "}";
}
}
1.2 +16 -62 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java
Index: ServerProcessors.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/ServerProcessors.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ServerProcessors.java 25 Feb 2004 13:36:15 -0000 1.1
+++ ServerProcessors.java 1 Mar 2004 13:16:35 -0000 1.2
@@ -56,32 +56,19 @@
streamManager = aServer.getStreamManager();
}
- public Processors getProcessors() {
- return processors;
- }
-
/**
- * Starts popping Msg from the provided connection and adds them to the
- * inbound Msg queue.
- *
- * @param aConnection Connection to be popped.
+ * Execute a Processor in a separate Thread.
+ *
+ * @param aProcessor Processor to be executed.
*/
- public void startConnection(ServerNode.ConnectionWrapper aConnection) {
- InboundQueueFiller filler = new InboundQueueFiller(aConnection);
- processors.execute(filler);
+ public void execute(Processor aProcessor) {
+ processors.execute(aProcessor);
}
- /**
- * Stops popping Msg from the provided connection.
- *
- * @param aConnection Connection to be stopped.
- */
- public void stopConnection(ServerNode.ConnectionWrapper aConnection) {
- Object releaser = aConnection.endReleaser;
- synchronized(releaser) {
- releaser.notify();
- }
+ public Processors getProcessors() {
+ return processors;
}
+
/**
* Dispatches the Msgs seating in the inbound queue. Pushes the Msg seating
* in the outbound queue to the relevant node.
@@ -95,45 +82,6 @@
}
/**
- * Inbound queue filler.
- */
- private class InboundQueueFiller implements Processor {
-
- /**
- * Connection to read Msg from.
- */
- private final ServerNode.ConnectionWrapper connection;
-
- /**
- * Is this Processor started.
- */
- private volatile boolean isStarted = true;
-
- /**
- * Pops Msgs from the specified connection and adds them to the
- * inbound queue.
- *
- * @param aConnection Connection to read Msg from.
- */
- private InboundQueueFiller(ServerNode.ConnectionWrapper aConnection) {
- connection = aConnection;
- }
-
- public void run() {
- QueueOutInterceptor out = new QueueOutInterceptor(server.queueIn);
- while ( isStarted ) {
- Msg msg = connection.in.pop();
- out.push(msg);
- }
- }
-
- public void release() {
- isStarted = false;
- }
-
- }
-
- /**
* Runnable in charge of dispatching the Msgs seating in the outbound
* queue to the relevant node.
*/
@@ -152,7 +100,13 @@
while ( isStarted ) {
Msg msg = in.pop();
Object destNode = in.getHeader();
- MsgOutInterceptor out = server.getOutForServant(destNode);
+ MsgOutInterceptor out;
+ try {
+ out = server.getOutForNode((String) destNode);
+ } catch (CommunicationException e) {
+ log.error(e);
+ continue;
+ }
out.push(msg);
}
}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/CommunicationException.java
Index: CommunicationException.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
/**
* Exception raised in case of a communication exceptions between two nodes.
*
* @version $Revision: 1.1 $ $Date: 2004/03/01 13:16:35 $
*/
public class CommunicationException extends Exception {
public CommunicationException(String aMessage) {
super(aMessage);
}
public CommunicationException(String aMessage, Throwable aNested) {
super(aMessage, aNested);
}
}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeInfo.java
Index: NodeInfo.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import java.io.Serializable;
import java.net.InetAddress;
/**
* Wraps the properties of a node, which identify it uniquely on the network.
* <BR>
* This class could be wrapped in a packet and send to a multicast group in
* order to notify the availability of a new node to other nodes. These other
* nodes could then decide to join it or not.
*
* @version $Revision: 1.1 $ $Date: 2004/03/01 13:16:35 $
*/
public class NodeInfo implements Serializable
{
/**
* Name.
*/
private final String name;
/**
* Listening address.
*/
private final InetAddress address;
/**
* Listening port.
*/
private final int port;
/**
* Creates a NodeInfo defining uniquely a node on a network.
*
* @param aName Name of the node.
* @param anAddess Address that the node is listening on.
* @param aPort Listening port.
*/
public NodeInfo(String aName, InetAddress anAddess, int aPort) {
if ( null == aName ) {
throw new IllegalArgumentException("Name is required.");
} else if ( null == anAddess ) {
throw new IllegalArgumentException("Address is required.");
} else if ( 0 == aPort ) {
throw new IllegalArgumentException("Port is required.");
}
name = aName;
address = anAddess;
port = aPort;
}
/**
* Gets the listening address of the node providing this instance.
*
* @return Listening address.
*/
public InetAddress getAddress() {
return address;
}
/**
* Gets the name of the node providing this instance.
*
* @return Node name.
*/
public String getName() {
return name;
}
/**
* Gets the listening port of the node providing this instance.
*
* @return Listening port.
*/
public int getPort() {
return port;
}
public boolean equals(Object obj) {
if ( false == obj instanceof NodeInfo ) {
return false;
}
NodeInfo other = (NodeInfo) obj;
return name.equals(other.name) && address.equals(other.address) &&
port == other.port;
}
public String toString() {
return "NodeInfo: node name = {" + name + "}; address = {" + address +
"}; port = {" + port + "}";
}
}
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java
Index: MetaConnection.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This is a connection of connections.
*
* @version $Revision: 1.1 $ $Date: 2004/03/01 13:16:35 $
*/
public class MetaConnection
{
private static final Log log = LogFactory.getLog(MetaConnection.class);
/**
* Node owning this connection.
*/
private ServerNode node;
/**
* NodeInfo to Connection map.
*/
private Map connections;
/**
* Creates a meta-connection for the specified node.
*
* @param aNode Node.
*/
public MetaConnection(ServerNode aNode) {
if ( null == aNode ) {
throw new IllegalArgumentException("Node is required.");
}
node = aNode;
connections = new HashMap();
}
/**
* Gets the Msg output to be used to communicate with the node aNodeName.
*
* @param aNodeName Node name.
* @return Msg output.
* @throws CommunicationException Indicates that the node aNodeName is not
* registered by this connection.
*/
public MsgOutInterceptor getOutForNode(String aNodeName)
throws CommunicationException {
Map tmpConnections;
synchronized(connections) {
tmpConnections = new HashMap(connections);
}
Connection connection = null;
for (Iterator iter = tmpConnections.entrySet().iterator();
iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
NodeInfo nodeInfo = (NodeInfo) entry.getKey();
if ( nodeInfo.getName().equals(aNodeName) ) {
connection = (Connection) entry.getValue();
break;
}
}
if ( null == connection ) {
throw new CommunicationException("Node {" + aNodeName +
"} is not know by {" + node.getName() + "}");
}
return connection.out;
}
/**
* Tests if the specified NodeInfo is already registered by this
* meta-connection.
*
* @param aNodeInfo NodeInfo defining a Node.
* @return true if the node info is already registered.
*/
public boolean isRegistered(NodeInfo aNodeInfo) {
synchronized(connections) {
return connections.containsKey(aNodeInfo);
}
}
/**
* Creates a new connection on top of the provided input and output streams
* and waits for the end or failure of this connection prior to return.
* <BR>
* These streams should have been provided by a remote node, which is
* joining the node owning this meta-connection.
* <BR>
* This method reads the NodeInfo of the remote node and tries to register
* it with this node.
*
* @param anIn InputStream opened by a remote node on the node owning this
* meta-connection.
* @param anOut OutputStream.
* @throws IOException Indicates that an I/O error has occured.
* @throws CommunicationException Indicates that the NodeInfo provided by
* the remote node conflicts with the current NodeInfo registrations.
*/
public void joined(InputStream anIn, OutputStream anOut)
throws IOException, CommunicationException {
Connection connection = new Connection(anIn, anOut);
// Try to register the connected node with this node.
Msg msg = connection.in.pop();
MsgBody body = msg.getBody();
NodeInfo otherNodeInfo = (NodeInfo) body.getContent();
msg = new Msg();
body = msg.getBody();
if ( isRegistered(otherNodeInfo) ) {
body.setContent(Boolean.FALSE);
connection.out.push(msg);
throw new CommunicationException(
otherNodeInfo + " already registered");
}
synchronized(connections) {
connections.put(otherNodeInfo, connection);
}
body.setContent(Boolean.TRUE);
connection.out.push(msg);
// Pops the input stream of the connection and fills in the inbound
// Msg queue.
QueueOutInterceptor out = new QueueOutInterceptor(node.queueIn);
MsgCopier copier = new MsgCopier(
connection.in, out, connection.listener);
node.processors.execute(copier);
connection.waitForEnd();
}
/**
* Creates a new connection to the node uniquely identified on the network
* by the provided NodeInfo.
*
* @param aNodeInfo NodeInfo of a node.
* @throws IOException Indicates that an I/O error has occured.
* @throws CommunicationException Indicates that the node owning this
* meta-connection can not be registered by the remote node identified by
* aNodeInfo.
*/
public void join(NodeInfo aNodeInfo)
throws IOException, CommunicationException {
if ( isRegistered(aNodeInfo) ) {
throw new IllegalArgumentException("{" + aNodeInfo +
"} is already registered by {" + node + "}");
}
Connection connection = new Connection(aNodeInfo);
// Try to register this node with the other one.
Msg msg = new Msg();
MsgBody body = msg.getBody();
body.setContent(node.getNodeInfo());
connection.out.push(msg);
msg = connection.in.pop();
// In case of successful registration, the server returns true.
Boolean success = (Boolean) msg.getBody().getContent();
if ( !success.booleanValue() ) {
throw new CommunicationException("Can not register Node {" +
node.getNodeInfo() + "} with {" + aNodeInfo + "}");
}
synchronized (connections) {
connections.put(aNodeInfo, connection);
}
// Pops the input stream of the connection and fills in the inbound
// Msg queue.
QueueOutInterceptor out = new QueueOutInterceptor(node.queueIn);
MsgCopier copier = new MsgCopier(
connection.in, out, connection.listener);
node.processors.execute(copier);
}
/**
* Closes the connection to the node identified by aNodeInfo.
*
* @param aNodeInfo NodeInfo of a remote node.
* @throws IOException Indicates that an I/O error has occured.
* @throws CommunicationException Indicates that the node owning this
* meta-connection has not leaved successfully the remote node.
*/
public void leave(NodeInfo aNodeInfo)
throws IOException, CommunicationException {
if ( isRegistered(aNodeInfo) ) {
throw new IllegalArgumentException("{" + aNodeInfo +
"} is already registered by {" + node + "}");
}
Connection connection;
synchronized (connections) {
connection = (Connection) connections.remove(aNodeInfo);
}
connection.close();
}
/**
* Logical connection.
*/
private class Connection {
/**
* Allows reading from the connection in Msg mode.
*/
private final MsgInInterceptor in;
/**
* Raw InputStream.
*/
private final InputStream rawIn;
/**
* Allows writing to the connection in Msg mode.
*/
private final MsgOutInterceptor out;
/**
* Raw OutputStream.
*/
private final OutputStream rawOut;
/**
* Receives notification when the copier poping and pushing Msgs to
* the raw InputStream and OutputStream fails.
*/
private final MsgCopier.CopierListener listener;
/**
* Monitor used to wait the end of this connection.
*/
private final Object endReleaser = new Object();
/**
* Creates a connection wrapping the provided input and output streams.
*
* @param anIn InputStream of the connection.
* @param anOut OutputStream of the connection.
* @exception IOException Indicates that an I/O error has occured.
*/
private Connection(InputStream anIn, OutputStream anOut)
throws IOException {
if ( null == anIn ) {
throw new IllegalArgumentException("InputStream is required.");
} else if ( null == anOut ) {
throw new IllegalArgumentException("OutputStream is required.");
}
rawIn = anIn;
in = new StreamInInterceptor(rawIn, node.getStreamManager());
rawOut = anOut;
// One adds the name of this node on exit.
out =
new HeaderOutInterceptor(
MsgHeaderConstants.SRC_NODE,
node.getName(),
new StreamOutInterceptor(rawOut, node.getStreamManager()));
listener = new MsgCopier.NullCopierListener() {
public void onFailure() {
close();
}
};
}
/**
* Creates a connection to the node defined by aNodeInfo.
*
* @param aNodeInfo NodeInfo of a node.
* @exception IOException Indicates that an I/O error has occured.
*/
private Connection(NodeInfo aNodeInfo)
throws IOException {
if ( null == aNodeInfo ) {
throw new IllegalArgumentException("NodeInfo is required.");
}
Socket socket =
new Socket(aNodeInfo.getAddress(), aNodeInfo.getPort());
rawIn = socket.getInputStream();
in = new StreamInInterceptor(rawIn, node.getStreamManager());
rawOut = socket.getOutputStream();
// One adds the name of this node on exit.
out =
new HeaderOutInterceptor(
MsgHeaderConstants.SRC_NODE,
node.getName(),
new StreamOutInterceptor(rawOut, node.getStreamManager()));
listener = new MsgCopier.NullCopierListener() {
public void onFailure() {
close();
}
};
}
/**
* Close the logical connection.
*/
private void close() {
try {
rawIn.close();
} catch (IOException e) {
log.error("Can not close input", e);
}
try {
rawOut.close();
} catch (IOException e) {
log.error("Can not close output", e);
}
synchronized(endReleaser) {
endReleaser.notify();
}
}
/**
* Waits until the end of this connection.
*/
private void waitForEnd() {
synchronized(endReleaser) {
try {
endReleaser.wait();
} catch (InterruptedException e) {
log.error(e);
}
}
}
}
}
1.2 +18 -10 incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/RemoteUseCaseTest.java
Index: RemoteUseCaseTest.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/RemoteUseCaseTest.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RemoteUseCaseTest.java 29 Feb 2004 13:14:11 -0000 1.1
+++ RemoteUseCaseTest.java 1 Mar 2004 13:16:36 -0000 1.2
@@ -29,7 +29,7 @@
import org.apache.geronimo.datastore.impl.local.LocalGFileManager;
import org.apache.geronimo.datastore.impl.remote.datastore.GFileManagerClient;
import org.apache.geronimo.datastore.impl.remote.datastore.GFileManagerProxy;
-import org.apache.geronimo.datastore.impl.remote.messaging.ServantNode;
+import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
import org.apache.geronimo.datastore.impl.remote.messaging.ServerNode;
/**
@@ -39,6 +39,11 @@
*/
public class RemoteUseCaseTest extends AbstractUseCaseTest {
+ /**
+ * In this set-up one initializes two nodes, namely Node1 and Node2. A
+ * local GFileManager is mounted by Node1. A client GFileManager is mounted
+ * by Node2. Node2 joins Node1.
+ */
protected void setUp() throws Exception {
LockManager lockManager = new LockManager();
File root = new File(System.getProperty("java.io.tmpdir"),
@@ -49,18 +54,21 @@
GFileManager delegate;
delegate = new LocalGFileManager("test", root, lockManager);
InetAddress address = InetAddress.getLocalHost();
- int port = 8080;
GFileManagerProxy proxy = new GFileManagerProxy(delegate);
- ServerNode server = new ServerNode("MasterNode",
- Collections.singleton(proxy), address, port, 2);
- server.doStart();
+ NodeInfo nodeInfo1 = new NodeInfo("Node1", address, 8080);
+ ServerNode server1 = new ServerNode(nodeInfo1,
+ Collections.singleton(proxy), 10);
+ server1.doStart();
proxy.doStart();
- fileManager = new GFileManagerClient("test");
- ServantNode servant = new ServantNode(
- "ChildNode", Collections.singleton(fileManager), address, port, 10);
- servant.doStart();
+ fileManager = new GFileManagerClient("test", "Node1");
+ NodeInfo nodeInfo2 = new NodeInfo("Node2", address, 8082);
+ ServerNode server2 = new ServerNode(nodeInfo2,
+ Collections.singleton(fileManager), 10);
+ server2.doStart();
((GFileManagerClient) fileManager).doStart();
+
+ server2.join(nodeInfo1);
}
}