You are viewing a plain text version of this content. The canonical link for it is here.
Posted to wadi-commits@incubator.apache.org by bd...@apache.org on 2005/12/14 23:36:16 UTC
svn commit: r356933 [16/35] - in /incubator/wadi/trunk: ./ etc/ modules/
modules/assembly/ modules/assembly/src/ modules/assembly/src/bin/
modules/assembly/src/conf/ modules/assembly/src/main/
modules/assembly/src/main/assembly/ modules/core/ modules/c...
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/BytesMessageOutputStreamConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.codehaus.wadi.Config;
+
+public interface BytesMessageOutputStreamConfig extends Config {
+
+ void send(BytesMessage message) throws JMSException;
+ BytesMessage createBytesMessage() throws JMSException;
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/NIOPipeConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,25 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+public interface NIOPipeConfig extends PipeConfig {
+
+ Sync getLock();
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PeerConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.channels.WritableByteChannel;
+
+import org.codehaus.wadi.Config;
+import org.codehaus.wadi.Contextualiser;
+
+public interface PeerConfig extends Config/*, StreamConnection*/, WritableByteChannel {
+
+ public void close() throws IOException; // inherited from WriteableByteChannel - but overloaded to mean close whole Connection...
+ Contextualiser getContextualiser();
+ String getNodeId();
+ //InputStream getInputStream() throws IOException;
+ //OutputStream getOutputStream() throws IOException;
+ ObjectInputStream getObjectInputStream() throws IOException;
+ ObjectOutputStream getObjectOutputStream() throws IOException;
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Pipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import java.io.IOException;
+
+import org.codehaus.wadi.sandbox.io.impl.Peer;
+
+public interface Pipe extends Runnable, StreamConnection {
+
+ void run(); // reads peer from input, and runs it...
+ boolean run(Peer peer) throws Exception; // run a Peer ...
+ //void commit() throws IOException; // producer has finished
+ void close() throws IOException; // consumer has finished
+
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/PipeConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,30 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import org.codehaus.wadi.Config;
+import org.codehaus.wadi.Contextualiser;
+
+public interface PipeConfig extends Config {
+
+ void notifyIdle(Pipe pipe); // called by Connection on becoming idle...
+ void notifyClosed(Pipe pipe); // called by Connection on being closed...
+
+ Contextualiser getContextualiser();
+ String getNodeId();
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/Server.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,31 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+public interface Server {
+
+ void init(ServerConfig config);
+ void start() throws Exception;
+ void stop() throws Exception;
+ void waitForExistingPipes();
+ void stopAcceptingPipes();
+
+ // Connection container...
+
+ void run(Pipe pipe);
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/ServerConfig.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import org.codehaus.wadi.Config;
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.gridstate.ExtendedCluster;
+
+public interface ServerConfig extends Config {
+
+ ExtendedCluster getCluster();
+ Contextualiser getContextualiser();
+ String getNodeName();
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/StreamConnection.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,28 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface StreamConnection {
+
+ InputStream getInputStream() throws IOException;
+ OutputStream getOutputStream() throws IOException;
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractAsyncInputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,138 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+import EDU.oswego.cs.dl.util.concurrent.TimeoutException;
+
+public abstract class AbstractAsyncInputStream extends InputStream implements Puttable {
+
+ protected static final Object _endOfQueue=new Object();
+
+ protected final Log _log=LogFactory.getLog(getClass());
+ protected final Channel _inputQueue;
+ protected final long _timeout;
+
+ public AbstractAsyncInputStream(Channel inputQueue, long timeout) {
+ super();
+ _inputQueue=inputQueue;
+ _timeout=timeout;
+ }
+
+ protected abstract void setBuffer(Object object);
+ protected abstract Object getBuffer();
+ protected abstract int readByte() throws IOException;
+ protected abstract void readBytes(byte b[], int off, int len) throws IOException;
+ protected abstract long getRemaining();
+ protected abstract void recycle(Object object);
+
+ protected boolean ensureBuffer() throws IOException {
+ if (getBuffer()!=null)
+ return true; // we still have input...
+
+ Object tmp=null;
+ do {
+ try {
+ tmp=_inputQueue.poll(_timeout); // we need a fresh buffer...
+ } catch (TimeoutException e) {
+ _log.error("timed out", e);
+ throw new IOException();
+ } catch (InterruptedException e) {
+ _log.error("interrupted", e);
+ }
+ } while (Thread.interrupted());
+
+ if (tmp==_endOfQueue) {// no more input - our producer has committed his end of the queue...
+ Utils.safePut(_endOfQueue, _inputQueue); // leave it there - clumsy
+ return false;
+ }
+
+ setBuffer(tmp);
+ return true;
+ }
+
+ // InputStream
+
+ public int read() throws IOException {
+ if (!ensureBuffer())
+ return -1;
+
+ int b=readByte();
+
+ if (getRemaining()<=0) {
+ Object object=getBuffer();
+ setBuffer(null);
+ recycle(object);
+ }
+
+ //_log.info("reading: "+(char)b);
+
+ return b;
+ }
+
+ // InputStream
+
+ public int read(byte b[], int off, int len) throws IOException {
+ int red=0; // read (pres.) and read (perf.) are homographs...
+ while (red<len && ensureBuffer()) {
+ int tranche=Math.min(len, (int)getRemaining());
+ readBytes(b, off+red, tranche);
+ red+=tranche;
+
+ if (getRemaining()<=0) {
+ Object object=getBuffer();
+ setBuffer(null);
+ recycle(object);
+ }
+ }
+ //_log.info("read: "+red+" bytes");
+ return red==0?-1:red;
+ }
+
+ // InputStream
+
+ public void commit() {
+ Utils.safePut(_endOfQueue, _inputQueue);
+ }
+
+ // ByteBufferInputStream
+
+ public void read(ByteBuffer buffer, int from, int to) {
+ throw new UnsupportedOperationException(); // NYI
+ }
+
+ // Puttable
+
+ public void put(Object item) throws InterruptedException {
+ //_log.info("putting buffer on input queue: "+item);
+ _inputQueue.put(item);
+ }
+
+ public boolean offer(Object item, long msecs) throws InterruptedException {
+ return _inputQueue.offer(item, msecs);
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractClusterPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.activecluster.Cluster;
+import org.codehaus.wadi.sandbox.io.BytesMessageOutputStreamConfig;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+
+public abstract class AbstractClusterPipe extends AbstractPipe implements Puttable, BytesMessageOutputStreamConfig {
+
+ protected final Cluster _cluster;
+ protected final Destination _us;
+ protected final Destination _them;
+ protected final Channel _inputQueue;
+ protected final BytesMessageInputStream _inputStream;
+ protected final BytesMessageOutputStream _outputStream;
+ protected final String _ourCorrelationId;
+
+ public AbstractClusterPipe(PipeConfig config, long timeout, Cluster cluster, Destination us, String ourId, Destination them, Channel inputQueue) {
+ super(config, timeout);
+ _cluster=cluster;
+ _us=us;
+ _them=them;
+ _inputQueue=inputQueue;
+ _inputStream=new BytesMessageInputStream(inputQueue, _timeout);
+ _outputStream=new BytesMessageOutputStream(this);
+ _ourCorrelationId=ourId;
+ }
+
+ public abstract String getTheirCorrelationId();
+
+ String getCorrelationId() {
+ return _ourCorrelationId;
+ }
+
+ // Connection
+
+ public void close() throws IOException {
+ _inputStream.commit();
+ super.close();
+ }
+
+ // StreamConnection
+
+ public InputStream getInputStream() {
+ return _inputStream;
+ }
+
+ public OutputStream getOutputStream() {
+ return _outputStream;
+ }
+
+ // Puttable - byte[] only please :-)
+
+ public void put(Object item) throws InterruptedException {
+ _inputStream.put(item);
+ }
+
+ public boolean offer(Object item, long msecs) throws InterruptedException {
+ return _inputStream.offer(item, msecs);
+ }
+
+ // ByteArrayOutputStreamConfig
+
+ public void send(BytesMessage bytesMessage) throws JMSException {
+ bytesMessage.setJMSCorrelationID(getTheirCorrelationId());
+ bytesMessage.setJMSReplyTo(_us);
+ //_log.info("sending message: "+(_isServer?"[server->client]":"[client->server]"));
+ _cluster.send(_them, bytesMessage);
+ }
+
+ public BytesMessage createBytesMessage() throws JMSException {
+ return _cluster.createBytesMessage();
+ }
+
+ // called by server...
+ public synchronized void commit() {
+ _inputStream.commit();
+ _valid=false;
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,155 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.sandbox.io.PeerConfig;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+public abstract class AbstractPipe implements Pipe, PeerConfig {
+
+ protected static final Log _log=LogFactory.getLog(AbstractPipe.class);
+
+ protected final PipeConfig _config;
+ protected final long _timeout;
+
+ protected boolean _valid;
+
+ public AbstractPipe(PipeConfig config, long timeout) {
+ _config=config;
+ _timeout=timeout;
+ _valid=true;
+ }
+
+ protected ObjectInputStream _ois;
+ public ObjectInputStream getObjectInputStream() throws IOException {
+ if (_ois==null)
+ _ois=new ObjectInputStream(getInputStream());
+ return _ois;
+ }
+
+ protected ObjectOutputStream _oos;
+ public ObjectOutputStream getObjectOutputStream() throws IOException {
+ if (_oos==null)
+ _oos=new ObjectOutputStream(getOutputStream());
+ return _oos;
+ }
+
+ public void run() {
+ try {
+ //_log.info("running...");
+ //_log.info("starting read...");
+ ObjectInputStream ois=getObjectInputStream();
+ //_log.info("stream created...");
+ Peer peer=(Peer)ois.readObject();
+ //_log.info("object read...");
+ try {
+ run(peer);
+ } catch (Exception e) {
+ _log.error("problem running Peer", e);
+ }
+ //_log.info("...ran");
+ } catch (EOFException e) {
+ // end of the line - fall through...
+ if (_log.isTraceEnabled()) _log.trace("Connection reached end of input - quitting...: "+this);
+ _valid=false;
+ } catch (IOException e) {
+ _log.warn("problem reading object off wire", e);
+ _valid=false; // this socket is trashed...
+ } catch (ClassNotFoundException e) {
+ _log.warn("unknown Peer type - version/security problem?", e);
+ _valid=false; // this stream is unfixable ?
+ } finally {
+ _ois=null;
+ _oos=null;
+ if (_valid)
+ _config.notifyIdle(this); // after running, we declare ourselves 'idle' to our Server...
+ else
+ try {
+ close();
+ } catch (IOException e) {
+ _log.error("problem closing server Connection", e);
+ }
+ }
+ //_log.info("...idle");
+ }
+
+ public boolean run(Peer peer) throws Exception {
+ try {
+ return peer.run(this);
+ } finally {
+ _ois=null;
+ _oos=null;
+// if (_valid)
+// _config.notifyIdle(this); // after running, we declare ourselves 'idle' to our Server...
+// else
+// try {
+// close();
+// } catch (IOException e) {
+// _log.error("problem closing server Connection", e);
+// }
+ }
+ }
+
+ public void close() throws IOException {
+ //_log.info("closing...");
+ InputStream is=getInputStream();
+ OutputStream os=getOutputStream();
+ try{os.flush();}catch(IOException e){
+ _log.warn("problem flushing socket output", e);
+ }
+ try{is.close();}catch(IOException e){
+ _log.warn("problem closing socket input", e);
+ }
+ try{os.close();}catch(IOException e){
+ _log.warn("problem closing socket output", e);
+ }
+ _config.notifyClosed(this);
+ //_log.info("...closed");
+ }
+
+ // WritableByteChannel - default behaviour is not to support this...
+
+ public int write(ByteBuffer src) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isOpen() {
+ throw new UnsupportedOperationException();
+ }
+
+ // PeerConfig
+
+ public Contextualiser getContextualiser() {
+ return _config.getContextualiser();
+ }
+
+ public String getNodeId() {
+ return _config.getNodeId();
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+import org.codehaus.wadi.sandbox.io.Server;
+import org.codehaus.wadi.sandbox.io.ServerConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+public abstract class AbstractServer implements Server, PipeConfig {
+
+ protected final Log _log=LogFactory.getLog(getClass());
+ protected final PooledExecutor _executor;
+ protected final long _pipeTimeout;
+
+ public AbstractServer(PooledExecutor executor, long pipeTimeout) {
+ super();
+ _executor=executor;
+ _pipeTimeout=pipeTimeout;
+ }
+
+ protected ServerConfig _config;
+ protected Thread _thread;
+ protected volatile boolean _running;
+
+ public void init(ServerConfig config) {
+ _config=config;
+ }
+
+ public void start() throws Exception {
+ _log.info("starting");
+ }
+
+ public void stop() throws Exception {
+ _log.info("stopped");
+ }
+
+ public void run(Pipe pipe) {
+ try {
+ _executor.execute(pipe);
+ } catch (InterruptedException e) { // TODO - do this safely...
+ _log.error(e);
+ }
+ }
+
+ // PipeConfig
+
+ public Contextualiser getContextualiser() {
+ return _config.getContextualiser();
+ }
+
+ public String getNodeId() {
+ return _config.getNodeName();
+ }
+}
+
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/AbstractSocketServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,63 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.net.InetSocketAddress;
+
+import org.codehaus.wadi.sandbox.io.Pipe;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+
+public abstract class AbstractSocketServer extends AbstractServer {
+
+ protected final SynchronizedInt _numPipes=new SynchronizedInt(0);
+
+ protected InetSocketAddress _address;
+
+ public AbstractSocketServer(PooledExecutor executor, long pipeTimeout, InetSocketAddress address) {
+ super(executor, pipeTimeout);
+ _address=address;
+ }
+
+ public void add(Pipe pipe) {
+ _numPipes.increment();
+ if (_log.isTraceEnabled()) _log.trace("adding server Pipe: "+pipe);
+ }
+
+ public void remove(Pipe pipe) {
+ _numPipes.decrement();
+ if (_log.isTraceEnabled()) _log.trace("removing server Pipe: "+pipe);
+ }
+
+ public void notifyClosed(Pipe pipe) {
+ remove(pipe);
+ }
+
+ public void waitForExistingPipes() {
+ while (_numPipes.get()>0) {
+ if (_log.isInfoEnabled()) _log.info("waiting for: " + _numPipes + " Pipe[s]");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ _log.trace("unexpected interruption - ignoring", e);
+ }
+ }
+ _log.info("existing Pipes have finished running");
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,67 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+public class BIOPipe extends AbstractPipe {
+
+ protected static final Log _log=LogFactory.getLog(BIOPipe.class);
+
+ protected final Socket _socket;
+
+ public BIOPipe(PipeConfig config, long timeout, Socket socket) {
+ super(config, timeout);
+ _socket=socket;
+ try {
+ _socket.setSoTimeout((int)_timeout); // TODO - parameterise
+ } catch (SocketException e) {
+ _log.warn("could not set socket timeout", e);
+ }
+ }
+
+ // Connection
+
+ public void run() {
+ while (_valid)
+ super.run(); // impossible to idle - loop until EOF...
+ }
+
+ public void close() throws IOException {
+ super.close(); // deals with streams...
+ try{
+ _socket.close();
+ }
+ catch(Exception e){
+ _log.warn("problem closing socket", e);
+ }
+ }
+
+ // StreamConnection
+
+ public InputStream getInputStream() throws IOException {return _socket.getInputStream();}
+ public OutputStream getOutputStream() throws IOException {return _socket.getOutputStream();}
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BIOServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,122 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+
+import org.codehaus.wadi.sandbox.io.Pipe;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+
+/**
+ * A Socket Server - you send it instances of Peer, which are then fed the Socket that they arrived on to consume...
+ *
+ * @author <a href="mailto:jules@coredevelopers.net">Jules Gosnell</a>
+ * @version $Revision: 1.5 $
+ */
+
+public class BIOServer extends AbstractSocketServer {
+
+ protected final int _backlog; // 16?
+ protected final long _serverTimeout; // secs
+
+ public BIOServer(PooledExecutor executor, long pipeTimeout, InetSocketAddress address, long serverTimeout, int backlog) {
+ super(executor, pipeTimeout, address);
+ _backlog=backlog;
+ _serverTimeout=serverTimeout;
+ }
+
+ protected ServerSocket _socket;
+
+ public void start() throws IOException {
+ _running=true;
+ int port=_address.getPort();
+ InetAddress host=_address.getAddress();
+ _socket=new ServerSocket(port, _backlog, host);
+ _socket.setSoTimeout((int)_serverTimeout);
+ //_socket.setReuseAddress(true);
+ _address=new InetSocketAddress(host, _socket. getLocalPort());
+ (_thread=new Thread(new Producer(), "WADI BIO Server")).start();
+ _log.info("Producer thread started");
+ if (_log.isDebugEnabled()) _log.debug("started: "+_socket);
+ }
+
+ public void stop() {
+ if (_log.isDebugEnabled()) _log.debug("stopping: "+_socket);
+
+ stopAcceptingPipes();
+ waitForExistingPipes();
+
+ try {
+ _socket.close();
+ } catch (IOException e) {
+ _log.warn("problem closing server socket", e);
+ }
+ _socket=null;
+
+ if (_log.isDebugEnabled()) _log.debug("stopped: "+_address);
+ }
+
+ public void stopAcceptingPipes() {
+ _running=false;
+ do {
+ try {
+ _thread.join();
+ } catch (InterruptedException e) {
+ _log.trace("unexpected interruption - ignoring", e);
+ }
+ } while (Thread.interrupted());
+ _log.info("Producer thread stopped");
+ _thread=null;
+ }
+
+ public class Producer implements Runnable {
+
+ public void run() {
+ try {
+ while (_running) {
+ try {
+ if (_serverTimeout==0) Thread.yield();
+ Socket socket=_socket.accept();
+ BIOPipe pipe=new BIOPipe(BIOServer.this, _pipeTimeout, socket);
+ add(pipe);
+ BIOServer.this.run(pipe);
+
+ } catch (SocketTimeoutException ignore) {
+ // ignore...
+ }
+ }
+ } catch (IOException e) {
+ _log.warn("unexpected io problem - stopping");
+ }
+ }
+
+ }
+
+ // PipeConfig
+
+ public void notifyIdle(Pipe pipe) {
+ // BIOServer does not support idling Pipes :-(
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferInputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,68 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.nio.ByteBuffer;
+
+import org.codehaus.wadi.impl.Utils;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+
+
+// N.B. It is unfortunate that EDU.oswego.cs.dl.util.concurrent.Channel and java.nio.channels.Channel are homonyms.
+// All mentions of Channel in this file refer to the EDU.oswego.cs.dl.util.concurrent variety.
+
+// two threads will be using this object - a producer (the server) and a consumer (the stream's reader).
+
+public class ByteBufferInputStream extends AbstractAsyncInputStream implements Puttable {
+
+ protected final Puttable _outputQueue; // and then placed onto here...
+
+ public ByteBufferInputStream(Channel inputQueue, Puttable outputQueue, long timeout) {
+ super(inputQueue, timeout);
+ _outputQueue=outputQueue;
+ }
+
+ protected ByteBuffer _buffer=null; // only ever read by consumer
+
+ protected void setBuffer(Object object) {
+ _buffer=(ByteBuffer)object;
+ }
+
+ protected Object getBuffer() {
+ return _buffer;
+ }
+
+ protected int readByte() {
+ return (int)_buffer.get()&0xFF; // convert byte to unsigned int - otherwise 255==-1 i.e. EOF etc..
+ }
+
+ protected void readBytes(byte b[], int off, int len) {
+ _buffer.get(b, off, len);
+ }
+
+ protected long getRemaining() {
+ return _buffer.remaining();
+ }
+
+ public void recycle(Object object) {
+ ((ByteBuffer)object).clear();
+ Utils.safePut(object, _outputQueue);
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ByteBufferOutputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,86 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ByteBufferOutputStream extends OutputStream {
+
+ protected final static Log _log=LogFactory.getLog(ByteBufferOutputStream.class);
+
+ protected final SocketChannel _channel;
+ protected final ByteBuffer _buffer;
+
+ public ByteBufferOutputStream(SocketChannel channel, int bufferSize) {
+ super();
+ _channel=channel;
+ _buffer=ByteBuffer.allocateDirect(bufferSize);
+ }
+
+ // impl
+
+ protected void send() throws IOException {
+ _buffer.flip();
+ while (_buffer.hasRemaining())
+ _channel.write(_buffer);
+ _buffer.clear();
+ }
+
+ // OutputStream
+
+ public void write(int b) throws IOException {
+ //_log.info("writing: "+(char)b);
+ _buffer.put((byte)b);
+ if (!_buffer.hasRemaining())
+ send();
+ }
+
+ public void write(byte b[], int off, int len) throws IOException {
+ //_log.info("writing: "+len+" bytes");
+ int written=0;
+ while (written<len) {
+ int tranche=Math.min(_buffer.remaining(), len);
+ _buffer.put(b, off+written, tranche);
+ written+=tranche;
+
+ if (!_buffer.hasRemaining())
+ send();
+ }
+ }
+
+ public void flush() throws IOException {
+ super.flush();
+ send();
+ }
+
+ public void close() throws IOException {
+ super.close();
+ send();
+ }
+
+ // ByteBufferOutputStream
+
+ public void write(ByteBuffer buffer, int offset, int length) throws IOException {
+ _channel.write(null, offset, length);
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageInputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,85 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+
+
+public class BytesMessageInputStream extends AbstractAsyncInputStream implements Puttable {
+
+ public BytesMessageInputStream(Channel inputQueue, long timeout) {
+ super(inputQueue, timeout);
+ }
+
+ protected BytesMessage _buffer;
+ protected long _remaining;
+
+ protected void setBuffer(Object object) {
+ _buffer=(BytesMessage)object;
+ try {
+ _remaining=_buffer==null?0:_buffer.getBodyLength();
+ } catch (JMSException e) {
+ _log.error("could not ascertain input length", e);
+ }
+ }
+
+ protected Object getBuffer() {
+ return _buffer;
+ }
+
+ protected int readByte() throws IOException {
+ try {
+ int b=_buffer.readUnsignedByte();
+ _remaining--;
+ return b;
+ } catch (JMSException e) {
+ _log.error("could not read next byte", e);
+ throw new IOException();
+ }
+ }
+
+ protected void readBytes(byte b[], int off, int len) throws IOException {
+ try {
+ if (off==0)
+ _buffer.readBytes(b, len);
+ else {
+ // inefficient - but we are not helped by JMS API...
+ for (int i=0; i<len; i++)
+ b[off++]=_buffer.readByte();
+ }
+ _remaining-=len;
+ } catch (JMSException e) {
+ _log.error("problem reading bytes", e);
+ throw new IOException();
+ }
+ }
+
+ protected long getRemaining() {
+ return _remaining;
+ }
+
+ protected void recycle(Object object) {
+ // I don't see how we can recycle BytesMessages... ?
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/BytesMessageOutputStream.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,114 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.BytesMessageOutputStreamConfig;
+
+public class BytesMessageOutputStream extends OutputStream {
+
+ protected final static Log _log=LogFactory.getLog(BytesMessageOutputStream.class);
+
+ protected final BytesMessageOutputStreamConfig _config;
+ protected BytesMessage _buffer;
+
+ public BytesMessageOutputStream(BytesMessageOutputStreamConfig config) {
+ super();
+ _config=config;
+ try {
+ allocate();
+ } catch (IOException e) {
+ _log.error(e); // should we let this go further ?
+ }
+ }
+
+ // impl
+
+ protected void allocate() throws IOException {
+ try {
+ _buffer=_config.createBytesMessage();
+ } catch (JMSException e) {
+ _log.error(e);
+ throw new IOException();
+ }
+ }
+
+ public void send(BytesMessage message) throws IOException {
+ try {
+ message.reset(); // switch to read-only mode
+ if (message.getBodyLength()>0 || message.propertyExists("closing-stream")) {
+ _config.send(message);
+ }
+ } catch (Exception e) {
+ _log.error(e);
+ throw new IOException("problem sending bytes");
+ }
+ }
+
+ // OutputStream
+
+ public void flush() throws IOException {
+ send(_buffer);
+ allocate();
+ }
+
+ public void close() throws IOException {
+// try {
+// _buffer.setBooleanProperty("closing-stream", true);
+// //_log.info("CLIENT CLOSING STREAM: "+_buffer);
+// } catch (JMSException e) {
+// _log.warn("problem writing message meta-data", e);
+// throw new IOException();
+// }
+ send(_buffer);
+ _buffer=null;
+ }
+
+ public void write(int b) throws IOException {
+ try {
+ _buffer.writeByte((byte)b);
+ } catch (JMSException e) {
+ _log.error(e);
+ throw new IOException();
+ }
+ }
+
+ public void write(byte b[], int off, int len) throws IOException {
+ try {
+ _buffer.writeBytes(b, off, len);// can the message run out of space ? - we might have to break it up... - TODO
+ } catch (JMSException e) {
+ _log.error(e);
+ throw new IOException();
+ }
+ }
+
+ // BytesMessageOutputStream
+
+
+ public void write(ByteBuffer buffer, int from, int to) {
+ throw new UnsupportedOperationException(); // cannot be done properly over ActiveMQ
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClientClusterPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,49 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import javax.jms.Destination;
+
+import org.activecluster.Cluster;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+
+public class ClientClusterPipe extends AbstractClusterPipe {
+
+ protected final SynchronizedInt _count=new SynchronizedInt(0);
+ protected final String _theirCorrelationId;
+
+ public ClientClusterPipe(PipeConfig config, long timeout, Cluster cluster, Destination us, Destination them, String correlationId, Channel inputQueue) {
+ super(config, timeout, cluster, us, correlationId+"-client", them, inputQueue);
+ _theirCorrelationId=correlationId+"-server";
+ }
+
+ protected String _theirCorrelationIdWithSuffix;
+
+ public synchronized boolean run(Peer peer) throws Exception {
+ int i=_count.increment();
+ _theirCorrelationIdWithSuffix=_theirCorrelationId+"-"+i;
+ return super.run(peer);
+ }
+
+ public String getTheirCorrelationId() {
+ return _theirCorrelationIdWithSuffix;
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ClusterServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,170 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.codehaus.wadi.gridstate.ExtendedCluster;
+import org.codehaus.wadi.impl.Utils;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+import org.codehaus.wadi.sandbox.io.ServerConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+public class ClusterServer extends AbstractServer implements PipeConfig, MessageListener {
+
+ protected final boolean _excludeSelf;
+ protected final Map _pipes;
+
+ public ClusterServer(PooledExecutor executor, long pipeTimeout, boolean excludeSelf) {
+ super(executor, pipeTimeout);
+ _excludeSelf=excludeSelf;
+ _pipes=new HashMap();
+ }
+
+ protected ExtendedCluster _cluster;
+ protected MessageConsumer _nodeConsumer;
+ protected MessageConsumer _clusterConsumer;
+
+ public void init(ServerConfig config) {
+ super.init(config);
+ _cluster=_config.getCluster();
+ }
+
+ public void start() throws Exception {
+ super.start();
+ _clusterConsumer=_cluster.createConsumer(_cluster.getDestination(), null, _excludeSelf);
+ _clusterConsumer.setMessageListener(this);
+ _nodeConsumer=_cluster.createConsumer(_cluster.getLocalNode().getDestination(), null, _excludeSelf);
+ _nodeConsumer.setMessageListener(this);
+ }
+
+ public void stop() throws Exception {
+ stopAcceptingPipes();
+ waitForExistingPipes();
+ super.stop();
+ }
+
+ public void stopAcceptingPipes() {
+ try {
+ _clusterConsumer.setMessageListener(null);
+ _nodeConsumer.setMessageListener(null);
+ } catch (JMSException e) {
+ _log.warn("could not remove Listeners", e);
+ }
+ }
+
+ public void onMessage(Message message) {
+ try {
+ if (message instanceof BytesMessage) {
+ BytesMessage bm=(BytesMessage)message;
+ String ourId=bm.getJMSCorrelationID();
+ Destination replyTo=bm.getJMSReplyTo();
+ //_log.info("receiving message");
+ synchronized (_pipes) {
+ //_log.info("looking up Pipe: "+ourId);
+ AbstractClusterPipe pipe=(AbstractClusterPipe)_pipes.get(ourId);
+ if (pipe==null) {
+ // initialising a new Pipe...
+ String theirId=ourId.substring(0, ourId.indexOf("-server"))+"-client";
+ Destination us=_cluster.getLocalNode().getDestination();
+ pipe=new ServerClusterPipe(this, _pipeTimeout, _cluster, us, ourId, replyTo, theirId, new LinkedQueue());
+ ourId=pipe.getCorrelationId();
+ //_log.info("adding Pipe: '"+ourId+"'");
+ synchronized (_pipes) {
+ _pipes.put(ourId, pipe);
+ }
+ run(pipe);
+ } else {
+ //_log.info("found Pipe: '"+ourId+"'");
+ }
+ // servicing existing pipe...
+ if (bm.getBodyLength()>0) {
+ //_log.info("servicing Pipe: '"+correlationId+"' - "+bm.getBodyLength()+" bytes");
+ Utils.safePut(bm, pipe);
+ }
+ if (bm.getBooleanProperty("closing-stream")) {
+ //_log.info("SERVER CLOSING STREAM: "+pipe);
+// pipe.commit();
+ }
+ }
+ }
+ } catch (JMSException e) {
+ _log.error("unexpected problem", e);
+ }
+ }
+
+ // needs more thought...
+
+ public Pipe makeClientPipe(String correlationId, Destination target) {
+ Destination source=_cluster.getLocalNode().getDestination();
+ LinkedQueue queue=new LinkedQueue();
+ AbstractClusterPipe pipe=new ClientClusterPipe(this, _pipeTimeout, _cluster, source, target, correlationId, queue);
+ String id=pipe.getCorrelationId();
+ //_log.info("adding Pipe: "+id);
+ synchronized (_pipes) {
+ _pipes.put(id, pipe);
+ }
+ return pipe;
+ }
+
+ protected int getNumPipes() {
+ synchronized (_pipes) {
+ return _pipes.size();
+ }
+ }
+
+ public void waitForExistingPipes() {
+ int numPipes;
+ while ((numPipes=getNumPipes())>0) {
+ if (_log.isInfoEnabled()) _log.info("waiting for: " + numPipes + " Pipe[s]");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ _log.trace("unexpected interruption - ignoring", e);
+ }
+ }
+ _log.info("existing Pipes have finished running");
+ }
+
+ // PipeConfig
+
+ public void notifyClosed(Pipe pipe) {
+ String correlationId=((AbstractClusterPipe)pipe)._ourCorrelationId;
+ //_log.info("removing Pipe: "+correlationId);
+ synchronized (_pipes) {
+ _pipes.remove(correlationId); // TODO - encapsulate properly
+ }
+ }
+
+ public void notifyIdle(Pipe pipe) {
+ // Cluster Connections idle automatically after running...
+ //super.notifyIdle(pipe);
+ notifyClosed(pipe);
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/DummyServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,59 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.Server;
+import org.codehaus.wadi.sandbox.io.ServerConfig;
+
+public class DummyServer implements Server {
+
+ public DummyServer() {
+ super();
+ // TODO Auto-generated constructor stub
+ }
+
+ public void init(ServerConfig config) {
+ // do nothing
+ }
+
+ public void start() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void stop() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void waitForExistingPipes() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void stopAcceptingPipes() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void run(Pipe pipe) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,102 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.NIOPipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+import EDU.oswego.cs.dl.util.concurrent.Puttable;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
+
+public class NIOPipe extends AbstractPipe implements Puttable {
+
+ protected final static Log _log=LogFactory.getLog(NIOPipe.class);
+
+ protected final SocketChannel _channel;
+ protected final SelectionKey _key;
+ protected final Channel _inputQueue;
+ protected final Puttable _outputQueue;
+
+ public NIOPipe(NIOPipeConfig config, long timeout, SocketChannel channel, SelectionKey key, Channel inputQueue, Puttable outputQueue, int bufferSize) {
+ super(config, timeout);
+ _channel=channel;
+ _key=key;
+ _inputQueue=inputQueue;
+ _outputQueue=outputQueue;
+
+ // ctor is called by Server thread - find some way to do these allocations on Consumer thread...
+ _inputStream=new ByteBufferInputStream(_inputQueue, _outputQueue, _timeout);
+ _outputStream=new ByteBufferOutputStream(_channel, bufferSize);
+ }
+
+ protected final SynchronizedBoolean _running=new SynchronizedBoolean(false);
+ public boolean getRunning() {return _running.get();}
+ public void setRunning(boolean running) {_running.set(running);}
+
+ protected ByteBufferInputStream _inputStream;
+ public InputStream getInputStream() {return _inputStream;}
+
+ protected OutputStream _outputStream;
+ public OutputStream getOutputStream() {return _outputStream;}
+
+ public void close() throws IOException {
+ super.close();
+ Sync lock=((NIOPipeConfig)_config).getLock();
+ do {
+ try {
+ lock.acquire(); // sync following actions with Server loop...
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ } while (Thread.interrupted());
+
+ try {
+ //_log.info("cancelling: "+_key);
+ _channel.socket().shutdownOutput();
+ _channel.socket().close();
+ _channel.close();
+ } finally {
+ lock.release();
+ }
+ }
+
+ // called by server...
+ public synchronized void commit() throws IOException {
+ _inputStream.commit();
+ _channel.socket().shutdownInput();
+ _key.cancel();
+ }
+
+ // Puttable - ByteBuffers only please :-)
+
+ public void put(Object item) throws InterruptedException {
+ _inputStream.put(item);
+ }
+
+ public boolean offer(Object item, long msecs) throws InterruptedException {
+ return _inputStream.offer(item, msecs);
+ }
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/NIOServer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,222 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+import org.codehaus.wadi.impl.Utils;
+import org.codehaus.wadi.sandbox.io.NIOPipeConfig;
+import org.codehaus.wadi.sandbox.io.Pipe;
+
+import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.NullSync;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
+
+// NOTES - reuse server BBS
+// Stream should implement bulk transfers
+
+// Do not put Connections onto Queue until the have input
+// When a Connection's Peer finishes, it loses its Thread, but is not clos()-ed
+
+public class NIOServer extends AbstractSocketServer implements NIOPipeConfig {
+
+ protected final SynchronizedBoolean _accepting=new SynchronizedBoolean(false);
+ protected final ReadWriteLock _lock=new FIFOReadWriteLock();
+ protected final EDU.oswego.cs.dl.util.concurrent.Channel _queue=new LinkedQueue(); // parameterise ?; // we get our ByteBuffers from here...
+ protected final long _serverTimeout;
+ protected final int _outputBufferSize;
+
+ public NIOServer(PooledExecutor executor, long pipeTimeout, InetSocketAddress address, long serverTimeout, int numInputBuffers, int inputBufferSize, int outputBufferSize) {
+ super(executor, pipeTimeout, address);
+ _serverTimeout=serverTimeout;
+ _outputBufferSize=outputBufferSize;
+
+ for (int i=0; i<numInputBuffers; i++)
+ Utils.safePut(ByteBuffer.allocateDirect(inputBufferSize), _queue);
+ }
+
+ protected ServerSocketChannel _channel;
+ protected Selector _selector;
+ protected SelectionKey _key;
+
+ public synchronized void start() throws Exception {
+ _channel=ServerSocketChannel.open();
+ _channel.configureBlocking(false);
+ _channel.socket().bind(_address);
+ _channel.socket().setSoTimeout((int)_serverTimeout);
+ //_channel.socket().setReuseAddress(true);
+ _log.info(_channel);
+ _address=(InetSocketAddress)_channel.socket().getLocalSocketAddress(); // in case address was not fully specified
+ _selector= Selector.open();
+ _key=_channel.register(_selector, SelectionKey.OP_ACCEPT);
+ _running=true;
+ _accepting.set(true);
+ (_thread=new Thread(new Producer(), "WADI NIO Server")).start();
+ _log.info("Producer thread started");
+ _log.info("started: " + _channel);
+
+ }
+
+ public synchronized void stop() throws Exception {
+ if (_log.isInfoEnabled()) _log.info("stopping: " + _channel);
+ stopAcceptingPipes();
+ waitForExistingPipes();
+ // stop Producer loop
+ _running=false;
+ _selector.wakeup();
+ _thread.join();
+ _selector.close();
+ _thread=null;
+ _log.info("Producer thread stopped");
+ // tidy up
+ _channel.socket().close();
+ _channel.close();
+ if (_log.isInfoEnabled()) _log.info("stopped: " + _address);
+ }
+
+ public void stopAcceptingPipes() {
+ _accepting.set(false);
+ }
+
+ public void accept(SelectionKey key) throws ClosedChannelException, IOException {
+ ServerSocketChannel server=(ServerSocketChannel)key.channel();
+ SocketChannel channel=server.accept();
+ channel.configureBlocking(false);
+ SelectionKey readKey=channel.register(_selector, SelectionKey.OP_READ/*|SelectionKey.OP_WRITE*/);
+ NIOPipe pipe=new NIOPipe(this, _pipeTimeout, channel, readKey, new LinkedQueue(), _queue, _outputBufferSize); // reuse the queue
+ readKey.attach(pipe);
+ add(pipe);
+ }
+
+ public void read(SelectionKey key) throws IOException {
+ NIOPipe pipe=(NIOPipe)key.attachment();
+ ByteBuffer buffer=(ByteBuffer)Utils.safeTake(_queue);
+
+ int count=((SocketChannel)key.channel()).read(buffer); // read off network into buffer
+ if (count<0) {
+ if (_log.isTraceEnabled()) _log.trace("committing server Pipe: "+pipe);
+ pipe.commit();
+ buffer.clear();
+ Utils.safePut(buffer, _queue); // could be cleverer
+ } else {
+ if (_log.isTraceEnabled()) _log.trace("servicing server Pipe: "+pipe+" ("+count+" bytes)");
+ buffer.flip();
+ Utils.safePut(buffer, pipe);
+ }
+
+ if (!pipe.getRunning()) {
+ pipe.setRunning(true);
+ try {
+ if (_log.isTraceEnabled()) _log.trace("running server Pipe: "+pipe);
+ _executor.execute(pipe);
+ } catch (InterruptedException e) { // TODO - do this safely...
+ _log.error("problem running Pipe", e);
+ }
+ }
+ }
+
+
+ public class Producer implements Runnable {
+
+ public void run() {
+ SelectionKey key=null;
+
+ while (_running) {
+
+ Sync lock=_lock.writeLock();
+ do {
+ try {
+ lock.acquire();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ } while (Thread.interrupted());
+
+ try {
+ _selector.select();
+
+ for (Iterator i=_selector.selectedKeys().iterator(); i.hasNext(); ) {
+ key=(SelectionKey)i.next();
+
+ //boolean used=false;
+ //_log.info("picked up key: "+key);
+
+ if (key.isAcceptable() && _accepting.get()) {
+ accept(key);
+ //used=true;
+ //_log.info("accepted key: "+key);
+ }
+
+ if (key.isReadable()) {
+ read(key);
+ //used=true;
+ //_log.info("read key: "+key);
+ }
+
+// if (key.isWritable()) {
+// used=true;
+// // _log.info("wrote key: "+key);
+// }
+// if (key.isConnectable()) {
+// used=true;
+// //_log.info("connected key: "+key);
+// }
+
+// if (!used)
+// _log.warn("unused key: "+key);
+
+ i.remove();
+
+ }
+ } catch (Throwable t) {
+ _log.error("unexpected problem", t);
+ } finally {
+ lock.release();
+ // now threads who want to close selectors, channels etc can run on a read lock...
+ }
+ }
+ }
+ }
+
+ // PipeConfig
+
+ public void notifyIdle(Pipe pipe) {
+ if (_log.isTraceEnabled()) _log.trace("idling server Pipe: "+pipe);
+ ((NIOPipe)pipe).setRunning(false);
+ }
+
+ // NIOPipeConfig
+
+ protected final Sync _dummy=new NullSync();
+ public Sync getLock() {
+ //return _dummy;
+ return _lock.readLock();
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/Peer.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,35 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.Serializable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.wadi.sandbox.io.PeerConfig;
+
+public abstract class Peer implements Serializable {
+
+ protected static final Log _log=LogFactory.getLog(Peer.class);
+
+ public Peer() {
+ // used for deserialisation
+ }
+
+ public abstract boolean run(PeerConfig config) throws Exception;
+
+}
\ No newline at end of file
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ServerClusterPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,39 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import javax.jms.Destination;
+
+import org.activecluster.Cluster;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+import EDU.oswego.cs.dl.util.concurrent.Channel;
+
+public class ServerClusterPipe extends AbstractClusterPipe {
+
+ protected final String _theirCorrelationId;
+
+ public ServerClusterPipe(PipeConfig config, long timeout, Cluster cluster, Destination us, String ourId, Destination them, String theirId, Channel inputQueue) {
+ super(config, timeout, cluster, us, ourId, them, inputQueue);
+ _theirCorrelationId=theirId;
+ }
+
+ public String getTheirCorrelationId() {
+ return _theirCorrelationId;
+ }
+
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/SocketClientPipe.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,86 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.codehaus.wadi.Contextualiser;
+import org.codehaus.wadi.sandbox.io.Pipe;
+import org.codehaus.wadi.sandbox.io.PipeConfig;
+
+public class SocketClientPipe extends AbstractPipe {
+
+ public static class DummyPipeConfig implements PipeConfig {
+ public void notifyIdle(Pipe pipe) {/* do nothing */}
+ public void notifyClosed(Pipe pipe) {/* do nothing */}
+ public Contextualiser getContextualiser() {return null;}
+ public String getNodeId() {return null;}
+ }
+
+
+ protected final SocketChannel _channel;
+ protected final Socket _socket;
+
+ public SocketClientPipe(InetSocketAddress address, long timeout) throws IOException {
+ super(new DummyPipeConfig(), timeout);
+ _channel=SocketChannel.open(address);
+ _channel.configureBlocking(true);
+ _socket=_channel.socket();
+ _socket.setKeepAlive(true);
+ _socket.setSoTimeout((int)_timeout);
+ }
+
+
+ // Pipe
+
+ public void close() throws IOException {
+ super.close(); // deals with streams...
+ try{
+ _socket.close();
+ }
+ catch(Exception e){
+ _log.warn("problem closing socket", e);
+ }
+ try{
+ _channel.close();
+ }
+ catch(Exception e){
+ _log.warn("problem closing socket", e);
+ }
+ }
+
+ // StreamPipe
+
+ public InputStream getInputStream() throws IOException {return _socket.getInputStream();}
+ public OutputStream getOutputStream() throws IOException {return _socket.getOutputStream();}
+
+ // WritableByteChannel - supported
+
+ public int write(ByteBuffer src) throws IOException {
+ return _channel.write(src);
+ }
+
+ public boolean isOpen() {
+ return _channel.isOpen();
+ }
+}
Added: incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java
URL: http://svn.apache.org/viewcvs/incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java?rev=356933&view=auto
==============================================================================
--- incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java (added)
+++ incubator/wadi/trunk/modules/core/src/test/java/org/codehaus/wadi/sandbox/io/impl/ThreadFactory.java Wed Dec 14 15:32:56 2005
@@ -0,0 +1,27 @@
+/**
+ *
+ * Copyright 2003-2005 Core Developers Network Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.codehaus.wadi.sandbox.io.impl;
+
+public class ThreadFactory implements EDU.oswego.cs.dl.util.concurrent.ThreadFactory {
+
+ protected int _count;
+
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable, "WADI Pool ("+(_count++)+")");
+ }
+
+}