You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by gd...@apache.org on 2004/03/16 15:48:59 UTC
cvs commit: incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging IndexedMapTest.java
gdamour 2004/03/16 06:48:59
Modified: sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
GInputStream.java StreamOutInterceptor.java
StreamInInterceptor.java RequestSender.java
Topology.java StreamOutputStream.java
StreamInputStream.java MsgHeader.java NodeInfo.java
MetaConnection.java
Added: sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging
IndexedMap.java
sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging
IndexedMapTest.java
Log:
o NodeInfos are no more sent across the wire when passing Msgs
between nodes. Identifiers allocated by Topology are used instead.
o Round of refactoring: StreamInputInterceptor and
StreamOutputInterceptor extends ObjectInputStream and
ObjectOutputStream respectively.
Revision Changes Path
1.3 +3 -5 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/GInputStream.java
Index: GInputStream.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/GInputStream.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- GInputStream.java 11 Mar 2004 15:36:14 -0000 1.2
+++ GInputStream.java 16 Mar 2004 14:48:58 -0000 1.3
@@ -72,15 +72,13 @@
* Serializes the wrapped InputStream.
*/
public void writeExternal(ObjectOutput anOut) throws IOException {
- StreamOutputStream.CustomObjectOutputStream objOut =
- (StreamOutputStream.CustomObjectOutputStream) anOut;
+ StreamOutputStream objOut = (StreamOutputStream) anOut;
objOut.writeStream(content);
objOut.flush();
}
public void readExternal(ObjectInput anIn) throws IOException, ClassNotFoundException {
- StreamInputStream.CustomObjectInputStream objIn =
- (StreamInputStream.CustomObjectInputStream) anIn;
+ StreamInputStream objIn = (StreamInputStream) anIn;
content = objIn.readStream();
}
1.3 +72 -7 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutInterceptor.java
Index: StreamOutInterceptor.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutInterceptor.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- StreamOutInterceptor.java 11 Mar 2004 15:36:14 -0000 1.2
+++ StreamOutInterceptor.java 16 Mar 2004 14:48:58 -0000 1.3
@@ -38,7 +38,12 @@
* Buffer size to be applied on top of the OutputStream when writting.
*/
private static final int BUFFER_SIZE = 2048;
-
+
+ /**
+ * Null PushSerialization;
+ */
+ private static final PushSynchronization NULL_PUSH_SYNC =
+ new PushSynchronizationAdaptor();
private static final Log log = LogFactory.getLog(StreamOutInterceptor.class);
@@ -57,7 +62,11 @@
* actual OutputStream.
*/
private final ByteArrayOutputStream memOut;
-
+
+ /**
+ * Push synchronization to be applied by this instance.
+ */
+ private final PushSynchronization pushSynchronization;
/**
* Pushes Msgs to an OutputStream. Msgs are written by a StreamOutputStream
@@ -67,20 +76,31 @@
* @param anOut OutputStream to write to.
* @param aManager Used to encode InputStream.
*/
- public StreamOutInterceptor(OutputStream anOut, StreamManager aManager) {
+ public StreamOutInterceptor(OutputStream anOut, StreamManager aManager,
+ PushSynchronization aSerialization)
+ throws IOException {
if ( null == anOut ) {
throw new IllegalArgumentException("OutputStream is required.");
} else if ( null == aManager ) {
throw new IllegalArgumentException("StreamManager is required.");
}
out = new BufferedOutputStream(anOut, BUFFER_SIZE);
- memOut = new ByteArrayOutputStream();
+ memOut = new ByteArrayOutputStream(BUFFER_SIZE);
streamOutputStream = new StreamOutputStream(memOut, aManager);
+ if ( null == aSerialization ) {
+ pushSynchronization = NULL_PUSH_SYNC;
+ } else {
+ pushSynchronization = aSerialization;
+ }
}
public void push(Msg aMessage) {
try {
+ Object opaque =
+ pushSynchronization.beforePush(streamOutputStream, aMessage);
streamOutputStream.writeObject(aMessage);
+ pushSynchronization.afterPush(streamOutputStream, aMessage, opaque);
+ streamOutputStream.reset();
streamOutputStream.flush();
} catch (IOException e) {
log.error(e);
@@ -88,7 +108,7 @@
}
synchronized(out) {
try {
- out.write(memOut.toByteArray());
+ memOut.writeTo(out);
out.flush();
} catch (IOException e) {
log.error(e);
@@ -97,5 +117,50 @@
}
memOut.reset();
}
-
+
+ /**
+ * Allows an implementation to be notified when a Msg is about to be
+ * pushed.
+ */
+ public interface PushSynchronization {
+ /**
+ * Notifies the implementation that a Msg is being pushed.
+ * <BR>
+ * This method is called before the actual push of the Msg.
+ *
+ * @param anOut Used to write information before the Msg itself.
+ * @param aMsg Msg being pushed.
+ * @return Opaque object which is passed by to this instance via
+ * afterPush. It can be used to pass information between a beforePush
+ * and a afterPush call.
+ * @throws IOException Indicates that an I/O error has occured.
+ */
+ public Object beforePush(StreamOutputStream anOut, Msg aMsg)
+ throws IOException;
+ /**
+ * Notifies the implementation that a Msg has been pushed.
+ *
+ * @param anOut Used to write information after the Msg itself.
+ * @param aMsg Msg which has just been pushed.
+ * @param anOpaque Value returned by beforePush.
+ * @throws IOException Indicates that an I/O error has occured.
+ */
+ public void afterPush(StreamOutputStream anOut, Msg aMsg,
+ Object anOpaque)
+ throws IOException;
+ }
+
+ /**
+ * PushSynchronization adaptor.
+ */
+ public static class PushSynchronizationAdaptor
+ implements PushSynchronization {
+ public Object beforePush(StreamOutputStream anOut, Msg aMsg)
+ throws IOException {
+ return null;
+ }
+ public void afterPush(StreamOutputStream anOut, Msg aMsg,
+ Object anOpaque) throws IOException {}
+ }
+
}
1.3 +77 -3 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInInterceptor.java
Index: StreamInInterceptor.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInInterceptor.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- StreamInInterceptor.java 11 Mar 2004 15:36:14 -0000 1.2
+++ StreamInInterceptor.java 16 Mar 2004 14:48:58 -0000 1.3
@@ -39,6 +39,17 @@
private static final int BUFFER_SIZE = 2048;
private static final Log log = LogFactory.getLog(StreamInInterceptor.class);
+
+ /**
+ * Null PopSynchronizarion;
+ */
+ private static final PopSynchronization NULL_POP_SYNC =
+ new PopSynchronizationAdaptor();
+
+ /**
+ * Raw InputStream to pop Msgs from.
+ */
+ private final InputStream rawIn;
/**
* InputStream to pop Msgs from.
@@ -51,27 +62,45 @@
private final StreamInputStream streamInputStream;
/**
+ * Pop synchronization to be applied by this instance.
+ */
+ private final PopSynchronization popSynchronization;
+
+ /**
* Pops Msgs from an InputStream. Msgs are read by a StreamInputStream
* using the provided StreamManager to resolve InputStream encoded in the
* raw InputStream.
*
* @param anIn InputStream to read from.
* @param aManager Used to resolve encoded InputStream.
+ * @param aSynchronization PopSynchronization applied by this instance.
*/
- public StreamInInterceptor(InputStream anIn, StreamManager aManager) {
+ public StreamInInterceptor(InputStream anIn, StreamManager aManager,
+ PopSynchronization aSynchronization)
+ throws IOException {
if ( null == anIn ) {
throw new IllegalArgumentException("InputStream is required.");
} else if ( null == aManager ) {
throw new IllegalArgumentException("StreamManager is required.");
}
+ rawIn = anIn;
in = new BufferedInputStream(anIn, BUFFER_SIZE);
streamInputStream = new StreamInputStream(in, aManager);
+ if ( null == aSynchronization ) {
+ popSynchronization = NULL_POP_SYNC;
+ } else {
+ popSynchronization = aSynchronization;
+ }
}
public Msg pop() {
Msg msg;
try {
- msg = (Msg) streamInputStream.readObject();
+ synchronized (rawIn) {
+ Object opaque = popSynchronization.beforePop(streamInputStream);
+ msg = (Msg) streamInputStream.readObject();
+ popSynchronization.afterPop(streamInputStream, msg, opaque);
+ }
} catch (ClassNotFoundException e) {
log.error(e);
throw new RuntimeException(e);
@@ -82,4 +111,49 @@
return msg;
}
+ /**
+ * Allows an implementation to be notified when a Msg is about to be
+ * popped.
+ */
+ public interface PopSynchronization {
+ /**
+ * Notifies the implementation that a Msg is being popped.
+ * <BR>
+ * This method is called before the actual pop of the Msg.
+ *
+ * @param anIn Used to read information from the input stream before
+ * the Msg itself.
+ * @return Opaque object which is passed by to this instance via
+ * afterPop. It can be used to pass information between a beforePop
+ * and a afterPop call.
+ * @throws IOException Indicates that an I/O error has occured.
+ */
+ public Object beforePop(StreamInputStream anIn)
+ throws IOException ;
+
+ /**
+ * Notifies the implementation that a Msg has been popped.
+ *
+ * @param anIn Used to read information from the input stream after
+ * the Msg itself.
+ * @param aMsg Msg which has just been popped.
+ * @param anOpaque Value returned by beforePop.
+ * @throws IOException Indicates that an I/O error has occured.
+ */
+ public void afterPop(StreamInputStream anIn, Msg aMsg, Object anOpaque)
+ throws IOException;
+ }
+
+ /**
+ * PopSynchronizarion adaptor.
+ */
+ public static class PopSynchronizationAdaptor implements PopSynchronization {
+ public Object beforePop(StreamInputStream anIn)
+ throws IOException {
+ return null;
+ }
+ public void afterPop(StreamInputStream anIn, Msg aMsg,
+ Object anOpaque) throws IOException {}
+ }
+
}
1.4 +22 -37 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java
Index: RequestSender.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/RequestSender.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RequestSender.java 11 Mar 2004 15:36:14 -0000 1.3
+++ RequestSender.java 16 Mar 2004 14:48:59 -0000 1.4
@@ -22,8 +22,6 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,7 +54,7 @@
/**
* Request id to FuturResult map.
*/
- private final Map responses;
+ private final IndexedMap responses;
/**
* Node using this sender.
@@ -77,7 +75,7 @@
throw new IllegalArgumentException("Node is required.");
}
srcNode = aSrcNode;
- responses = new HashMap();
+ responses = new IndexedMap(1024);
}
/**
@@ -106,9 +104,8 @@
Msg msg = new Msg();
MsgHeader header = msg.getHeader();
- Object id = createID(aTargetNodes);
+ RequestID id = createID(aTargetNodes);
header.addHeader(MsgHeaderConstants.CORRELATION_ID, id);
- header.addHeader(MsgHeaderConstants.SRC_NODE, srcNode);
header.addHeader(MsgHeaderConstants.DEST_NODES, aTargetNodes);
MsgBody body = msg.getBody();
@@ -130,16 +127,15 @@
* @param aTargetNodes Nodes to which the request is to be sent.
* @return Request identifier.
*/
- private Object createID(NodeInfo[] aTargetNodes) {
+ private RequestID createID(NodeInfo[] aTargetNodes) {
RequestID id;
- synchronized (responses) {
- id = new RequestID(requestIDSeq++);
- FutureResult[] results = new FutureResult[aTargetNodes.length];
- for (int i = 0; i < results.length; i++) {
- results[i] = new FutureResult();
- }
- responses.put(id, results);
+ int mapID = responses.allocateId();
+ id = new RequestID(mapID);
+ FutureResult[] results = new FutureResult[aTargetNodes.length];
+ for (int i = 0; i < results.length; i++) {
+ results[i] = new FutureResult();
}
+ responses.put(mapID, results);
return id;
}
@@ -150,11 +146,9 @@
* @param aWaitTime number of milliseconds to wait for a response.
* @return Result of the request.
*/
- private CommandResult waitResponse(Object anID, long aWaitTime) {
- FutureResult[] results;
- synchronized(responses) {
- results = (FutureResult[]) responses.get(anID);
- }
+ private CommandResult waitResponse(RequestID anID, long aWaitTime) {
+ FutureResult[] results =
+ (FutureResult[]) responses.get(anID.id);
Exception ex;
try {
CommandResult returned = null;
@@ -163,9 +157,7 @@
returned = (CommandResult) results[i].get();
// CommandResult returned = (CommandResult) result.timedGet(aWaitTime);
}
- synchronized(responses) {
- responses.remove(anID);
- }
+ responses.remove(anID.id);
return returned;
} catch (TimeoutException e) {
log.error(e);
@@ -187,10 +179,12 @@
* @param aResult Response
*/
public void setResponse(Object anID, CommandResult aResult) {
- FutureResult[] results;
- synchronized(responses) {
- results = (FutureResult[]) responses.get(anID);
+ if ( false == anID instanceof RequestID ) {
+ throw new IllegalArgumentException("ID is of the wrong type.");
}
+ RequestID id = (RequestID) anID;
+ FutureResult[] results;
+ results = (FutureResult[]) responses.get(id.id);
for (int i = 0; i < results.length; i++) {
FutureResult result = results[i];
if ( null == result.peek() ) {
@@ -209,23 +203,14 @@
* Required for Externalization.
*/
public RequestID() {}
- public RequestID(int anId) {
- id = anId;
+ public RequestID(int anID) {
+ id = anID;
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(id);
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
id = in.readInt();
- }
- public boolean equals(Object obj) {
- if ( false == obj instanceof RequestID ) {
- return false;
- }
- return id == ((RequestID)obj).id;
- }
- public int hashCode() {
- return id;
}
}
1.2 +105 -35 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Topology.java
Index: Topology.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/Topology.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Topology.java 11 Mar 2004 15:36:14 -0000 1.1
+++ Topology.java 16 Mar 2004 14:48:59 -0000 1.2
@@ -48,9 +48,36 @@
* NodeInfo to PathWeight map.
*/
private Map nodeToPaths;
+
+ private Object nodeAndIdLock = new Object();
+ private Map nodeToID;
+ private Map idToNode;
+
+ private static int seqID = 0;
public Topology() {
nodeToPaths = new HashMap();
+ nodeToID = new HashMap();
+ idToNode = new HashMap();
+ }
+
+ private void registerNode(NodeInfo aNodeInfo) {
+ synchronized(nodeAndIdLock) {
+ if ( null == nodeToID.get(aNodeInfo) ) {
+ Integer id = new Integer(seqID++);
+ nodeToID.put(aNodeInfo, id);
+ idToNode.put(id, aNodeInfo);
+ }
+ }
+ }
+
+ private void unregisterNode(NodeInfo aNodeInfo) {
+ synchronized(nodeAndIdLock) {
+ if ( null == nodeToID.remove(aNodeInfo) ) {
+ throw new IllegalArgumentException(aNodeInfo +
+ " is not registered by this topology.");
+ }
+ }
}
/**
@@ -64,18 +91,20 @@
throw new IllegalArgumentException("Path is required.");
}
synchronized(nodeToPaths) {
+ registerNode(aPath.nodeOne);
Collection related = (Collection) nodeToPaths.get(aPath.nodeOne);
if ( null == related ) {
related = new ArrayList();
nodeToPaths.put(aPath.nodeOne, related);
}
- related.add(new Node(aPath.nodeTwo, aPath.weigthOneToTwo));
+ related.add(new NodeWrapper(aPath.nodeTwo, aPath.weigthOneToTwo));
+ registerNode(aPath.nodeTwo);
related = (Collection) nodeToPaths.get(aPath.nodeTwo);
if ( null == related ) {
related = new ArrayList();
nodeToPaths.put(aPath.nodeTwo, related);
}
- related.add(new Node(aPath.nodeOne, aPath.weigthTwoToOne));
+ related.add(new NodeWrapper(aPath.nodeOne, aPath.weigthTwoToOne));
}
}
@@ -95,6 +124,7 @@
" is not registered by this topology.");
}
}
+ unregisterNode(aNode);
}
/**
@@ -112,19 +142,27 @@
throw new IllegalArgumentException(aPath.nodeOne +
" is not registered by this topology.");
}
- if ( !related.remove(new Node(aPath.nodeTwo, null)) ) {
+ if ( !related.remove(new NodeWrapper(aPath.nodeTwo, null)) ) {
throw new IllegalArgumentException(aPath +
" is not registered by this topology.");
}
+ if ( 0 == related.size() ) {
+ unregisterNode(aPath.nodeOne);
+ nodeToPaths.remove(aPath.nodeOne);
+ }
related = (Collection) nodeToPaths.get(aPath.nodeTwo);
if ( null == related ) {
throw new IllegalArgumentException(aPath.nodeTwo +
" is not registered by this topology.");
}
- if ( !related.remove(new Node(aPath.nodeOne, null)) ) {
+ if ( !related.remove(new NodeWrapper(aPath.nodeOne, null)) ) {
throw new IllegalArgumentException(aPath +
" is not registered by this topology.");
}
+ if ( 0 == related.size() ) {
+ unregisterNode(aPath.nodeTwo);
+ nodeToPaths.remove(aPath.nodeTwo);
+ }
}
}
@@ -154,10 +192,8 @@
for (Iterator iter = paths.iterator(); iter.hasNext();index++) {
int weight = 0;
List nodeList = (List) iter.next();
- for (Iterator iterator = nodeList.iterator();
- iterator.hasNext();
- ) {
- Node node = (Node) iterator.next();
+ for (Iterator iter2 = nodeList.iterator(); iter2.hasNext();) {
+ NodeWrapper node = (NodeWrapper) iter2.next();
weight += node.weigth.getWeight();
}
if ( -1 == minWeight || weight < minWeight ) {
@@ -166,7 +202,13 @@
}
}
List path = (List) paths.get(minPathIndex);
- return (NodeInfo[]) path.toArray(new NodeInfo[0]);
+ NodeInfo[] result = new NodeInfo[path.size()];
+ int i = 0;
+ for (Iterator iter = path.iterator(); iter.hasNext();) {
+ NodeWrapper wrapper = (NodeWrapper) iter.next();
+ result[i++] = wrapper.node;
+ }
+ return result;
}
private List getPaths(NodeInfo aSource, NodeInfo aTarget, List aPath,
@@ -178,22 +220,55 @@
}
List returned = new ArrayList();
for (Iterator iter = related.iterator(); iter.hasNext();) {
- Node node = (Node) iter.next();
- if ( aPath.contains(node) ) {
+ NodeWrapper wrapper = (NodeWrapper) iter.next();
+ if ( aPath.contains(wrapper) ) {
continue;
}
- aPath.add(node);
- if ( node.equals(aTarget) ) {
+ aPath.add(wrapper);
+ if ( wrapper.node.equals(aTarget) ) {
returned.add(new ArrayList(aPath));
} else {
- Collection paths = getPaths(node, aTarget, aPath, aNodeToPath);
+ Collection paths =
+ getPaths(wrapper.node, aTarget, aPath, aNodeToPath);
returned.addAll(paths);
}
- aPath.remove(node);
+ aPath.remove(wrapper);
}
return returned;
}
+ /**
+ * Gets the identifier of the provided node.
+ * <BR>
+ * When a node added to a topology, this latter assigns it an identifier.
+ *
+ * @param aNodeInfo Node whose identifier is to be returned.
+ * @return Node identifier.
+ */
+ public int getIDOfNode(NodeInfo aNodeInfo) {
+ Integer id;
+ synchronized(nodeAndIdLock) {
+ id = (Integer) nodeToID.get(aNodeInfo);
+ }
+ if ( null == id ) {
+ throw new IllegalArgumentException(aNodeInfo +
+ " is not registered by this topology.");
+ }
+ return id.intValue();
+ }
+
+ public NodeInfo getNodeById(int anId) {
+ NodeInfo nodeInfo;
+ synchronized(nodeAndIdLock) {
+ nodeInfo = (NodeInfo) idToNode.get(new Integer(anId));
+ }
+ if ( null == nodeInfo ) {
+ throw new IllegalArgumentException("Identifier " + anId +
+ " is not registered by this topology.");
+ }
+ return nodeInfo;
+ }
+
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(nodeToPaths);
}
@@ -227,35 +302,30 @@
}
}
- private static class Node
- extends NodeInfo
- implements Externalizable {
+ private static class NodeWrapper implements Externalizable {
private PathWeight weigth;
- /**
- * Required for Externalization.
- */
- public Node() {}
- public Node(NodeInfo aNode, PathWeight aWeight) {
- super(aNode.getName(), aNode.getAddress(), aNode.getPort());
- if ( null == aNode ) {
- throw new IllegalArgumentException("Node is required");
- }
+ private NodeInfo node;
+ public NodeWrapper(NodeInfo aNode, PathWeight aWeight) {
+ node = aNode;
weigth = aWeight;
}
public boolean equals(Object obj) {
- return super.equals(obj);
+ if ( false == obj instanceof NodeWrapper ) {
+ return false;
+ }
+ NodeWrapper other = (NodeWrapper) obj;
+ return node.equals(other.node);
}
public int hashCode() {
- return super.hashCode();
+ return node.hashCode();
}
public void writeExternal(ObjectOutput out) throws IOException {
- super.writeExternal(out);
out.writeObject(weigth);
+ out.writeObject(node);
}
- public void readExternal(ObjectInput in)
- throws IOException, ClassNotFoundException {
- super.readExternal(in);
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
weigth = (PathWeight) in.readObject();
+ node = (NodeInfo) in.readObject();
}
}
1.4 +14 -26 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutputStream.java
Index: StreamOutputStream.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamOutputStream.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- StreamOutputStream.java 11 Mar 2004 15:36:14 -0000 1.3
+++ StreamOutputStream.java 16 Mar 2004 14:48:59 -0000 1.4
@@ -17,7 +17,6 @@
package org.apache.geronimo.datastore.impl.remote.messaging;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
@@ -29,13 +28,15 @@
* @version $Revision$ $Date$
*/
public class StreamOutputStream
- extends DataOutputStream
+ extends ObjectOutputStream
{
private final StreamManager streamManager;
- public StreamOutputStream(OutputStream anOut, StreamManager aManager) {
+ public StreamOutputStream(OutputStream anOut, StreamManager aManager)
+ throws IOException {
super(anOut);
+ enableReplaceObject(true);
if ( null == aManager ) {
throw new IllegalArgumentException("StreamManager is required.");
}
@@ -47,11 +48,6 @@
writeObject(id);
}
- public void writeObject(Object anObject) throws IOException {
- CustomObjectOutputStream objOut = new CustomObjectOutputStream();
- objOut.writeObject(anObject);
- }
-
/**
* Gets the StreamManager used to resolve InputStream identifiers.
*
@@ -60,25 +56,17 @@
public StreamManager getStreamManager() {
return streamManager;
}
-
- public class CustomObjectOutputStream extends ObjectOutputStream {
-
- public CustomObjectOutputStream() throws IOException, SecurityException {
- super(StreamOutputStream.this);
- enableReplaceObject(true);
- }
- public void writeStream(InputStream aStream) throws IOException {
- StreamOutputStream.this.writeStream(aStream);
- }
-
- protected Object replaceObject(Object obj) throws IOException {
- if ( obj instanceof InputStream ) {
- return new GInputStream((InputStream) obj);
- }
- return obj;
+ /**
+ * It is critical to avoid to write
+ */
+ protected void writeStreamHeader() throws IOException {}
+
+ protected Object replaceObject(Object obj) throws IOException {
+ if ( obj instanceof InputStream ) {
+ return new GInputStream((InputStream) obj);
}
-
+ return obj;
}
}
1.4 +25 -28 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInputStream.java
Index: StreamInputStream.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/StreamInputStream.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- StreamInputStream.java 11 Mar 2004 15:36:14 -0000 1.3
+++ StreamInputStream.java 16 Mar 2004 14:48:59 -0000 1.4
@@ -17,11 +17,10 @@
package org.apache.geronimo.datastore.impl.remote.messaging;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInput;
import java.io.ObjectInputStream;
+import java.io.StreamCorruptedException;
/**
* Allows to read an InputStream from the underlying InputStream. More
@@ -32,8 +31,7 @@
* @version $Revision$ $Date$
*/
public class StreamInputStream
- extends DataInputStream
- implements ObjectInput
+ extends ObjectInputStream
{
/**
@@ -50,8 +48,10 @@
* @param aManager StreamManager.
* @throws IOException If an I/O error has occured.
*/
- public StreamInputStream(InputStream anIn, StreamManager aManager) {
+ public StreamInputStream(InputStream anIn, StreamManager aManager)
+ throws IOException {
super(anIn);
+ enableResolveObject(true);
if ( null == aManager ) {
throw new IllegalArgumentException("StreamManager is required.");
}
@@ -67,7 +67,7 @@
* @throws IOException May indicates that the StreamManager does not know
* about the encoded identifer.
*/
- public InputStream readInputStream() throws IOException {
+ public InputStream readStream() throws IOException {
Object id;
try {
id = readObject();
@@ -80,11 +80,6 @@
return returned;
}
- public Object readObject() throws ClassNotFoundException, IOException {
- CustomObjectInputStream objIn = new CustomObjectInputStream();
- return objIn.readObject();
- }
-
/**
* Gets the StreamManager used to register InputStreams.
*
@@ -94,23 +89,25 @@
return streamManager;
}
- public class CustomObjectInputStream extends ObjectInputStream {
-
- public CustomObjectInputStream() throws IOException, SecurityException {
- super(StreamInputStream.this);
- enableResolveObject(true);
- }
-
- public InputStream readStream() throws IOException {
- return StreamInputStream.this.readInputStream();
- }
-
- protected Object resolveObject(Object obj) throws IOException {
- if ( obj instanceof GInputStream ) {
- return ((GInputStream)obj).getRawInputStream();
- }
- return obj;
+ /**
+ * It is critical to read nothing during the creation of ObjectInputStream.
+ * <BR>
+ * Indeed, StreamInputStream and StreamOutputStream are used by
+ * StreamInInterceptor and StreamOutInterceptor respectively. It should be
+ * possible to instantiate these two objects at the same time, for instance
+ * to wrap the InputStream and OutputStream of a Socket.
+ * <BR>
+ * If ObjectInputStream reads a StreamHeader during its creation, then it
+ * is not possible to create these two classes at the same time.
+ */
+ protected void readStreamHeader()
+ throws IOException, StreamCorruptedException {}
+
+ protected Object resolveObject(Object obj) throws IOException {
+ if ( obj instanceof GInputStream ) {
+ return ((GInputStream)obj).getRawInputStream();
}
+ return obj;
}
}
1.3 +36 -25 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgHeader.java
Index: MsgHeader.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MsgHeader.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MsgHeader.java 11 Mar 2004 15:36:14 -0000 1.2
+++ MsgHeader.java 16 Mar 2004 14:48:59 -0000 1.3
@@ -61,6 +61,18 @@
headers[i] = aHeader.headers[i];
}
}
+
+ /**
+ * Resets to null the value of an header.
+ *
+ * @param aKey Header key.
+ */
+ public Object resetHeader(Object aKey) {
+ int idx = getHeaderIndex(aKey);
+ Object result = headers[idx];
+ headers[idx] = null;
+ return result;
+ }
/**
* Adds an header.
@@ -69,16 +81,10 @@
* @param aValue Header value.
*/
public void addHeader(Object aKey, Object aValue) {
- for (int i = 0; i < headerConstants.length; i++) {
- if ( aKey == headerConstants[i] ) {
- headers[i] = aValue;
- return;
- }
- }
- throw new IllegalArgumentException("Header {" + aKey +
- "} is not supported.");
+ int idx = getHeaderIndex(aKey);
+ headers[idx] = aValue;
}
-
+
/**
* Gets a required header.
*
@@ -88,18 +94,13 @@
* does not exist.
*/
public Object getHeader(Object aKey) {
- for (int i = 0; i < headerConstants.length; i++) {
- if ( aKey == headerConstants[i] ) {
- Object value = headers[i];
- if ( null == value ) {
- throw new IllegalArgumentException("Header {" + aKey +
- "} is not set.");
- }
- return value;
- }
+ int idx = getHeaderIndex(aKey);
+ Object value = headers[idx];
+ if ( null == value ) {
+ throw new IllegalArgumentException("Header {" + aKey +
+ "} is not set.");
}
- throw new IllegalArgumentException("Header {" + aKey +
- "} is not supported.");
+ return value;
}
/**
@@ -109,16 +110,26 @@
* @return Header value.
*/
public Object getOptionalHeader(Object aKey) {
- Object value;
+ int idx = getHeaderIndex(aKey);
+ return headers[idx];
+ }
+
+ /**
+ * Gets the index of the header having the specified key.
+ *
+ * @param aKey Header key.
+ * @return header index.
+ */
+ private int getHeaderIndex(Object aKey) {
for (int i = 0; i < headerConstants.length; i++) {
if ( aKey == headerConstants[i] ) {
- return headers[i];
+ return i;
}
}
throw new IllegalArgumentException("Header {" + aKey +
- "} is not supported.");
+ "} is not supported.");
}
-
+
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(headers);
}
1.3 +6 -2 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeInfo.java
Index: NodeInfo.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/NodeInfo.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- NodeInfo.java 11 Mar 2004 15:36:14 -0000 1.2
+++ NodeInfo.java 16 Mar 2004 14:48:59 -0000 1.3
@@ -52,11 +52,15 @@
* Pops the first element of the array and returns the resulting array.
*
* @param aNodeInfo Array whose first element is to be poped.
- * @return New array.
+ * @return New array. If the size of aNodeInfo is one, then null is
+ * returned.
*/
public static NodeInfo[] pop(NodeInfo[] aNodeInfo) {
if ( null == aNodeInfo || 0 == aNodeInfo.length) {
throw new IllegalArgumentException("NodeInfo array is required.");
+ }
+ if ( 1 == aNodeInfo.length ) {
+ return null;
}
NodeInfo[] returned = new NodeInfo[aNodeInfo.length-1];
for (int i = 1; i < aNodeInfo.length; i++) {
1.4 +113 -11 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java
Index: MetaConnection.java
===================================================================
RCS file: /home/cvs/incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/MetaConnection.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- MetaConnection.java 11 Mar 2004 15:36:14 -0000 1.3
+++ MetaConnection.java 16 Mar 2004 14:48:59 -0000 1.4
@@ -54,6 +54,12 @@
private Topology topology;
/**
+ * Logical compression and decompression applied on Msgs pushed and poped
+ * by this meta-connection.
+ */
+ private LogicalCompression logicalComp;
+
+ /**
* Creates a meta-connection for the specified node.
*
* @param aNode Node.
@@ -64,6 +70,7 @@
}
node = aNode;
connections = new HashMap();
+ logicalComp = new LogicalCompression();
}
/**
@@ -120,7 +127,6 @@
"} is not reachable by {" + node.getNodeInfo() + "}");
}
NodeInfo tmpNode = path[0];
- NodeInfo[] newPath = NodeInfo.pop(path);
Connection connection;
synchronized(connections) {
connection = (Connection) connections.get(tmpNode);
@@ -129,14 +135,26 @@
throw new CommunicationException("{" + aNode +
"} is not reachable by {" + node.getNodeInfo() + "}");
}
- return
+ NodeInfo[] newPath = NodeInfo.pop(path);
+ MsgOutInterceptor out =
new HeaderOutInterceptor(
- MsgHeaderConstants.DEST_NODES,
- aNode,
+ MsgHeaderConstants.SRC_NODE,
+ node.getNodeInfo(),
+ new HeaderOutInterceptor(
+ MsgHeaderConstants.DEST_NODE,
+ aNode,
+ new HeaderOutInterceptor(
+ MsgHeaderConstants.DEST_NODES,
+ aNode,
+ connection.out)));
+ if ( null != newPath ) {
+ out =
new HeaderOutInterceptor(
MsgHeaderConstants.DEST_NODE_PATH,
newPath,
- connection.out));
+ out);
+ }
+ return out;
}
/**
@@ -272,7 +290,7 @@
public void push(Msg aMsg) {
MsgHeader header = aMsg.getHeader();
if ( node.getNodeInfo().equals(
- header.getHeader(MsgHeaderConstants.DEST_NODES)) ) {
+ header.getHeader(MsgHeaderConstants.DEST_NODE)) ) {
inboundOut.push(aMsg);
} else {
outboundOut.push(aMsg);
@@ -283,6 +301,86 @@
aConnection.in, out, aConnection.listener);
node.processors.execute(copier);
}
+
+ /**
+ * Logical compression of the Msgs pushed and poped by this meta-connection.
+ * <BR>
+ * Its goal is to remove from Msgs to be serialized the information, shared
+ * by all the nodes. For instance, as a Topology is shared by all the nodes
+ * one can replace the NodeInfo instances contained by Msgs by their
+ * corresponding identifier.
+ *
+ * @version $Revision$ $Date$
+ */
+ private class LogicalCompression implements
+ StreamInInterceptor.PopSynchronization,
+ StreamOutInterceptor.PushSynchronization {
+
+ /**
+ * No logical compression.
+ */
+ private final static byte NULL = 0x00;
+
+ /**
+ * Compression based on the Topology shared knowledge.
+ */
+ private final static byte TOPOLOGY = 0x01;
+
+ public Object beforePop(StreamInputStream anIn)
+ throws IOException {
+ byte type = anIn.readByte();
+ if ( type == NULL ) {
+ return null;
+ }
+ if ( null == topology ) {
+ throw new IllegalArgumentException("No topology is defined.");
+ }
+ Object[] result = new Object[2];
+ int id = anIn.readInt();
+ NodeInfo nodeInfo = topology.getNodeById(id);
+ result[0] = nodeInfo;
+ id = anIn.readInt();
+ nodeInfo = topology.getNodeById(id);
+ result[1] = nodeInfo;
+ return result;
+ }
+ public void afterPop(StreamInputStream anIn, Msg aMsg, Object anOpaque)
+ throws IOException {
+ if ( null == anOpaque ) {
+ return;
+ }
+ Object[] prePop = (Object[]) anOpaque;
+ MsgHeader header = aMsg.getHeader();
+ header.addHeader(MsgHeaderConstants.SRC_NODE, prePop[0]);
+ header.addHeader(MsgHeaderConstants.DEST_NODE, prePop[1]);
+ header.addHeader(MsgHeaderConstants.DEST_NODES, prePop[1]);
+ }
+
+ public Object beforePush(StreamOutputStream anOut, Msg aMsg)
+ throws IOException {
+ if ( null == topology ) {
+ anOut.writeByte(NULL);
+ return null;
+ }
+ anOut.writeByte(TOPOLOGY);
+ MsgHeader header = aMsg.getHeader();
+ NodeInfo info =
+ (NodeInfo) header.resetHeader(MsgHeaderConstants.SRC_NODE);
+ int id = topology.getIDOfNode(info);
+ anOut.writeInt(id);
+ info =
+ (NodeInfo) header.resetHeader(MsgHeaderConstants.DEST_NODE);
+ id = topology.getIDOfNode(info);
+ // When pushing a Msg on the network, DEST_NODES equals to
+ // DEST_NODE.
+ header.resetHeader(MsgHeaderConstants.DEST_NODES);
+ anOut.writeInt(id);
+ return null;
+ }
+ public void afterPush(StreamOutputStream anOut, Msg aMsg,
+ Object anOpaque) throws IOException {
+ }
+ }
/**
* Logical connection.
@@ -330,9 +428,11 @@
}
rawIn = anIn;
- in = new StreamInInterceptor(rawIn, node.getStreamManager());
+ in = new StreamInInterceptor(rawIn, node.getStreamManager(),
+ logicalComp);
rawOut = anOut;
- out = new StreamOutInterceptor(rawOut, node.getStreamManager());
+ out = new StreamOutInterceptor(rawOut, node.getStreamManager(),
+ logicalComp);
listener = new MsgCopier.NullCopierListener() {
public void onFailure() {
close();
@@ -354,9 +454,11 @@
new Socket(aNodeInfo.getAddress(), aNodeInfo.getPort());
rawIn = socket.getInputStream();
- in = new StreamInInterceptor(rawIn, node.getStreamManager());
+ in = new StreamInInterceptor(rawIn, node.getStreamManager(),
+ logicalComp);
rawOut = socket.getOutputStream();
- out = new StreamOutInterceptor(rawOut, node.getStreamManager());
+ out = new StreamOutInterceptor(rawOut, node.getStreamManager(),
+ logicalComp);
listener = new MsgCopier.NullCopierListener() {
public void onFailure() {
close();
1.1 incubator-geronimo/sandbox/webdav/src/java/org/apache/geronimo/datastore/impl/remote/messaging/IndexedMap.java
Index: IndexedMap.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
/**
* @version $Revision: 1.1 $ $Date: 2004/03/16 14:48:59 $
*/
public class IndexedMap {
private static final Object RESERVED = new Object();
private final Object[] map;
private int lastAllocatedIndex;
public IndexedMap(int aSize) {
if ( aSize < 1 ) {
throw new IllegalArgumentException("Size is not valid.");
}
map = new Object[aSize];
lastAllocatedIndex = 0;
}
public synchronized int allocateId() {
for (int i=lastAllocatedIndex; i < map.length; i++) {
if ( null == map[i] ) {
lastAllocatedIndex = i;
map[i] = RESERVED;
return i;
}
}
for (int i=0; i < lastAllocatedIndex; i++) {
if ( null == map[i] ) {
lastAllocatedIndex = i;
map[i] = RESERVED;
return i;
}
}
throw new RuntimeException("Map is fulled.");
}
public Object get(int anId) {
return map[anId];
}
public void remove(int anId) {
map[anId] = null;
}
public void put(int anId, Object anObject) {
map[anId] = anObject;
}
}
1.1 incubator-geronimo/sandbox/webdav/src/test/org/apache/geronimo/datastore/impl/remote/messaging/IndexedMapTest.java
Index: IndexedMapTest.java
===================================================================
/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.messaging;
import junit.framework.TestCase;
/**
*
* @version $Revision: 1.1 $ $Date: 2004/03/16 14:48:59 $
*/
public class IndexedMapTest extends TestCase {
private IndexedMap map;
private int size;
protected void setUp() throws Exception {
size = 10;
map = new IndexedMap(size);
}
public void testAllocate() throws Exception {
for(int i = 0; i < size; i++) {
map.allocateId();
}
try {
map.allocateId();
fail("Map should be full.");
} catch (RuntimeException e) {
}
}
public void testGetPut() {
Object value = new Object();
int id = map.allocateId();
map.put(id, value);
assertEquals(value, map.get(id));
}
}