You are viewing a plain text version of this content. The canonical link for it is here.
Posted to wadi-commits@incubator.apache.org by bd...@apache.org on 2005/12/14 23:36:16 UTC

svn commit: r356933 [16/35] - in /incubator/wadi/trunk: ./ etc/ modules/ modules/assembly/ modules/assembly/src/ modules/assembly/src/bin/ modules/assembly/src/conf/ modules/assembly/src/main/ modules/assembly/src/main/assembly/ modules/core/ modules/c...

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.codehaus.wadi.Config;
+
+/**
+ * Callbacks needed by a stream that emits its output as JMS
+ * {@link BytesMessage}s: one to create a message and one to send it.
+ */
+public interface BytesMessageOutputStreamConfig extends Config {
+
+    /**
+     * Creates a new BytesMessage.
+     *
+     * @return the newly created message
+     * @throws JMSException if the message cannot be created
+     */
+    BytesMessage createBytesMessage() throws JMSException;
+
+    /**
+     * Sends the given message.
+     *
+     * @param message the message to send
+     * @throws JMSException if the message cannot be sent
+     */
+    void send(BytesMessage message) throws JMSException;
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,25 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+/**
+ * A {@link PipeConfig} that additionally exposes a util.concurrent
+ * {@link Sync}.
+ */
+public interface NIOPipeConfig extends PipeConfig {
+
+    /**
+     * NOTE(review): what this lock guards is not visible from this interface -
+     * confirm against the NIO Pipe/Server implementations.
+     *
+     * @return the lock
+     */
+    Sync getLock();
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.channels.WritableByteChannel;
+
+import org.codehaus.wadi.Config;
+import org.codehaus.wadi.Contextualiser;
+
+/**
+ * The Contextualiser, node id, object streams and byte channel exposed by a
+ * connection. AbstractPipe implements this interface and passes itself to
+ * the Peer that it runs.
+ */
+public interface PeerConfig extends Config/*, StreamConnection*/, WritableByteChannel {
+
+    /**
+     * Inherited from WritableByteChannel, but overloaded here to mean
+     * "close the whole Connection".
+     *
+     * @throws IOException if closing fails
+     */
+    void close() throws IOException;
+
+    /** @return the Contextualiser */
+    Contextualiser getContextualiser();
+
+    /** @return the node id */
+    String getNodeId();
+
+    // Raw stream access is currently disabled in favour of the Object streams below:
+    //InputStream getInputStream() throws IOException;
+    //OutputStream getOutputStream() throws IOException;
+
+    /**
+     * @return an ObjectInputStream
+     * @throws IOException if the stream cannot be obtained
+     */
+    ObjectInputStream getObjectInputStream() throws IOException;
+
+    /**
+     * @return an ObjectOutputStream
+     * @throws IOException if the stream cannot be obtained
+     */
+    ObjectOutputStream getObjectOutputStream() throws IOException;
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import java.io.IOException;
+
+import org.codehaus.wadi.sandbox.io.impl.Peer;
+
+/**
+ * A runnable stream connection over which Peers are executed.
+ */
+public interface Pipe extends Runnable, StreamConnection {
+
+    /**
+     * Reads a Peer from the input and runs it.
+     */
+    void run();
+
+    /**
+     * Runs the given Peer.
+     *
+     * @param peer the Peer to run
+     * @return the Peer's result (its meaning is defined by the Peer, not visible here)
+     * @throws Exception if running the Peer fails
+     */
+    boolean run(Peer peer) throws Exception;
+
+    //void commit() throws IOException; // producer has finished
+
+    /**
+     * Called when the consumer has finished with this Pipe.
+     *
+     * @throws IOException if closing fails
+     */
+    void close() throws IOException;
+
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import org.codehaus.wadi.Config;
+import org.codehaus.wadi.Contextualiser;
+
+/**
+ * The callbacks and environment that a {@link Pipe} receives from its owner.
+ */
+public interface PipeConfig extends Config {
+
+    /**
+     * Called by a Connection when it becomes idle.
+     *
+     * @param pipe the Pipe that has become idle
+     */
+    void notifyIdle(Pipe pipe);
+
+    /**
+     * Called by a Connection when it has been closed.
+     *
+     * @param pipe the Pipe that has been closed
+     */
+    void notifyClosed(Pipe pipe);
+
+    /** @return the Contextualiser */
+    Contextualiser getContextualiser();
+
+    /** @return the node id */
+    String getNodeId();
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,31 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+/**
+ * Lifecycle and Pipe-running contract of a server.
+ */
+public interface Server {
+
+    // Lifecycle...
+
+    /**
+     * Supplies this Server with its configuration.
+     *
+     * @param config the configuration to use
+     */
+    void init(ServerConfig config);
+
+    /** @throws Exception if the Server cannot be started */
+    void start() throws Exception;
+
+    /** @throws Exception if the Server cannot be stopped */
+    void stop() throws Exception;
+
+    /**
+     * NOTE(review): presumably blocks until all current Pipes have gone -
+     * confirm against the implementations.
+     */
+    void waitForExistingPipes();
+
+    /**
+     * NOTE(review): presumably stops new Pipes from being accepted -
+     * confirm against the implementations.
+     */
+    void stopAcceptingPipes();
+
+    // Connection container...
+
+    /**
+     * Runs the given Pipe.
+     *
+     * @param pipe the Pipe to run
+     */
+    void run(Pipe pipe);
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import org.codehaus.wadi.Config;
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.gridstate.ExtendedCluster;
+
+/**
+ * The environment handed to a Server by {@code Server.init(ServerConfig)}.
+ */
+public interface ServerConfig extends Config {
+
+    /** @return the cluster */
+    ExtendedCluster getCluster();
+
+    /** @return the Contextualiser */
+    Contextualiser getContextualiser();
+
+    /** @return the node name */
+    String getNodeName();
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,28 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * A connection that exposes a byte-oriented input stream and output stream.
+ */
+public interface StreamConnection {
+
+    /**
+     * @return the stream from which this connection's input is read
+     * @throws IOException if the stream cannot be obtained
+     */
+    InputStream getInputStream() throws IOException;
+
+    /**
+     * @return the stream to which this connection's output is written
+     * @throws IOException if the stream cannot be obtained
+     */
+    OutputStream getOutputStream() throws IOException;
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,138 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
+
+/**
+ * An InputStream whose content arrives asynchronously as a sequence of
+ * buffers placed on a util.concurrent {@link Channel}. Producers hand buffers
+ * over via the {@link Puttable} methods and signal end-of-input with
+ * {@link #commit()}; the consumer reads through the usual InputStream API.
+ * Subclasses define the concrete buffer type through the abstract methods.
+ */
+public abstract class AbstractAsyncInputStream extends InputStream implements Puttable {
+
+    /** Sentinel placed on the queue by {@link #commit()} to mark end-of-input. */
+    protected static final Object _endOfQueue=new Object();
+
+    protected final Log _log=LogFactory.getLog(getClass());
+    protected final Channel _inputQueue;
+    protected final long _timeout;
+
+    /**
+     * @param inputQueue the Channel from which input buffers are taken
+     * @param timeout how long (as passed to {@code Channel.poll}) to wait for a buffer
+     */
+    public AbstractAsyncInputStream(Channel inputQueue, long timeout) {
+        super();
+        _inputQueue=inputQueue;
+        _timeout=timeout;
+    }
+
+    /** Installs the buffer to read from, or null to mark that there is none. */
+    protected abstract void setBuffer(Object object);
+    /** @return the buffer currently being read, or null if there is none */
+    protected abstract Object getBuffer();
+    /** Reads a single byte from the current buffer. */
+    protected abstract int readByte() throws IOException;
+    /** Copies len bytes from the current buffer into b, starting at off. */
+    protected abstract void readBytes(byte b[], int off, int len) throws IOException;
+    /** @return the number of bytes left in the current buffer */
+    protected abstract long getRemaining();
+    /** Called with a buffer once it has been completely consumed. */
+    protected abstract void recycle(Object object);
+
+    /**
+     * Makes sure that there is a current buffer, taking a fresh one from the
+     * input queue if necessary.
+     *
+     * Interruptions while waiting are logged and the wait is retried; the
+     * thread's interrupt status is restored before returning.
+     *
+     * @return true if a buffer is available, false if end-of-input has been reached
+     * @throws IOException if no buffer arrives within the timeout
+     */
+    protected boolean ensureBuffer() throws IOException {
+        if (getBuffer()!=null)
+            return true; // we still have input...
+
+        Object tmp=null;
+        boolean interrupted=false;
+        try {
+            // loop on 'tmp' rather than on the interrupt flag, so that a buffer
+            // that has already been taken off the queue is never thrown away...
+            while (tmp==null) {
+                try {
+                    tmp=_inputQueue.poll(_timeout); // we need a fresh buffer...
+                    if (tmp==null) {
+                        // poll() signals a timeout by returning null
+                        _log.error("timed out");
+                        throw new IOException("timed out waiting for input");
+                    }
+                } catch (TimeoutException e) {
+                    _log.error("timed out", e);
+                    IOException ioe=new IOException("timed out waiting for input");
+                    ioe.initCause(e);
+                    throw ioe;
+                } catch (InterruptedException e) {
+                    _log.error("interrupted", e);
+                    interrupted=true; // remember and retry...
+                }
+            }
+        } finally {
+            if (interrupted)
+                Thread.currentThread().interrupt(); // do not hide the interruption from our caller
+        }
+
+        if (tmp==_endOfQueue) {// no more input - our producer has committed his end of the queue...
+            Utils.safePut(_endOfQueue, _inputQueue); // leave it there - clumsy
+            return false;
+        }
+
+        setBuffer(tmp);
+        return true;
+    }
+
+    /** Hands the current buffer back via recycle() once it has been fully consumed. */
+    private void recycleIfExhausted() {
+        if (getRemaining()<=0) {
+            Object object=getBuffer();
+            setBuffer(null);
+            recycle(object);
+        }
+    }
+
+    // InputStream
+
+    /**
+     * @return the next byte of input, or -1 at end-of-input
+     * @throws IOException if no input arrives within the timeout
+     */
+    public int read() throws IOException {
+        if (!ensureBuffer())
+            return -1;
+
+        int b=readByte();
+        recycleIfExhausted();
+
+        //_log.info("reading: "+(char)b);
+
+        return b;
+    }
+
+    // InputStream
+
+    /**
+     * Reads up to len bytes into b, starting at off. Keeps taking buffers from
+     * the queue until len bytes have been read or end-of-input is reached.
+     *
+     * @return the number of bytes read, 0 if len is 0, or -1 if end-of-input
+     *         was reached before any byte could be read
+     * @throws IOException if no input arrives within the timeout
+     */
+    public int read(byte b[], int off, int len) throws IOException {
+        if (len==0)
+            return 0; // InputStream contract: a zero-length read returns 0, not EOF
+
+        int red=0; // read (pres.) and read (perf.) are homographs...
+        while (red<len && ensureBuffer()) {
+            // never copy more than the caller still has room for...
+            int tranche=(int)Math.min((long)(len-red), getRemaining());
+            readBytes(b, off+red, tranche);
+            red+=tranche;
+            recycleIfExhausted();
+        }
+        //_log.info("read: "+red+" bytes");
+        return red==0?-1:red;
+    }
+
+    // Producer side
+
+    /**
+     * Marks end-of-input by putting the end-of-queue sentinel on the input queue.
+     */
+    public void commit() {
+        Utils.safePut(_endOfQueue, _inputQueue);
+    }
+
+    // ByteBufferInputStream
+
+    /** Not yet implemented - always throws UnsupportedOperationException. */
+    public void read(ByteBuffer buffer, int from, int to) {
+        throw new UnsupportedOperationException(); // NYI
+    }
+
+    // Puttable
+
+    /** Puts an input buffer on the input queue. */
+    public void put(Object item) throws InterruptedException {
+        //_log.info("putting buffer on input queue: "+item);
+        _inputQueue.put(item);
+    }
+
+    /** Offers an input buffer to the input queue, waiting at most msecs. */
+    public boolean offer(Object item, long msecs) throws InterruptedException {
+        return _inputQueue.offer(item, msecs);
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.activecluster.Cluster;
+import org.codehaus.wadi.sandbox.io.BytesMessageOutputStreamConfig;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+
+public abstract class AbstractClusterPipe extends AbstractPipe implements Puttable, BytesMessageOutputStreamConfig {
+
+    protected final Cluster _cluster;
+    protected final Destination _us;
+    protected final Destination _them;
+    protected final Channel _inputQueue;
+    protected final BytesMessageInputStream _inputStream;
+    protected final BytesMessageOutputStream _outputStream;
+    protected final String _ourCorrelationId;
+    
+    public AbstractClusterPipe(PipeConfig config, long timeout, Cluster cluster, Destination us, String ourId, Destination them, Channel inputQueue) {
+        super(config, timeout);
+        _cluster=cluster;
+        _us=us;
+        _them=them;
+        _inputQueue=inputQueue;
+        _inputStream=new BytesMessageInputStream(inputQueue, _timeout);
+        _outputStream=new BytesMessageOutputStream(this);
+        _ourCorrelationId=ourId;
+    }
+
+    public abstract String getTheirCorrelationId();
+    
+    String getCorrelationId() {
+        return _ourCorrelationId;
+    }
+    
+    // Connection
+    
+    public void close() throws IOException {
+        _inputStream.commit();
+        super.close();
+    }
+
+    // StreamConnection
+    
+    public InputStream getInputStream() {
+        return _inputStream;
+    }
+
+    public OutputStream getOutputStream() {
+        return _outputStream;
+    }
+
+    // Puttable - byte[] only please :-)
+    
+    public void put(Object item) throws InterruptedException {
+        _inputStream.put(item);
+    }
+
+    public boolean offer(Object item, long msecs) throws InterruptedException {
+        return _inputStream.offer(item, msecs);
+    }
+    
+    // ByteArrayOutputStreamConfig
+    
+    public void send(BytesMessage bytesMessage) throws JMSException {
+        bytesMessage.setJMSCorrelationID(getTheirCorrelationId());
+        bytesMessage.setJMSReplyTo(_us);
+        //_log.info("sending message: "+(_isServer?"[server->client]":"[client->server]"));
+        _cluster.send(_them, bytesMessage);
+    }
+    
+    public BytesMessage createBytesMessage() throws JMSException {
+        return _cluster.createBytesMessage();
+    }
+
+    // called by server...
+    public synchronized void commit() {
+        _inputStream.commit();
+        _valid=false;
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,155 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.sandbox.io.PeerConfig;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+/**
+ * Base class for Pipes. It reads a Peer from its input, runs it, and then
+ * either reports itself idle to its PipeConfig or closes itself, depending on
+ * whether it is still valid. It also serves as the PeerConfig handed to the
+ * Peers that it runs.
+ */
+public abstract class AbstractPipe implements Pipe, PeerConfig  {
+
+    protected static final Log _log=LogFactory.getLog(AbstractPipe.class);
+
+    protected final PipeConfig _config;
+    protected final long _timeout;
+
+    /** Cleared once this Pipe can no longer be reused, so that it is closed rather than reported idle. */
+    protected boolean _valid;
+
+    /**
+     * @param config the owner to notify when this Pipe becomes idle or is closed
+     * @param timeout a timeout held for use by subclasses
+     */
+    public AbstractPipe(PipeConfig config, long timeout) {
+        _config=config;
+        _timeout=timeout;
+        _valid=true;
+    }
+
+    protected ObjectInputStream _ois;
+    /** Lazily wraps getInputStream() in an ObjectInputStream; the wrapper is discarded after each run. */
+    public ObjectInputStream getObjectInputStream() throws IOException {
+        if (_ois==null)
+            _ois=new ObjectInputStream(getInputStream());
+        return _ois;
+    }
+
+    protected ObjectOutputStream _oos;
+    /** Lazily wraps getOutputStream() in an ObjectOutputStream; the wrapper is discarded after each run. */
+    public ObjectOutputStream getObjectOutputStream() throws IOException {
+        if (_oos==null)
+            _oos=new ObjectOutputStream(getOutputStream());
+        return _oos;
+    }
+
+    /**
+     * Reads a Peer from the input and runs it. Problems reading the Peer mark
+     * this Pipe invalid; problems running the Peer are only logged. Afterwards
+     * a valid Pipe notifies its config that it is idle and an invalid one is
+     * closed.
+     */
+    public void run() {
+        try {
+            //_log.info("running...");
+            //_log.info("starting read...");
+            ObjectInputStream ois=getObjectInputStream();
+            //_log.info("stream created...");
+            Peer peer=(Peer)ois.readObject();
+            //_log.info("object read...");
+            try {
+                run(peer);
+            } catch (Exception e) {
+                _log.error("problem running Peer", e);
+            }
+            //_log.info("...ran");
+        } catch (EOFException e) {
+            // end of the line - fall through...
+            if (_log.isTraceEnabled()) _log.trace("Connection reached end of input - quitting...: "+this);
+            _valid=false;
+        } catch (IOException e) {
+            _log.warn("problem reading object off wire", e);
+            _valid=false; // this socket is trashed...
+        } catch (ClassNotFoundException e) {
+            _log.warn("unknown Peer type - version/security problem?", e);
+            _valid=false; // this stream is unfixable ?
+        } finally {
+            _ois=null;
+            _oos=null;
+            if (_valid)
+                _config.notifyIdle(this); // after running, we declare ourselves 'idle' to our Server...
+            else
+                try {
+                    close();
+                } catch (IOException e) {
+                    _log.error("problem closing server Connection", e);
+                }
+        }
+        //_log.info("...idle");
+    }
+
+    /**
+     * Runs the given Peer against this Pipe, discarding the Object stream
+     * wrappers afterwards.
+     *
+     * @param peer the Peer to run
+     * @return whatever the Peer returns
+     * @throws Exception whatever the Peer throws
+     */
+    public boolean run(Peer peer) throws Exception {
+        try {
+            return peer.run(this);
+        } finally {
+            _ois=null;
+            _oos=null;
+//          if (_valid)
+//          _config.notifyIdle(this); // after running, we declare ourselves 'idle' to our Server...
+//          else
+//          try {
+//          close();
+//          } catch (IOException e) {
+//          _log.error("problem closing server Connection", e);
+//          }
+        }
+    }
+
+    /**
+     * Flushes the output, closes both streams (logging rather than propagating
+     * any IOException from these steps) and tells the config that this Pipe
+     * has been closed.
+     *
+     * The notification is issued from a finally block: the owner must hear
+     * about the closure even if the streams cannot be obtained, otherwise its
+     * record of open Pipes would never be updated.
+     *
+     * @throws IOException if the input or output stream cannot be obtained
+     */
+    public void close() throws IOException {
+        //_log.info("closing...");
+        try {
+            InputStream is=getInputStream();
+            OutputStream os=getOutputStream();
+            try{os.flush();}catch(IOException e){
+                _log.warn("problem flushing socket output", e);
+            }
+            try{is.close();}catch(IOException e){
+                _log.warn("problem closing socket input", e);
+            }
+            try{os.close();}catch(IOException e){
+                _log.warn("problem closing socket output", e);
+            }
+        } finally {
+            _config.notifyClosed(this);
+        }
+        //_log.info("...closed");
+    }
+
+    // WritableByteChannel - default behaviour is not to support this...
+
+    public int write(ByteBuffer src) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isOpen() {
+        throw new UnsupportedOperationException();
+    }
+
+    // PeerConfig
+
+    public Contextualiser getContextualiser() {
+        return _config.getContextualiser();
+    }
+
+    public String getNodeId() {
+        return _config.getNodeId();
+    }
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+import org.codehaus.wadi.sandbox.io.Server;
+import org.codehaus.wadi.sandbox.io.ServerConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+public abstract class AbstractServer implements Server, PipeConfig {
+
+    protected final Log _log=LogFactory.getLog(getClass());
+    protected final PooledExecutor _executor;
+    protected final long _pipeTimeout;
+
+    public AbstractServer(PooledExecutor executor, long pipeTimeout) {
+        super();
+        _executor=executor;
+        _pipeTimeout=pipeTimeout;
+    }
+
+    protected ServerConfig _config;
+    protected Thread _thread;
+    protected volatile boolean _running;
+
+    public void init(ServerConfig config) {
+        _config=config;
+    }
+
+    public void start() throws Exception {
+      _log.info("starting");
+    }
+
+    public void stop() throws Exception {
+      _log.info("stopped");
+    }
+
+    public void run(Pipe pipe) {
+        try {
+            _executor.execute(pipe);
+        } catch (InterruptedException e) { // TODO - do this safely...
+	  _log.error(e);
+        }
+    }
+
+    // PipeConfig
+
+    public Contextualiser getContextualiser() {
+        return _config.getContextualiser();
+    }
+
+    public String getNodeId() {
+        return _config.getNodeName();
+    }
+}
+

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,63 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.net.InetSocketAddress;
+
+import org.codehaus.wadi.sandbox.io.Pipe;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+
+/**
+ * A Server bound to a socket address, which keeps a count of the Pipes that it currently
+ * has outstanding so that it can wait for them to finish.
+ */
+public abstract class AbstractSocketServer extends AbstractServer {
+
+    /** Number of Pipes added and not yet removed. */
+    protected final SynchronizedInt _numPipes=new SynchronizedInt(0);
+
+    protected InetSocketAddress _address;
+
+    public AbstractSocketServer(PooledExecutor executor, long pipeTimeout, InetSocketAddress address) {
+        super(executor, pipeTimeout);
+        _address=address;
+    }
+
+    /** Counts a new Pipe in. */
+    public void add(Pipe pipe) {
+        _numPipes.increment();
+        if (_log.isTraceEnabled()) {
+            _log.trace("adding server Pipe: "+pipe);
+        }
+    }
+
+    /** Counts a finished Pipe out. */
+    public void remove(Pipe pipe) {
+        _numPipes.decrement();
+        if (_log.isTraceEnabled()) {
+            _log.trace("removing server Pipe: "+pipe);
+        }
+    }
+
+    /** A closed Pipe is simply removed. */
+    public void notifyClosed(Pipe pipe) {
+        remove(pipe);
+    }
+
+    /**
+     * Blocks, polling once a second, until every Pipe that was added has been removed.
+     * Interruptions of the sleep are logged and ignored.
+     */
+    public void waitForExistingPipes() {
+        for (;;) {
+            if (_numPipes.get()<=0) {
+                break;
+            }
+            if (_log.isInfoEnabled()) {
+                _log.info("waiting for: " + _numPipes + " Pipe[s]");
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                _log.trace("unexpected interruption - ignoring", e);
+            }
+        }
+        _log.info("existing Pipes have finished running");
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,67 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+/**
+ * A Pipe which runs over a blocking (java.net) Socket.
+ */
+public class BIOPipe extends AbstractPipe {
+
+    protected static final Log _log=LogFactory.getLog(BIOPipe.class);
+
+    protected final Socket _socket;
+
+    /**
+     * @param config passed up to AbstractPipe
+     * @param timeout passed up to AbstractPipe - _timeout is then applied as the socket's read timeout
+     * @param socket the connected socket which this Pipe reads from and writes to
+     */
+    public BIOPipe(PipeConfig config, long timeout, Socket socket) {
+        super(config, timeout);
+        _socket=socket;
+        try {
+            _socket.setSoTimeout((int)_timeout); // TODO - parameterise
+        } catch (SocketException e) {
+            _log.warn("could not set socket timeout", e);
+        }
+    }
+
+    // Connection
+
+    public void run() {
+        while (_valid)
+            super.run(); // impossible to idle - loop until EOF...
+    }
+
+    /**
+     * Closes the streams (via the superclass) and then the socket. The socket is closed even
+     * if closing the streams fails, so that it cannot be leaked; a failure to close the socket
+     * itself is only logged.
+     *
+     * @throws IOException if the superclass fails to close
+     */
+    public void close() throws IOException {
+        try {
+            super.close(); // deals with streams...
+        } finally {
+            try {
+                _socket.close();
+            } catch (Exception e) {
+                _log.warn("problem closing socket", e);
+            }
+        }
+    }
+
+    // StreamConnection
+
+    public InputStream getInputStream() throws IOException {return _socket.getInputStream();}
+    public OutputStream getOutputStream() throws IOException {return _socket.getOutputStream();}
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,122 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+
+import org.codehaus.wadi.sandbox.io.Pipe;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+
+/**
+ * A Socket Server - you send it instances of Peer, which are then fed the Socket that they arrived on to consume...
+ * <p>
+ * A single Producer thread accepts connections, wraps each in a BIOPipe and hands it to the executor.
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.5 $
+ */
+public class BIOServer extends AbstractSocketServer {
+
+    protected final int _backlog; // 16?
+    // passed straight to ServerSocket.setSoTimeout(), i.e. milliseconds - 0 means that accept() never times out
+    protected final long _serverTimeout;
+
+    public BIOServer(PooledExecutor executor, long pipeTimeout, InetSocketAddress address, long serverTimeout, int backlog) {
+        super(executor, pipeTimeout, address);
+        _backlog=backlog;
+        _serverTimeout=serverTimeout;
+    }
+
+    protected ServerSocket _socket;
+
+    /**
+     * Binds the server socket and starts the Producer thread. _address is updated with the
+     * port actually bound (relevant when port 0 was requested).
+     *
+     * @throws IOException if the socket cannot be created or configured
+     */
+    public void start() throws IOException {
+        int port=_address.getPort();
+        InetAddress host=_address.getAddress();
+        _socket=new ServerSocket(port, _backlog, host);
+        _socket.setSoTimeout((int)_serverTimeout);
+        //_socket.setReuseAddress(true);
+        _address=new InetSocketAddress(host, _socket.getLocalPort());
+        // only flag ourselves as running once the socket is ready - and before the Producer looks at the flag
+        _running=true;
+        (_thread=new Thread(new Producer(), "WADI BIO Server")).start();
+        _log.info("Producer thread started");
+        if (_log.isDebugEnabled()) _log.debug("started: "+_socket);
+    }
+
+    /**
+     * Stops accepting connections, waits for outstanding Pipes to finish and closes the server socket.
+     * Safe to call on a server that was never started.
+     */
+    public void stop() {
+        if (_log.isDebugEnabled()) _log.debug("stopping: "+_socket);
+
+        stopAcceptingPipes();
+        waitForExistingPipes();
+
+        if (_socket!=null) {
+            try {
+                _socket.close();
+            } catch (IOException e) {
+                _log.warn("problem closing server socket", e);
+            }
+            _socket=null;
+        }
+
+        if (_log.isDebugEnabled()) _log.debug("stopped: "+_address);
+    }
+
+    /**
+     * Asks the Producer thread to stop and waits until it really has done so.
+     * <p>
+     * NOTE(review): the Producer only looks at _running between calls to accept(). With a
+     * serverTimeout of 0 accept() never times out, so this method will block until the next
+     * connection arrives.
+     */
+    public void stopAcceptingPipes() {
+        _running=false;
+        Thread producer=_thread;
+        if (producer!=null) {
+            // an InterruptedException clears the interrupt flag, so testing Thread.interrupted()
+            // afterwards cannot tell us whether to retry - test the Producer itself instead...
+            while (producer.isAlive()) {
+                try {
+                    producer.join();
+                } catch (InterruptedException e) {
+                    _log.trace("unexpected interruption - ignoring", e);
+                }
+            }
+        }
+        _log.info("Producer thread stopped");
+        _thread=null;
+    }
+
+    /**
+     * Accept loop - runs until _running is cleared or the server socket fails.
+     */
+    public class Producer implements Runnable {
+
+        public void run() {
+            try {
+                while (_running) {
+                    try {
+                        if (_serverTimeout==0) Thread.yield();
+                        Socket socket=_socket.accept();
+                        BIOPipe pipe=new BIOPipe(BIOServer.this, _pipeTimeout, socket);
+                        add(pipe);
+                        BIOServer.this.run(pipe);
+
+                    } catch (SocketTimeoutException ignore) {
+                        // accept() timed out - go round again so that _running is rechecked...
+                    }
+                }
+            } catch (IOException e) {
+                _log.warn("unexpected io problem - stopping", e);
+            }
+        }
+
+    }
+
+    // PipeConfig
+
+    public void notifyIdle(Pipe pipe) {
+        // BIOServer does not support idling Pipes :-(
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,68 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.nio.ByteBuffer;
+
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+
+
+// N.B. It is unfortunate that EDU.oswego.cs.dl.util.concurrent.Channel and java.nio.channels.Channel are homonyms.
+// All mentions of Channel in this file refer to the EDU.oswego.cs.dl.util.concurrent variety.
+
+// two threads will be using this object - a producer (the server) and a consumer (the stream's reader).
+
+public class ByteBufferInputStream extends AbstractAsyncInputStream implements Puttable {
+    
+    protected final Puttable _outputQueue; // and then placed onto here...
+    
+    public ByteBufferInputStream(Channel inputQueue, Puttable outputQueue, long timeout) {
+        super(inputQueue, timeout);
+        _outputQueue=outputQueue;
+    }
+
+    protected ByteBuffer _buffer=null; // only ever read by consumer
+
+    protected void setBuffer(Object object) {
+        _buffer=(ByteBuffer)object;
+    }
+    
+    protected Object getBuffer() {
+        return _buffer;
+    }
+    
+    protected int readByte() {
+        return (int)_buffer.get()&0xFF; // convert byte to unsigned int - otherwise 255==-1 i.e. EOF etc..
+    }
+    
+    protected void readBytes(byte b[], int off, int len) {
+        _buffer.get(b, off, len);
+    }
+    
+    protected long getRemaining() {
+        return _buffer.remaining();
+    }
+    
+    public void recycle(Object object) {
+        ((ByteBuffer)object).clear();
+        Utils.safePut(object, _outputQueue);  
+    }
+    
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,86 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ByteBufferOutputStream extends OutputStream {
+
+    protected final static Log _log=LogFactory.getLog(ByteBufferOutputStream.class);
+    
+    protected final SocketChannel _channel;
+    protected final ByteBuffer _buffer;
+    
+    public ByteBufferOutputStream(SocketChannel channel, int bufferSize) {
+        super();
+        _channel=channel;
+        _buffer=ByteBuffer.allocateDirect(bufferSize);
+    }
+
+    // impl
+    
+    protected void send() throws IOException {
+        _buffer.flip();
+        while (_buffer.hasRemaining())
+            _channel.write(_buffer);
+        _buffer.clear();
+    }
+    
+    // OutputStream
+    
+    public void write(int b) throws IOException {
+        //_log.info("writing: "+(char)b);
+        _buffer.put((byte)b);
+        if (!_buffer.hasRemaining())
+            send();
+    }
+    
+    public void write(byte b[], int off, int len) throws IOException {
+        //_log.info("writing: "+len+" bytes");
+        int written=0;
+        while (written<len) {
+            int tranche=Math.min(_buffer.remaining(), len);
+            _buffer.put(b, off+written, tranche);
+            written+=tranche;
+            
+            if (!_buffer.hasRemaining())
+                send();
+        }
+    }
+    
+    public void flush() throws IOException {
+        super.flush();
+        send();
+    }
+    
+    public void close() throws IOException {
+        super.close();
+        send();
+    }
+    
+    // ByteBufferOutputStream
+    
+    public void write(ByteBuffer buffer, int offset, int length) throws IOException {
+        _channel.write(null, offset, length);
+    }
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,85 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+
+
+public class BytesMessageInputStream extends AbstractAsyncInputStream implements Puttable {
+
+    public BytesMessageInputStream(Channel inputQueue, long timeout) {
+        super(inputQueue, timeout);
+    }
+
+    protected BytesMessage _buffer;
+    protected long _remaining;
+
+    protected void setBuffer(Object object) {
+        _buffer=(BytesMessage)object;
+        try {
+            _remaining=_buffer==null?0:_buffer.getBodyLength();
+        } catch (JMSException e) {
+	  _log.error("could not ascertain input length", e);
+        }
+    }
+
+    protected Object getBuffer() {
+        return _buffer;
+    }
+
+    protected int readByte() throws IOException {
+        try {
+            int b=_buffer.readUnsignedByte();
+            _remaining--;
+            return b;
+        } catch (JMSException e) {
+	  _log.error("could not read next byte", e);
+            throw new IOException();
+        }
+    }
+
+    protected void readBytes(byte b[], int off, int len) throws IOException {
+        try {
+            if (off==0)
+                _buffer.readBytes(b, len);
+            else {
+                // inefficient - but we are not helped by JMS API...
+                for (int i=0; i<len; i++)
+                    b[off++]=_buffer.readByte();
+            }
+            _remaining-=len;
+        } catch (JMSException e) {
+	  _log.error("problem reading bytes", e);
+            throw new IOException();
+        }
+    }
+
+    protected long getRemaining() {
+        return _remaining;
+    }
+
+    protected void recycle(Object object) {
+        // I don't see how we can recycle BytesMessages... ?
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,114 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.BytesMessageOutputStreamConfig;
+
+public class BytesMessageOutputStream extends OutputStream {
+
+    protected final static Log _log=LogFactory.getLog(BytesMessageOutputStream.class);
+
+    protected final BytesMessageOutputStreamConfig _config;
+    protected BytesMessage _buffer;
+
+    public BytesMessageOutputStream(BytesMessageOutputStreamConfig config) {
+        super();
+        _config=config;
+        try {
+            allocate();
+        } catch (IOException e) {
+	  _log.error(e); // should we let this go further ?
+        }
+    }
+
+    // impl
+
+    protected void allocate() throws IOException {
+        try {
+            _buffer=_config.createBytesMessage();
+        } catch (JMSException e) {
+	  _log.error(e);
+            throw new IOException();
+        }
+    }
+
+    public void send(BytesMessage message) throws IOException {
+        try {
+            message.reset(); // switch to read-only mode
+            if (message.getBodyLength()>0 || message.propertyExists("closing-stream")) {
+                _config.send(message);
+            }
+        } catch (Exception e) {
+	  _log.error(e);
+            throw new IOException("problem sending bytes");
+        }
+    }
+
+    // OutputStream
+
+    public void flush() throws IOException {
+        send(_buffer);
+        allocate();
+    }
+
+    public void close() throws IOException {
+//        try {
+//            _buffer.setBooleanProperty("closing-stream", true);
+//            //_log.info("CLIENT CLOSING STREAM: "+_buffer);
+//        } catch (JMSException e) {
+//            _log.warn("problem writing message meta-data", e);
+//            throw new IOException();
+//        }
+        send(_buffer);
+        _buffer=null;
+    }
+
+    public void write(int b) throws IOException {
+        try {
+            _buffer.writeByte((byte)b);
+        } catch (JMSException e) {
+	  _log.error(e);
+            throw new IOException();
+        }
+    }
+
+    public void write(byte b[], int off, int len) throws IOException {
+        try {
+            _buffer.writeBytes(b, off, len);// can the message run out of space ? - we might have to break it up... - TODO
+        } catch (JMSException e) {
+	  _log.error(e);
+            throw new IOException();
+        }
+    }
+
+    // BytesMessageOutputStream
+
+
+    public void write(ByteBuffer buffer, int from, int to) {
+        throw new UnsupportedOperationException(); // cannot be done properly over ActiveMQ
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,49 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import javax.jms.Destination;
+
+import org.activecluster.Cluster;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+
+/**
+ * The client end of a cluster Pipe. Our correlation id is the given id plus "-client"; the
+ * id used for the other end is the given id plus "-server" and a counter which is bumped on
+ * every run.
+ */
+public class ClientClusterPipe extends AbstractClusterPipe {
+
+    /** Number of times that run() has been called. */
+    protected final SynchronizedInt _count=new SynchronizedInt(0);
+    /** Correlation id of the server end - without the per-run suffix. */
+    protected final String _theirCorrelationId;
+    /** Correlation id of the server end for the latest run - null until run() is first called. */
+    protected String _theirCorrelationIdWithSuffix;
+
+    public ClientClusterPipe(PipeConfig config, long timeout, Cluster cluster, Destination us, Destination them, String correlationId, Channel inputQueue) {
+        super(config, timeout, cluster, us, correlationId+"-client", them, inputQueue);
+        _theirCorrelationId=correlationId+"-server";
+    }
+
+    /**
+     * Derives a fresh server-side correlation id for this run and then delegates to the superclass.
+     */
+    public synchronized boolean run(Peer peer) throws Exception {
+        _theirCorrelationIdWithSuffix=_theirCorrelationId+"-"+_count.increment();
+        return super.run(peer);
+    }
+
+    public String getTheirCorrelationId() {
+        return _theirCorrelationIdWithSuffix;
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,170 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.codehaus.wadi.gridstate.ExtendedCluster;
+import org.codehaus.wadi.impl.Utils;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+import org.codehaus.wadi.sandbox.io.ServerConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+public class ClusterServer extends AbstractServer implements PipeConfig, MessageListener {
+
+    protected final boolean _excludeSelf;
+    protected final Map _pipes;
+
+    public ClusterServer(PooledExecutor executor, long pipeTimeout, boolean excludeSelf) {
+        super(executor, pipeTimeout);
+        _excludeSelf=excludeSelf;
+        _pipes=new HashMap();
+    }
+
+    protected ExtendedCluster _cluster;
+    protected MessageConsumer _nodeConsumer;
+    protected MessageConsumer _clusterConsumer;
+
+    public void init(ServerConfig config) {
+        super.init(config);
+        _cluster=_config.getCluster();
+    }
+
+    public void start() throws Exception {
+        super.start();
+        _clusterConsumer=_cluster.createConsumer(_cluster.getDestination(), null, _excludeSelf);
+        _clusterConsumer.setMessageListener(this);
+        _nodeConsumer=_cluster.createConsumer(_cluster.getLocalNode().getDestination(), null, _excludeSelf);
+        _nodeConsumer.setMessageListener(this);
+    }
+
+    public void stop() throws Exception {
+        stopAcceptingPipes();
+        waitForExistingPipes();
+        super.stop();
+    }
+
+    public void stopAcceptingPipes() {
+        try {
+        _clusterConsumer.setMessageListener(null);
+        _nodeConsumer.setMessageListener(null);
+        } catch (JMSException e) {
+	  _log.warn("could not remove Listeners", e);
+        }
+    }
+
+    public void onMessage(Message message) {
+        try {
+            if (message instanceof BytesMessage) {
+                BytesMessage bm=(BytesMessage)message;
+                String ourId=bm.getJMSCorrelationID();
+                Destination replyTo=bm.getJMSReplyTo();
+                //_log.info("receiving message");
+                synchronized (_pipes) {
+                    //_log.info("looking up Pipe: "+ourId);
+                    AbstractClusterPipe pipe=(AbstractClusterPipe)_pipes.get(ourId);
+                    if (pipe==null) {
+                        // initialising a new Pipe...
+                        String theirId=ourId.substring(0, ourId.indexOf("-server"))+"-client";
+                        Destination us=_cluster.getLocalNode().getDestination();
+                        pipe=new ServerClusterPipe(this, _pipeTimeout, _cluster, us, ourId, replyTo, theirId, new LinkedQueue());
+                        ourId=pipe.getCorrelationId();
+                        //_log.info("adding Pipe: '"+ourId+"'");
+                        synchronized (_pipes) {
+                            _pipes.put(ourId, pipe);
+                        }
+                        run(pipe);
+                    } else {
+                        //_log.info("found Pipe: '"+ourId+"'");
+                    }
+                    // servicing existing pipe...
+                    if (bm.getBodyLength()>0) {
+                        //_log.info("servicing Pipe: '"+correlationId+"' - "+bm.getBodyLength()+" bytes");
+                        Utils.safePut(bm, pipe);
+                    }
+                    if (bm.getBooleanProperty("closing-stream")) {
+                        //_log.info("SERVER CLOSING STREAM: "+pipe);
+//                        pipe.commit();
+                    }
+                }
+            }
+        } catch (JMSException e) {
+	  _log.error("unexpected problem", e);
+        }
+    }
+
+    // needs more thought...
+
+    public Pipe makeClientPipe(String correlationId, Destination target) {
+        Destination source=_cluster.getLocalNode().getDestination();
+        LinkedQueue queue=new LinkedQueue();
+        AbstractClusterPipe pipe=new ClientClusterPipe(this, _pipeTimeout, _cluster, source, target, correlationId, queue);
+        String id=pipe.getCorrelationId();
+        //_log.info("adding Pipe: "+id);
+        synchronized (_pipes) {
+            _pipes.put(id, pipe);
+        }
+        return pipe;
+    }
+
+    protected int getNumPipes() {
+        synchronized (_pipes) {
+            return _pipes.size();
+        }
+    }
+
+    public void waitForExistingPipes() {
+        int numPipes;
+        while ((numPipes=getNumPipes())>0) {
+            if (_log.isInfoEnabled()) _log.info("waiting for: " + numPipes + " Pipe[s]");
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+	      _log.trace("unexpected interruption - ignoring", e);
+            }
+        }
+	_log.info("existing Pipes have finished running");
+    }
+
+    // PipeConfig
+
+    public void notifyClosed(Pipe pipe) {
+        String correlationId=((AbstractClusterPipe)pipe)._ourCorrelationId;
+        //_log.info("removing Pipe: "+correlationId);
+        synchronized (_pipes) {
+            _pipes.remove(correlationId); // TODO - encapsulate properly
+        }
+    }
+
+    public void notifyIdle(Pipe pipe) {
+        // Cluster Connections idle automatically after running...
+        //super.notifyIdle(pipe);
+        notifyClosed(pipe);
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,56 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.Server;
+import org.codehaus.wadi.sandbox.io.ServerConfig;
+
+/**
+ * Server implementation in which every operation is a no-op.
+ */
+public class DummyServer implements Server {
+
+    public DummyServer() {
+        // nothing to initialise; the implicit Object constructor is sufficient
+    }
+
+    public void init(ServerConfig config) {
+        // no-op: the configuration is ignored
+    }
+
+    public void start() throws Exception {
+        // no-op
+    }
+
+    public void stop() throws Exception {
+        // no-op
+    }
+
+    public void waitForExistingPipes() {
+        // no-op
+    }
+
+    public void stopAcceptingPipes() {
+        // no-op
+    }
+
+    public void run(Pipe pipe) {
+        // no-op: the Pipe is ignored
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,138 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.NIOPipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
+
+/**
+ * A Pipe backed by a SocketChannel. ByteBuffers put to this Pipe are forwarded
+ * to its ByteBufferInputStream; output goes through a ByteBufferOutputStream
+ * constructed over the channel.
+ */
+public class NIOPipe extends AbstractPipe implements Puttable {
+
+    protected final static Log _log=LogFactory.getLog(NIOPipe.class);
+
+    protected final SocketChannel _channel;
+    protected final SelectionKey _key;
+    protected final Channel _inputQueue;
+    protected final Puttable _outputQueue;
+
+    /**
+     * @param config passed to AbstractPipe; close() expects it to be reachable as _config
+     * @param timeout passed to AbstractPipe
+     * @param channel the socket channel this Pipe writes to and eventually closes
+     * @param key the selection key cancelled by commit()
+     * @param inputQueue first queue given to the ByteBufferInputStream
+     * @param outputQueue second queue given to the ByteBufferInputStream (presumably where used buffers are returned - TODO confirm)
+     * @param bufferSize size given to the ByteBufferOutputStream
+     */
+    public NIOPipe(NIOPipeConfig config, long timeout, SocketChannel channel, SelectionKey key, Channel inputQueue, Puttable outputQueue, int bufferSize) {
+        super(config, timeout);
+        _channel=channel;
+        _key=key;
+        _inputQueue=inputQueue;
+        _outputQueue=outputQueue;
+
+        // ctor is called by Server thread - find some way to do these allocations on Consumer thread...
+        _inputStream=new ByteBufferInputStream(_inputQueue, _outputQueue, _timeout);
+        _outputStream=new ByteBufferOutputStream(_channel, bufferSize);
+    }
+
+    // flag read and written through getRunning()/setRunning(); initially false
+    protected final SynchronizedBoolean _running=new SynchronizedBoolean(false);
+    public boolean getRunning() {return _running.get();}
+    public void setRunning(boolean running) {_running.set(running);}
+
+    protected ByteBufferInputStream _inputStream;
+    public InputStream getInputStream() {return _inputStream;}
+
+    protected OutputStream _outputStream;
+    public OutputStream getOutputStream() {return _outputStream;}
+
+    /**
+     * Runs AbstractPipe.close() and then, while holding the lock supplied by the
+     * NIOPipeConfig, shuts down output and closes the socket and the channel.
+     *
+     * The lock is always held before the channel is touched: an InterruptedException
+     * from acquire() means the lock was not taken, so acquisition is retried, and the
+     * interrupt is re-asserted once the lock has been released. The socket and channel
+     * are closed even if shutting down output fails.
+     *
+     * @throws IOException if AbstractPipe.close() or any of the socket/channel calls fails
+     */
+    public void close() throws IOException {
+        super.close();
+        Sync lock=((NIOPipeConfig)_config).getLock();
+
+        boolean interrupted=false;
+        for (;;) {
+            try {
+                lock.acquire(); // sync following actions with Server loop...
+                break;
+            } catch (InterruptedException e) {
+                interrupted=true; // lock not taken - go round again
+            }
+        }
+
+        try {
+            try {
+                _channel.socket().shutdownOutput();
+            } finally {
+                try {
+                    _channel.socket().close();
+                } finally {
+                    _channel.close();
+                }
+            }
+        } finally {
+            lock.release();
+            if (interrupted) Thread.currentThread().interrupt();
+        }
+    }
+
+    // called by server...
+    // Commits the input stream, shuts down the socket's input side and cancels the selection key.
+    public synchronized void commit() throws IOException {
+        _inputStream.commit();
+        _channel.socket().shutdownInput();
+        _key.cancel();
+    }
+
+    // Puttable - ByteBuffers only please :-)
+
+    public void put(Object item) throws InterruptedException {
+        _inputStream.put(item);
+    }
+
+    public boolean offer(Object item, long msecs) throws InterruptedException {
+        return _inputStream.offer(item, msecs);
+    }
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,250 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+import org.codehaus.wadi.impl.Utils;
+import org.codehaus.wadi.sandbox.io.NIOPipeConfig;
+import org.codehaus.wadi.sandbox.io.Pipe;
+
+import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.NullSync;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
+
+// NOTES - reuse server BBS
+// Stream should implement bulk transfers
+
+// Do not put Connections onto Queue until they have input
+// When a Connection's Peer finishes, it loses its Thread, but is not close()-ed
+
+/**
+ * Socket server built around a single java.nio Selector. A Producer thread accepts
+ * connections and reads their input into ByteBuffers taken from a shared pool; each
+ * connection's NIOPipe is handed to the executor whenever it is serviced while not
+ * already marked as running.
+ */
+public class NIOServer extends AbstractSocketServer implements NIOPipeConfig {
+
+    protected final SynchronizedBoolean _accepting=new SynchronizedBoolean(false);
+    protected final ReadWriteLock _lock=new FIFOReadWriteLock();
+    protected final EDU.oswego.cs.dl.util.concurrent.Channel _queue=new LinkedQueue(); // parameterise ?; // we get our ByteBuffers from here...
+    protected final long _serverTimeout;
+    protected final int _outputBufferSize;
+
+    /**
+     * @param executor passed to AbstractSocketServer
+     * @param pipeTimeout passed to AbstractSocketServer
+     * @param address passed to AbstractSocketServer
+     * @param serverTimeout applied to the server socket via setSoTimeout() in start()
+     * @param numInputBuffers number of direct ByteBuffers pre-allocated into the pool
+     * @param inputBufferSize capacity of each pooled input buffer
+     * @param outputBufferSize buffer size passed to every NIOPipe created by accept()
+     */
+    public NIOServer(PooledExecutor executor, long pipeTimeout, InetSocketAddress address, long serverTimeout, int numInputBuffers, int inputBufferSize, int outputBufferSize) {
+        super(executor, pipeTimeout, address);
+        _serverTimeout=serverTimeout;
+        _outputBufferSize=outputBufferSize;
+
+        for (int i=0; i<numInputBuffers; i++)
+            Utils.safePut(ByteBuffer.allocateDirect(inputBufferSize), _queue);
+    }
+
+    protected ServerSocketChannel _channel;
+    protected Selector _selector;
+    protected SelectionKey _key;
+
+    /**
+     * Opens and binds the non-blocking server channel, registers it for OP_ACCEPT
+     * and starts the Producer thread.
+     */
+    public synchronized void start() throws Exception {
+        _channel=ServerSocketChannel.open();
+        _channel.configureBlocking(false);
+        _channel.socket().bind(_address);
+        _channel.socket().setSoTimeout((int)_serverTimeout);
+        //_channel.socket().setReuseAddress(true);
+        _log.info(_channel);
+        _address=(InetSocketAddress)_channel.socket().getLocalSocketAddress(); // in case address was not fully specified
+        _selector=Selector.open();
+        _key=_channel.register(_selector, SelectionKey.OP_ACCEPT);
+        _running=true;
+        _accepting.set(true);
+        (_thread=new Thread(new Producer(), "WADI NIO Server")).start();
+        _log.info("Producer thread started");
+        _log.info("started: " + _channel);
+    }
+
+    /**
+     * Stops accepting, waits for existing Pipes, then stops the Producer thread
+     * and closes the selector and the server channel.
+     */
+    public synchronized void stop() throws Exception {
+        if (_log.isInfoEnabled()) _log.info("stopping: " + _channel);
+        stopAcceptingPipes();
+        waitForExistingPipes();
+        // stop Producer loop
+        _running=false;
+        _selector.wakeup();
+        _thread.join();
+        _selector.close();
+        _thread=null;
+        _log.info("Producer thread stopped");
+        // tidy up
+        _channel.socket().close();
+        _channel.close();
+        if (_log.isInfoEnabled()) _log.info("stopped: " + _address);
+    }
+
+    public void stopAcceptingPipes() {
+        _accepting.set(false);
+    }
+
+    /**
+     * Accepts a pending connection on the server channel of key, registers it for
+     * OP_READ and wraps it in an NIOPipe attached to the new key. Does nothing if
+     * no connection is pending: a non-blocking accept() then returns null.
+     */
+    public void accept(SelectionKey key) throws ClosedChannelException, IOException {
+        ServerSocketChannel server=(ServerSocketChannel)key.channel();
+        SocketChannel channel=server.accept();
+        if (channel==null)
+            return; // nothing to accept after all
+        channel.configureBlocking(false);
+        SelectionKey readKey=channel.register(_selector, SelectionKey.OP_READ/*|SelectionKey.OP_WRITE*/);
+        NIOPipe pipe=new NIOPipe(this, _pipeTimeout, channel, readKey, new LinkedQueue(), _queue, _outputBufferSize); // reuse the queue
+        readKey.attach(pipe);
+        add(pipe);
+    }
+
+    /**
+     * Reads available input for the NIOPipe attached to key into a pooled buffer.
+     * At end-of-stream the Pipe is committed; otherwise the filled buffer is put to the
+     * Pipe. The Pipe is then handed to the executor unless it is already running.
+     *
+     * The buffer goes back to the pool whenever it is not handed to the Pipe - including
+     * when the read or the commit throws - so a failing connection cannot drain the pool.
+     */
+    public void read(SelectionKey key) throws IOException {
+        NIOPipe pipe=(NIOPipe)key.attachment();
+        ByteBuffer buffer=(ByteBuffer)Utils.safeTake(_queue);
+
+        boolean recycle=true; // false once ownership of the buffer passes to the Pipe
+        try {
+            int count=((SocketChannel)key.channel()).read(buffer); // read off network into buffer
+            if (count<0) {
+                if (_log.isTraceEnabled()) _log.trace("committing server Pipe: "+pipe);
+                pipe.commit();
+            } else {
+                if (_log.isTraceEnabled()) _log.trace("servicing server Pipe: "+pipe+" ("+count+" bytes)");
+                buffer.flip();
+                recycle=false;
+                Utils.safePut(buffer, pipe);
+            }
+        } finally {
+            if (recycle) {
+                buffer.clear();
+                Utils.safePut(buffer, _queue); // could be cleverer
+            }
+        }
+
+        if (!pipe.getRunning()) {
+            pipe.setRunning(true);
+            try {
+                if (_log.isTraceEnabled()) _log.trace("running server Pipe: "+pipe);
+                _executor.execute(pipe);
+            } catch (InterruptedException e) { // TODO - do this safely...
+                _log.error("problem running Pipe", e);
+                pipe.setRunning(false); // never started - let the next read try again
+            }
+        }
+    }
+
+    /** The selector loop: runs on the thread created by start() until _running is cleared. */
+    public class Producer implements Runnable {
+
+        public void run() {
+            while (_running) {
+
+                // Take the write lock for the duration of one select/dispatch pass. An
+                // InterruptedException means acquire() did not take the lock, so keep
+                // trying; the interrupt itself is deliberately dropped because shutdown
+                // is signalled through _running and Selector.wakeup() instead.
+                Sync lock=_lock.writeLock();
+                for (;;) {
+                    try {
+                        lock.acquire();
+                        break;
+                    } catch (InterruptedException e) {
+                        // retry
+                    }
+                }
+
+                try {
+                    _selector.select();
+
+                    for (Iterator i=_selector.selectedKeys().iterator(); i.hasNext(); ) {
+                        SelectionKey key=(SelectionKey)i.next();
+                        i.remove(); // consume the key first so that a failure below cannot leave it stuck in the selected set
+
+                        if (key.isAcceptable() && _accepting.get()) {
+                            accept(key);
+                        }
+
+                        if (key.isReadable()) {
+                            read(key);
+                        }
+                    }
+                } catch (Throwable t) {
+                    _log.error("unexpected problem", t);
+                } finally {
+                    lock.release();
+                    // now threads who want to close selectors, channels etc can run on a read lock...
+                }
+            }
+        }
+    }
+
+    // PipeConfig
+
+    public void notifyIdle(Pipe pipe) {
+        if (_log.isTraceEnabled()) _log.trace("idling server Pipe: "+pipe);
+        ((NIOPipe)pipe).setRunning(false);
+    }
+
+    // NIOPipeConfig
+
+    protected final Sync _dummy=new NullSync();
+    // Hands out the read lock; the Producer holds the matching write lock during each select/dispatch pass.
+    public Sync getLock() {
+        //return _dummy;
+        return _lock.readLock();
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,46 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.Serializable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.PeerConfig;
+
+/**
+ * Base class for Serializable objects that expose a run(PeerConfig) operation.
+ */
+public abstract class Peer implements Serializable {
+
+    protected static final Log _log=LogFactory.getLog(Peer.class);
+
+    /** No-argument constructor, used for deserialisation. */
+    public Peer() {
+        super();
+    }
+
+    /**
+     * Runs this Peer against the given configuration.
+     *
+     * @param config the configuration supplied by the caller
+     * @return a flag whose meaning is defined by callers - not determinable from this class
+     * @throws Exception if the implementation fails
+     */
+    public abstract boolean run(PeerConfig config) throws Exception;
+
+}
\ No newline at end of file

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,48 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import javax.jms.Destination;
+
+import org.activecluster.Cluster;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+
+/**
+ * An AbstractClusterPipe that additionally carries the correlation id passed as theirId.
+ */
+public class ServerClusterPipe extends AbstractClusterPipe {
+
+    protected final String _theirCorrelationId;
+
+    /**
+     * All arguments except theirId are passed straight to the AbstractClusterPipe constructor.
+     *
+     * @param theirId the correlation id returned by getTheirCorrelationId()
+     */
+    public ServerClusterPipe(PipeConfig config, long timeout, Cluster cluster, Destination us, String ourId, Destination them, String theirId, Channel inputQueue) {
+        super(config, timeout, cluster, us, ourId, them, inputQueue);
+        this._theirCorrelationId=theirId;
+    }
+
+    /** @return the correlation id supplied as theirId at construction */
+    public String getTheirCorrelationId() {
+        return this._theirCorrelationId;
+    }
+
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,118 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+/**
+ * Client-side Pipe over a blocking SocketChannel connected to a given address.
+ */
+public class SocketClientPipe extends AbstractPipe {
+
+    /** PipeConfig that ignores every notification and returns null for both getters. */
+    public static class DummyPipeConfig implements PipeConfig {
+        public void notifyIdle(Pipe pipe) {/* do nothing */}
+        public void notifyClosed(Pipe pipe) {/* do nothing */}
+        public Contextualiser getContextualiser() {return null;}
+        public String getNodeId() {return null;}
+    }
+
+    protected final SocketChannel _channel;
+    protected final Socket _socket;
+
+    /**
+     * Connects to address and configures the connection: blocking mode, keep-alive
+     * and a socket timeout taken from _timeout. If configuration fails, the freshly
+     * opened channel is closed again before the failure propagates, so the
+     * connection is not leaked.
+     *
+     * @param address the remote address to connect to
+     * @param timeout passed to AbstractPipe (presumably what _timeout then holds - confirm there)
+     * @throws IOException if the connection cannot be opened or configured
+     */
+    public SocketClientPipe(InetSocketAddress address, long timeout) throws IOException {
+        super(new DummyPipeConfig(), timeout);
+        _channel=SocketChannel.open(address);
+        boolean configured=false;
+        try {
+            _channel.configureBlocking(true);
+            _socket=_channel.socket();
+            _socket.setKeepAlive(true);
+            _socket.setSoTimeout((int)_timeout);
+            configured=true;
+        } finally {
+            if (!configured) {
+                try {
+                    _channel.close();
+                } catch (IOException e) {
+                    _log.warn("problem closing channel", e);
+                }
+            }
+        }
+    }
+
+    // Pipe
+
+    /**
+     * Runs AbstractPipe.close() and then closes the socket and the channel. The socket
+     * and channel are closed even if AbstractPipe.close() throws; problems closing
+     * either of them are logged as warnings rather than propagated.
+     *
+     * @throws IOException if AbstractPipe.close() fails
+     */
+    public void close() throws IOException {
+        try {
+            super.close(); // deals with streams...
+        } finally {
+            try {
+                _socket.close();
+            } catch (Exception e) {
+                _log.warn("problem closing socket", e);
+            }
+            try {
+                _channel.close();
+            } catch (Exception e) {
+                _log.warn("problem closing channel", e);
+            }
+        }
+    }
+
+    // StreamPipe
+
+    public InputStream getInputStream() throws IOException {return _socket.getInputStream();}
+    public OutputStream getOutputStream() throws IOException {return _socket.getOutputStream();}
+
+    // WritableByteChannel - supported
+
+    public int write(ByteBuffer src) throws IOException {
+        return _channel.write(src);
+    }
+
+    public boolean isOpen() {
+        return _channel.isOpen();
+    }
+}

Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+/**
+ * ThreadFactory that names each Thread it creates "WADI Pool (n)", where n is the current value of _count.
+ * The counter is a plain int and is not synchronized by this class.
+ */
+public class ThreadFactory implements EDU.oswego.cs.dl.util.concurrent.ThreadFactory {
+
+    protected int _count;
+
+    /**
+     * @param runnable the task the new Thread will run
+     * @return a new, unstarted Thread named with the counter value, which is then incremented
+     */
+    public Thread newThread(Runnable runnable) {
+        String name="WADI Pool ("+_count+")";
+        _count++;
+        return new Thread(runnable, name);
+    }
+
+}