You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by ad...@apache.org on 2005/10/28 04:00:22 UTC
svn commit: r329036 [3/7] - in /geronimo/trunk/sandbox/freeorb: ./
geronimo-orb/ geronimo-orb/src/ geronimo-orb/src/main/
geronimo-orb/src/main/java/ geronimo-orb/src/main/java/org/
geronimo-orb/src/main/java/org/apache/ geronimo-orb/src/main/java/org/...
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannel.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannel.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannel.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannel.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,43 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+/**
+ * An {@link OutputStream} that additionally offers typed writes, skipping and
+ * marking. All primitives are left to the concrete channel; only the
+ * single-byte {@link #write(int)} of the stream contract is implemented here.
+ */
+public abstract class OutputChannel extends OutputStream {
+
+ /** Writes a single byte. */
+ public abstract void writeByte(byte b) throws IOException;
+
+ /** Writes a 16-bit value. */
+ public abstract void writeShort(short s) throws IOException;
+
+ /** Writes a 32-bit value. */
+ public abstract void writeInt(int i) throws IOException;
+
+ /** Writes a 64-bit value. */
+ public abstract void writeLong(long l) throws IOException;
+
+ /**
+ * Skips <code>count</code> bytes of output.
+ */
+ public abstract void skip(int count) throws IOException;
+
+ /**
+ * Sets a mark at the current output position and returns the marker
+ * object for it.
+ *
+ * @param handler handler associated with the new mark
+ */
+ public abstract OutputChannelMarker mark(MarkHandler handler);
+
+ /**
+ * Stream-style single byte write; the argument is narrowed to a byte and
+ * handed to {@link #writeByte(byte)}.
+ */
+ public void write(int b) throws IOException {
+ final byte narrowed = (byte) b;
+ writeByte(narrowed);
+ }
+
+ /** Gives up this channel; the exact semantics are implementation defined. */
+ public abstract void relinquish();
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannelMarker.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannelMarker.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannelMarker.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/OutputChannelMarker.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,32 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel;
+
+import java.io.IOException;
+
+
+/**
+ * Handle for a mark set on an output channel. The put methods take an index
+ * and a value; {@link #release()} ends the life of the mark.
+ */
+public abstract class OutputChannelMarker {
+
+ /** Releases this mark. */
+ public abstract void release();
+
+ /**
+ * Stores a byte at index <code>idx</code>.
+ */
+ public abstract void putByte(int idx, byte b) throws IOException;
+
+ /**
+ * Stores a 32-bit value at index <code>idx</code>.
+ */
+ public abstract void putInt(int idx, int b) throws IOException;
+
+ /**
+ * Stores a 64-bit value at index <code>idx</code>.
+ */
+ public abstract void putLong(int idx, long b) throws IOException;
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingBuffer.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingBuffer.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingBuffer.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingBuffer.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,181 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel;
+
+import java.io.IOException;
+import java.nio.ByteOrder;
+
+
+/**
+ * Abstract buffer with a "put" end and a "get" end. Subclasses supply the
+ * primitive operations; this class wraps them in an {@link OutputChannel}
+ * view (for the put end) and an {@link InputChannel} view (for the get end),
+ * both of which simply delegate to the enclosing instance.
+ *
+ * @author Jeppe Sommer (jso@trifork.com)
+ */
+public abstract class RingBuffer {
+
+ // ---- put-end primitives ----
+
+ abstract protected void putByte(byte b) throws IOException;
+
+ abstract protected void putShort(short s) throws IOException;
+
+ abstract protected void putInt(int i) throws IOException;
+
+ abstract protected void putLong(long l) throws IOException;
+
+ // ---- get-end primitives ----
+
+ abstract protected byte getByte() throws IOException;
+
+ /**
+ * returns -1 at EOF
+ */
+ abstract protected int get() throws IOException;
+
+ abstract protected int getInt() throws IOException;
+
+ abstract protected short getShort() throws IOException;
+
+ abstract protected long getLong() throws IOException;
+
+ /** Backs {@link OutputChannel#skip(int)} of the output view. */
+ abstract protected void putSkip(int amount) throws IOException;
+
+ /** Backs <code>skip(int)</code> of the input view. */
+ abstract protected void skipInput(int amount) throws IOException;
+
+ /** Backs {@link OutputChannel#mark(MarkHandler)} of the output view. */
+ abstract protected OutputChannelMarker setPutMark(MarkHandler handler);
+
+ /** Invoked when the output view is closed. */
+ abstract protected void closePutEnd();
+
+ /** Invoked when the input view is closed. */
+ abstract protected void close();
+
+ /** Output-side view; every call is forwarded to the enclosing buffer. */
+ OutputChannel outputView = new OutputChannel() {
+
+ public void writeByte(byte b) throws IOException {
+ putByte(b);
+ }
+
+ public void writeShort(short s) throws IOException {
+ putShort(s);
+ }
+
+ public void writeInt(int i) throws IOException {
+ putInt(i);
+ }
+
+ public void writeLong(long l) throws IOException {
+ putLong(l);
+ }
+
+ public void skip(int count) throws IOException {
+ putSkip(count);
+ }
+
+ public OutputChannelMarker mark(MarkHandler handler) {
+ return setPutMark(handler);
+ }
+
+ public void write(byte[] data, int off, int len) throws IOException {
+ // Must be qualified: an unqualified write(...) would resolve to the
+ // OutputStream method of this anonymous class itself.
+ RingBuffer.this.write(data, off, len);
+ }
+
+ public void flush() throws IOException {
+ // Qualified for the same reason as write(byte[], int, int).
+ RingBuffer.this.flush();
+ }
+
+ public void close() {
+ // Closing the output view only closes the put end of the buffer.
+ RingBuffer.this.closePutEnd();
+ }
+
+ public void relinquish() {
+ RingBuffer.this.relinquishOutput();
+ }
+
+ };
+
+ /** Input-side view; every call is forwarded to the enclosing buffer. */
+ InputChannel inputView = new InputChannel() {
+
+ public int read(byte[] data, int off, int len) throws IOException {
+ return RingBuffer.this.get(data, off, len);
+ }
+
+ public int read() throws IOException {
+ return get();
+ }
+
+ public short readShort() throws IOException {
+ return getShort();
+ }
+
+ public byte readByte() throws IOException {
+ return getByte();
+ }
+
+ public int readInt() throws IOException {
+ return getInt();
+ }
+
+ public long readLong() throws IOException {
+ return getLong();
+ }
+
+ public void skip(int count) throws IOException {
+ skipInput(count);
+ }
+
+ public void close() {
+ RingBuffer.this.close();
+ }
+
+ public boolean isClosed() {
+ // The view reports closed only once the buffer is both empty and
+ // closed.
+ return RingBuffer.this.isEmpty() && RingBuffer.this.isClosed();
+ }
+
+ public int available() {
+ return RingBuffer.this.availableForGet();
+ }
+
+ public void relinquish() {
+ RingBuffer.this.relinquishInput();
+ }
+
+ public void setOrder(ByteOrder order) {
+ RingBuffer.this.setByteOrderForGet(order);
+ }
+
+ };
+
+ /** Returns the output-side view of this buffer (always the same object). */
+ public OutputChannel getOutputChannel() {
+ return outputView;
+ }
+
+ /** Backs <code>setOrder(ByteOrder)</code> of the input view. */
+ protected abstract void setByteOrderForGet(ByteOrder order);
+
+ /** Backs <code>relinquish()</code> of the input view. */
+ protected abstract void relinquishInput();
+
+ /** Backs <code>relinquish()</code> of the output view. */
+ protected abstract void relinquishOutput();
+
+ /** Bulk write; backs <code>write(byte[], int, int)</code> of the output view. */
+ protected abstract void write(byte[] data, int off, int len) throws IOException;
+
+ /** Value reported by <code>available()</code> of the input view. */
+ public abstract int availableForGet();
+
+ protected abstract boolean isClosed();
+
+ /** Bulk read; backs <code>read(byte[], int, int)</code> of the input view. */
+ protected abstract int get(byte[] data, int off, int len) throws IOException;
+
+ protected abstract void flush() throws IOException;
+
+ /** Returns the input-side view of this buffer (always the same object). */
+ public InputChannel getInputChannel() {
+ return inputView;
+ }
+
+ public abstract boolean isEmpty();
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingByteBuffer.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingByteBuffer.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingByteBuffer.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/RingByteBuffer.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,993 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.geronimo.corba.concurrency.IOSemaphoreClosedException;
+
+
+/**
+ * @author Jeppe Sommer (jso@trifork.com)
+ * @author Kresten Krab Thorup (krab@trifork.com)
+ */
+public abstract class RingByteBuffer extends RingBuffer {
+
+ // Size of the ring in bytes; also the modulus used by incr().
+ int capacity;
+
+ // Backing storage, allocated in the constructor.
+ ByteBuffer byteBuffer;
+
+ // Permits for bytes that may still be put; starts at capacity - 1.
+ IOSemaphore putSpace;
+
+ // Permits for bytes that may be got; starts at 0.
+ IOSemaphore getSpace;
+
+ // byte[] buf;
+ // int bufStart;
+
+ // AsyncMarkState objects in the order they were set (oldest at index 0).
+ List putMarks = new LinkedList();
+
+ // Slices of byteBuffer: b0 is used for bulk byte[] transfers, b1 and b2
+ // are positioned by expose().
+ ByteBuffer b0, b1, b2;
+
+ // Arrays handed out by expose(): empty, one contiguous piece, two pieces.
+ ByteBuffer[] bb0, bb1, bb2;
+
+ // Ring index at which the next put stores data.
+ int nextPutPos;
+
+ // Ring index from which the next get reads data.
+ private int nextGetPos;
+
+ // Label shown by toString(); see getName()/setName().
+ private String name;
+
+ /**
+ * Describes the buffer: its name, both ring positions and the number of
+ * permits currently available at each end.
+ */
+ public String toString() {
+ return "RingBuffer[" + getName() + " "
+ + "put=" + nextPutPos + "; "
+ + "putAvailable=" + availableForPut() + "; "
+ + "get=" + nextGetPos + "; "
+ + "getAvailable=" + availableForGet()
+ + "]";
+ }
+
+ /**
+ * Creates a ring of <code>capacity</code> bytes.
+ *
+ * @param capacity size of the backing buffer in bytes
+ * @param direct whether the backing buffer is allocated as a direct buffer
+ */
+ public RingByteBuffer(int capacity, boolean direct) {
+ this.capacity = capacity;
+
+ if (direct) {
+ byteBuffer = ByteBuffer.allocateDirect(capacity);
+ } else {
+ byteBuffer = ByteBuffer.allocate(capacity);
+ }
+ byteBuffer.order(ByteOrder.BIG_ENDIAN);
+
+ // One permit less than the capacity is handed out for putting;
+ // nothing is available for getting yet.
+ putSpace = new IOSemaphore(capacity - 1);
+ getSpace = new IOSemaphore(0);
+
+ // Independent views onto the same storage.
+ b1 = byteBuffer.slice();
+ b2 = byteBuffer.slice();
+ b0 = byteBuffer.slice(); // bulk byte[] transfers
+
+ // Result arrays for expose(): nothing, one piece, two pieces.
+ bb0 = new ByteBuffer[]{ByteBuffer.allocateDirect(0)};
+ bb1 = new ByteBuffer[]{b1};
+ bb2 = new ByteBuffer[]{b1, b2};
+ }
+
+ /**
+ * Flushes every outstanding put mark and then invokes
+ * <code>bufferFullHook("flush")</code>.
+ */
+ public void flush() throws IOException {
+ flushMarks();
+ bufferFullHook("flush");
+ }
+
+ /**
+ * This is a hint to the implementation that writing should commence.
+ *
+ * @param how label of the call site; this class passes "flush" and
+ * "ensurePutSpace"
+ * @throws IOException
+ */
+ abstract protected void bufferFullHook(String how) throws IOException;
+
+ /**
+ * This is a hint to the implementation that reading should commence. If
+ * the buffer is in a synchronous context, this should simply read
+ * something into the buffer by calling readFrom; in an asynchronous
+ * context this should register interest in reading with an underlying
+ * selector.
+ *
+ * @param how label of the call site, e.g. "ensureSomeGetSpace(min,max)"
+ * @throws IOException
+ */
+ abstract protected void bufferEmptyHook(String how) throws IOException;
+
+ /**
+ * Invoked by ensureSomeGetSpace when acquiring get permits fails with an
+ * IOSemaphoreClosedException, i.e. when EOF has been read.
+ */
+ abstract protected void readEOFHook();
+
+ /**
+ * put a single byte to the buffer
+ */
+ protected void putByte(byte b) throws IOException {
+
+ // Reserve one byte of put space first.
+ ensurePutSpace(1);
+
+ byteBuffer.put(nextPutPos, b);
+ nextPutPos = incr(nextPutPos, 1);
+
+ // Account for the byte on the get side (or on the newest mark).
+ increaseGetSpace(1);
+ }
+
+ /**
+ * Appends a 16-bit value at the put position.
+ *
+ * The value is encoded in the byte order of the backing buffer no matter
+ * whether it fits contiguously or wraps around the end of the ring
+ * (the wrap-around path used to be hard-wired to big-endian).
+ */
+ protected void putShort(short i) throws IOException {
+
+ ensurePutSpace(2);
+
+ if (nextPutPos + 2 > byteBuffer.limit()) {
+ // Straddles the end of the storage: store byte by byte.
+ boolean bigEndian = byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ for (int n = 0; n < 2; n++) {
+ int shift = bigEndian ? (1 - n) * 8 : n * 8;
+ byteBuffer.put(nextPutPos, (byte) (i >> shift));
+ nextPutPos = incr(nextPutPos, 1);
+ }
+ } else {
+ byteBuffer.putShort(nextPutPos, i);
+ nextPutPos = incr(nextPutPos, 2);
+ }
+
+ increaseGetSpace(2);
+ }
+
+ /**
+ * Appends a 32-bit value at the put position.
+ *
+ * The value is encoded in the byte order of the backing buffer no matter
+ * whether it fits contiguously or wraps around the end of the ring
+ * (the wrap-around path used to be hard-wired to big-endian).
+ */
+ protected void putInt(int i) throws IOException {
+
+ ensurePutSpace(4);
+
+ if (nextPutPos + 4 > byteBuffer.limit()) {
+ // Straddles the end of the storage: store byte by byte.
+ boolean bigEndian = byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ for (int n = 0; n < 4; n++) {
+ int shift = bigEndian ? (3 - n) * 8 : n * 8;
+ byteBuffer.put(nextPutPos, (byte) (i >> shift));
+ nextPutPos = incr(nextPutPos, 1);
+ }
+ } else {
+ byteBuffer.putInt(nextPutPos, i);
+ nextPutPos = incr(nextPutPos, 4);
+ }
+
+ increaseGetSpace(4);
+ }
+
+ /**
+ * Appends a 64-bit value at the put position.
+ *
+ * The value is encoded in the byte order of the backing buffer no matter
+ * whether it fits contiguously or wraps around the end of the ring
+ * (the wrap-around path used to be hard-wired to big-endian).
+ */
+ protected void putLong(long l) throws IOException {
+
+ ensurePutSpace(8);
+
+ if (nextPutPos + 8 > byteBuffer.limit()) {
+ // Straddles the end of the storage: store byte by byte.
+ boolean bigEndian = byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ for (int n = 0; n < 8; n++) {
+ int shift = bigEndian ? (7 - n) * 8 : n * 8;
+ byteBuffer.put(nextPutPos, (byte) (l >> shift));
+ nextPutPos = incr(nextPutPos, 1);
+ }
+ } else {
+ byteBuffer.putLong(nextPutPos, l);
+ nextPutPos = incr(nextPutPos, 8);
+ }
+
+ increaseGetSpace(8);
+ }
+
+ /**
+ * Requests put space for <code>amount</code> bytes by calling
+ * ensureSomePutSpace(amount, amount); the returned count is not checked.
+ */
+ private void ensurePutSpace(int amount) throws IOException {
+ ensureSomePutSpace(amount, amount);
+ }
+
+ /**
+ * Acquires between <code>min</code> and <code>max</code> put permits.
+ *
+ * Previously the lower bound passed to the semaphore was a constant 1, so
+ * a caller asking for exactly N bytes (ensurePutSpace) could come back
+ * with fewer permits than the bytes it then stored. The requested minimum
+ * is now honoured, mirroring ensureSomeGetSpace.
+ *
+ * @param min least number of permits to acquire
+ * @param max most permits to acquire
+ * @return the number of permits actually acquired
+ */
+ private int ensureSomePutSpace(int min, int max) throws IOException {
+
+ while (!putMarks.isEmpty()
+ && availableForPut() + availableForGet() < min)
+ {
+ // No room in the buffer, and flushing the non-marked bytes would
+ // not free enough space either: give up the oldest mark.
+ flushOneMark();
+ }
+
+ if (availableForPut() <= max) {
+ bufferFullHook("ensurePutSpace");
+ }
+
+ return putSpace.acquireSome(min, max, 0L);
+ }
+
+ /**
+ * Drops the oldest put mark. Unless it is already released, its handler is
+ * told that the buffer is full and the mark is then released if the
+ * handler did not do so itself.
+ *
+ * @throws IllegalStateException if no mark is set
+ */
+ private void flushOneMark() throws IOException {
+ if (putMarks.isEmpty()) {
+ throw new IllegalStateException();
+ }
+
+ AsyncMarkState oldest = (AsyncMarkState) putMarks.remove(0);
+ if (oldest.isReleased) {
+ return;
+ }
+
+ oldest.handler.bufferFull(oldest);
+ if (oldest.isReleased) {
+ return;
+ }
+ oldest.release();
+ }
+
+ /** Returns the number of permits currently available on putSpace. */
+ private int availableForPut() {
+ return putSpace.availablePermits();
+ }
+
+ /**
+ * Empties the list of put marks and then, from the newest mark back to
+ * the oldest, notifies each handler that the buffer is full and releases
+ * the mark unless the handler already did.
+ */
+ private void flushMarks() throws IOException {
+ AsyncMarkState[] pending = (AsyncMarkState[]) putMarks
+ .toArray(new AsyncMarkState[putMarks.size()]);
+ putMarks.clear();
+
+ int idx = pending.length;
+ while (idx-- > 0) {
+ AsyncMarkState current = pending[idx];
+ current.handler.bufferFull(current);
+ if (!current.isReleased) {
+ current.release();
+ }
+ }
+ }
+
+ /**
+ * Accounts for <code>amount</code> freshly put bytes. Without a mark the
+ * get permits are released immediately; with a mark they are parked on
+ * the newest mark until that mark is released.
+ */
+ private void increaseGetSpace(int amount) {
+ AsyncMarkState newest = lastMark();
+ if (newest == null) {
+ getSpace.releaseIfNotClosed(amount);
+ return;
+ }
+ newest.permits += amount;
+ }
+
+ /** Returns the most recently set put mark, or null if there is none. */
+ private AsyncMarkState lastMark() {
+ return putMarks.isEmpty()
+ ? null
+ : (AsyncMarkState) putMarks.get(putMarks.size() - 1);
+ }
+
+ /** Returns the oldest put mark, or null if there is none. */
+ private AsyncMarkState firstMark() {
+ return putMarks.isEmpty()
+ ? null
+ : (AsyncMarkState) putMarks.get(0);
+ }
+
+ /**
+ * Acquires exactly <code>amount</code> get permits via
+ * ensureSomeGetSpace(amount, amount).
+ */
+ private void ensureGetSpace(int amount) throws IOException {
+ ensureSomeGetSpace(amount, amount);
+ }
+
+ /** Returns the number of permits currently available on getSpace. */
+ public int availableForGet() {
+ return getSpace.availablePermits();
+ }
+
+ /**
+ * Acquires between <code>min</code> and <code>max</code> get permits.
+ * bufferEmptyHook is invoked first when no more than <code>min</code>
+ * permits are available.
+ *
+ * @return the number of permits acquired
+ * @throws IOSemaphoreClosedException rethrown after readEOFHook() has
+ * been called
+ */
+ private int ensureSomeGetSpace(int min, int max) throws IOException {
+
+ // this operation will make the buffer empty?
+ if (availableForGet() <= min) {
+ bufferEmptyHook("ensureSomeGetSpace(" + min + "," + max + ")");
+ }
+
+ try {
+ return getSpace.acquireSome(min, max, 0L);
+ }
+ catch (IOSemaphoreClosedException ex) {
+
+ // hook method to signal that EOF has been read
+ readEOFHook();
+ throw ex;
+ }
+ }
+
+ /** Hands <code>amount</code> permits back to putSpace unless it is closed. */
+ private void increasePutSpace(int amount) {
+ putSpace.releaseIfNotClosed(amount);
+ }
+
+ /** Returns <code>val + incr</code> reduced modulo the ring capacity. */
+ private int incr(int val, int incr) {
+ return (val + incr) % capacity;
+ }
+
+ /**
+ * Reads one byte via getByte(); an EOFException is turned into a return
+ * value of -1.
+ * NOTE(review): the byte is returned as-is, so values 0x80..0xff come back
+ * negative and 0xff is indistinguishable from the -1 EOF marker - confirm
+ * whether callers expect the InputStream.read() 0..255 convention.
+ */
+ protected int get() throws IOException {
+ try {
+ return getByte();
+ }
+ catch (EOFException e) {
+ return -1;
+ }
+ }
+
+ /**
+ * Puts <code>len</code> bytes of <code>data</code>, starting at
+ * <code>off</code>, into the ring; writeSome is called as often as needed
+ * until everything has been stored.
+ */
+ protected void write(byte[] data, int off, int len) throws IOException {
+ while (len != 0) {
+ int stored = writeSome(data, off, len);
+ off += stored;
+ len -= stored;
+ }
+ }
+
+ /**
+ * Stores as many of the <code>len</code> bytes as put space could be
+ * acquired for (at most <code>len</code>) and returns that count. The copy
+ * is split in two when it wraps around the end of the ring.
+ *
+ * The leftover debug println on every call has been removed.
+ */
+ private int writeSome(byte[] data, int off, int len) throws IOException {
+
+ int some = ensureSomePutSpace(1, len);
+
+ if (nextPutPos + some > capacity) {
+
+ // First piece: up to the end of the storage.
+ int size1 = capacity - nextPutPos;
+ b0.position(nextPutPos);
+ b0.put(data, off, size1);
+
+ // Second piece: continues at the start of the storage.
+ b0.position(0);
+ b0.put(data, off + size1, some - size1);
+
+ } else {
+
+ b0.position(nextPutPos);
+ b0.put(data, off, some);
+
+ }
+
+ nextPutPos = incr(nextPutPos, some);
+
+ increaseGetSpace(some);
+ return some;
+ }
+
+ /**
+ * Bulk read of up to <code>len</code> bytes into <code>data</code> at
+ * <code>off</code>.
+ *
+ * @return the number of bytes copied; 0 for a zero-length request on a
+ * buffer that is not both empty and closed; -1 on EOF
+ */
+ protected int get(byte[] data, int off, int len) throws IOException {
+ if (len == 0) {
+ return (isEmpty() && isClosed()) ? -1 : 0;
+ }
+
+ int granted;
+ try {
+ granted = ensureSomeGetSpace(1, len);
+ }
+ catch (EOFException e) {
+ return -1;
+ }
+
+ b0.position(nextGetPos);
+ int tail = capacity - nextGetPos;
+ if (granted > tail) {
+ // Wraps: copy up to the end of the storage, then from its start.
+ b0.get(data, off, tail);
+ b0.position(0);
+ b0.get(data, off + tail, granted - tail);
+ } else {
+ b0.get(data, off, granted);
+ }
+
+ nextGetPos = incr(nextGetPos, granted);
+
+ increasePutSpace(granted);
+ return granted;
+ }
+
+ /**
+ * Reads the byte at the get position, advances the position and gives one
+ * permit back to the put side.
+ *
+ * The former debug println for an out-of-range position has been removed;
+ * the absolute ByteBuffer.get below already fails with an
+ * IndexOutOfBoundsException in that case.
+ */
+ protected byte getByte() throws IOException {
+ ensureGetSpace(1);
+
+ byte result = byteBuffer.get(nextGetPos);
+
+ incrGet();
+ increasePutSpace(1);
+
+ return result;
+ }
+
+ /**
+ * Sets the byte order of the backing buffer. Since the same buffer object
+ * is used by the absolute putShort/putInt/putLong calls, the order applies
+ * to those as well, not only to gets.
+ */
+ protected void setByteOrderForGet(ByteOrder order) {
+ byteBuffer.order(order);
+ }
+
+ /**
+ * Reads a 32-bit value in the byte order of the backing buffer. Values
+ * that do not lie strictly before the end of the storage are assembled
+ * byte by byte so that they may wrap around.
+ */
+ protected int getInt() throws IOException {
+ ensureGetSpace(4);
+
+ int value;
+ if (nextGetPos + 4 < capacity) {
+ value = byteBuffer.getInt(nextGetPos);
+ nextGetPos = incr(nextGetPos, 4);
+ } else {
+ boolean bigEndian = byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ value = 0;
+ for (int n = 0; n < 4; n++) {
+ int shift = bigEndian ? 24 - 8 * n : 8 * n;
+ value |= (0xff & byteBuffer.get(nextGetPos)) << shift;
+ incrGet();
+ }
+ }
+
+ increasePutSpace(4);
+ return value;
+ }
+
+ /**
+ * Reads a 16-bit value in the byte order of the backing buffer. Values
+ * that do not lie strictly before the end of the storage are assembled
+ * byte by byte so that they may wrap around.
+ */
+ protected short getShort() throws IOException {
+ ensureGetSpace(2);
+
+ short value;
+ if (nextGetPos + 2 < capacity) {
+ value = byteBuffer.getShort(nextGetPos);
+ nextGetPos = incr(nextGetPos, 2);
+ } else {
+ boolean bigEndian = byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ int bits = 0;
+ for (int n = 0; n < 2; n++) {
+ int shift = bigEndian ? 8 - 8 * n : 8 * n;
+ bits |= (0xff & byteBuffer.get(nextGetPos)) << shift;
+ incrGet();
+ }
+ value = (short) bits;
+ }
+
+ increasePutSpace(2);
+ return value;
+ }
+
+ /**
+ * Reads a 64-bit value in the byte order of the backing buffer.
+ *
+ * Fixes the little-endian wrap-around path: it masked with the int
+ * constant 0xff and shifted the resulting int by up to 56 bits. Java
+ * reduces int shift distances modulo 32 and sign-extends the int result,
+ * so the upper four bytes were merged into the wrong bits. All bytes are
+ * now widened to long before shifting.
+ */
+ protected long getLong() throws IOException {
+ ensureGetSpace(8);
+ long result;
+
+ if (nextGetPos + 8 <= capacity) {
+ // Contiguous: let the buffer decode it.
+ result = byteBuffer.getLong(nextGetPos);
+ nextGetPos = incr(nextGetPos, 8);
+
+ } else {
+ // Wraps around the end of the storage: assemble byte by byte.
+ boolean bigEndian = byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ result = 0L;
+ for (int n = 0; n < 8; n++) {
+ int shift = bigEndian ? (7 - n) * 8 : n * 8;
+ result |= (0xffL & byteBuffer.get(nextGetPos)) << shift;
+ incrGet();
+ }
+ }
+
+ increasePutSpace(8);
+
+ return result;
+ }
+
+ /** Advances the get position by one byte, wrapping at the capacity. */
+ private void incrGet() {
+ this.nextGetPos = incr(nextGetPos, 1);
+ }
+
+ /**
+ * Creates a new mark state for <code>handler</code>, appends it to the
+ * list of put marks and returns it. From then on increaseGetSpace parks
+ * get permits on this (newest) mark.
+ * NOTE(review): this method itself does not store the current put
+ * position in the new state - verify that the state's position field is
+ * initialised to nextPutPos somewhere.
+ */
+ protected OutputChannelMarker setPutMark(MarkHandler handler) {
+ AsyncMarkState state = new AsyncMarkState(handler);
+
+ putMarks.add(state);
+
+ return state;
+ }
+
+ /**
+ * Exposes the region starting at the put position whose length is the
+ * number of currently available put permits.
+ */
+ private ByteBuffer[] exposePutSpace() {
+ return expose(nextPutPos, availableForPut());
+ }
+
+ /**
+ * Exposes the region starting at the get position whose length is the
+ * number of currently available get permits.
+ */
+ private ByteBuffer[] exposeGetSpace() {
+ return expose(nextGetPos, availableForGet());
+ }
+
+ /**
+ * expose the ring buffer's content
+ *
+ * Returns bb0 for an empty region, bb1 (b1 only) when the region is
+ * contiguous, and bb2 (b1 followed by b2) when it wraps around the end of
+ * the storage. The returned arrays and the slices in them are shared
+ * fields that are repositioned on every call.
+ *
+ * @param start ring index of the first byte
+ * @param len number of bytes to expose
+ * @throws IllegalArgumentException if start is negative or greater than
+ * the capacity (checked only when len is non-zero)
+ */
+ private ByteBuffer[] expose(int start, int len) {
+
+ if (len == 0) {
+ return bb0;
+ }
+
+ if (start > capacity || start < 0) {
+ throw new IllegalArgumentException();
+ }
+
+ if (start + len <= capacity) {
+ // Reset the position before changing the limit so that the final
+ // position(start) is always within the new limit.
+ b1.position(0);
+ b1.limit(start + len);
+ b1.position(start);
+
+ // Sanity check of the arithmetic above.
+ if (b1.remaining() != len) {
+ throw new InternalError();
+ }
+
+ return bb1;
+ } else {
+ b1.position(0); // ensure position for next
+ b1.limit(capacity);
+ b1.position(start);
+
+ // b2 covers the part that wrapped to the start of the storage.
+ b2.position(0);
+ b2.limit(len - b1.remaining());
+
+ if (b1.remaining() + b2.remaining() != len) {
+ throw new InternalError();
+ }
+
+ return bb2;
+ }
+
+ }
+
+ /** Returns true when no get permits are available. */
+ public boolean isEmpty() {
+ return availableForGet() <= 0;
+ }
+
+ /**
+ * Advances the put position by <code>count</code> bytes without storing
+ * anything: put space is acquired, the position is moved and the bytes
+ * are accounted for on the get side like ordinary puts. readFrom uses
+ * this after data has been read directly into the exposed put space.
+ *
+ * @param count number of bytes to skip
+ * @throws IOException
+ */
+ public void putSkip(int count) throws IOException {
+
+ ensurePutSpace(count);
+ nextPutPos = incr(nextPutPos, count);
+ increaseGetSpace(count);
+ }
+
+ /**
+ * Discards <code>count</code> readable bytes: get permits are acquired,
+ * the get position is advanced and the permits are handed back to the put
+ * side. writeTo uses this after data has been written out of the exposed
+ * get space.
+ */
+ public void skipInput(int count) throws IOException {
+
+ ensureGetSpace(count);
+
+ this.nextGetPos = incr(nextGetPos, count);
+
+ increasePutSpace(count);
+ }
+
+ /**
+ * State of one put mark. While a mark is set, get permits for bytes put
+ * after it are parked in {@link #permits}; releasing the mark passes them
+ * on to the previous mark or, if there is none, to the get side.
+ *
+ * Two defects are fixed here:
+ * - the put position at the time the mark is created is now recorded, so
+ * that indices given to the put methods are relative to the mark (the
+ * field used to stay 0);
+ * - the put methods overwrite bytes in place. They used to go through the
+ * enclosing buffer's putByte/putInt/putLong, which acquired additional
+ * put permits and credited additional get permits for bytes that had
+ * already been accounted for.
+ */
+ class AsyncMarkState extends OutputChannelMarker {
+
+ // Get permits accumulated while this mark is the newest one.
+ int permits;
+
+ // Ring index of the put position when the mark was set.
+ int position;
+
+ private final MarkHandler handler;
+
+ private boolean isReleased = false;
+
+ public AsyncMarkState(MarkHandler handler) {
+ this.handler = handler;
+ this.position = nextPutPos;
+ }
+
+ /**
+ * Releases the mark and passes its permits on.
+ *
+ * @throws IllegalStateException if the mark was released before
+ */
+ public void release() {
+ if (isReleased) {
+ throw new IllegalStateException();
+ }
+
+ AsyncMarkState prev = getPrevious();
+ if (prev == null) {
+ getSpace.releaseIfNotClosed(permits);
+ } else {
+ prev.permits += permits;
+ }
+
+ putMarks.remove(this);
+ isReleased = true;
+ }
+
+ /**
+ * Returns the mark set just before this one, or null when this mark is
+ * the oldest or is no longer in the list.
+ */
+ private AsyncMarkState getPrevious() {
+ int idx = putMarks.indexOf(this);
+ if (idx == -1 || idx == 0) {
+ return null;
+ } else {
+ return (AsyncMarkState) putMarks.get(idx - 1);
+ }
+ }
+
+ /** Overwrites the byte <code>idx</code> bytes after the mark. */
+ public void putByte(int idx, byte b) throws IOException {
+ overwrite(idx, b, 1);
+ }
+
+ /** Overwrites the 32-bit value <code>idx</code> bytes after the mark. */
+ public void putInt(int idx, int b) throws IOException {
+ overwrite(idx, b, 4);
+ }
+
+ /** Overwrites the 64-bit value <code>idx</code> bytes after the mark. */
+ public void putLong(int idx, long b) throws IOException {
+ overwrite(idx, b, 8);
+ }
+
+ /**
+ * Stores the low <code>width</code> bytes of <code>value</code> in the
+ * byte order of the backing buffer, starting <code>idx</code> bytes
+ * after the mark and wrapping at the ring capacity. Neither the put
+ * position nor any permits are touched.
+ */
+ private void overwrite(int idx, long value, int width) {
+ boolean bigEndian = byteBuffer.order() == ByteOrder.BIG_ENDIAN;
+ int at = incr(position, idx);
+ for (int n = 0; n < width; n++) {
+ int shift = bigEndian ? (width - 1 - n) * 8 : n * 8;
+ byteBuffer.put(at, (byte) (value >> shift));
+ at = incr(at, 1);
+ }
+ }
+ }
+
+ /**
+ * Reports whether the put end has been closed, i.e. whether getSpace is
+ * closed (see closePutEnd()).
+ */
+ public boolean isClosedForPut() {
+ return getSpace.isClosed();
+ }
+
+ /** Closes the put end by closing the getSpace semaphore. */
+ public void closePutEnd() {
+ getSpace.close();
+ }
+
+ /**
+ * Reports whether close() has been called, i.e. whether putSpace is
+ * closed.
+ */
+ public boolean isClosed() {
+ return putSpace.isClosed();
+ }
+
+ /**
+ * mark the receiving end as closed
+ *
+ * Implemented by closing the putSpace semaphore.
+ */
+ public void close() {
+ putSpace.close();
+ }
+
+ /**
+ * Writes buffered data to a socket, through its channel when it has one
+ * and through its output stream otherwise.
+ *
+ * @return the number of bytes written
+ */
+ public int writeTo(Socket sock) throws IOException {
+ SocketChannel chan = sock.getChannel();
+ if (chan != null) {
+ return writeTo(chan);
+ }
+ return writeTo(sock.getOutputStream());
+ }
+
+ /**
+ * Writes the currently readable bytes to <code>out</code> and consumes
+ * whatever was written. An interrupted write counts the bytes transferred
+ * so far; any other IOException closes this buffer and is rethrown.
+ *
+ * @return the number of bytes written, 0 if none
+ */
+ public int writeTo(OutputStream out) throws IOException {
+ ByteBuffer[] readable = exposeGetSpace();
+
+ int written;
+ try {
+ written = bbwrite(out, readable);
+ }
+ catch (InterruptedIOException partial) {
+ written = partial.bytesTransferred;
+ }
+ catch (IOException failure) {
+ close();
+ throw failure;
+ }
+
+ if (written <= 0) {
+ return 0;
+ }
+ skipInput(written);
+ return written;
+ }
+
+ /**
+ * Writes the currently readable bytes to <code>chan</code> and consumes
+ * whatever was written. A single exposed buffer is written directly, two
+ * buffers with a gathering write.
+ * NOTE(review): ChannelClosedException is neither imported nor declared
+ * in this file, whereas readFrom catches the imported
+ * java.nio.channels.ClosedChannelException - confirm which type is meant.
+ *
+ * @return the number of bytes written, 0 if none
+ */
+ public int writeTo(GatheringByteChannel chan) throws IOException {
+
+ ByteBuffer[] buffers = exposeGetSpace();
+
+ int count;
+
+ try {
+ if (buffers.length == 1) {
+ count = chan.write(buffers[0]);
+ } else {
+ count = (int) chan.write(buffers);
+ }
+
+ }
+ catch (ChannelClosedException ex) {
+ // The channel is gone: close this buffer and report nothing written.
+ close();
+ count = 0;
+
+ }
+ catch (InterruptedIOException ex) {
+ // Count what was transferred before the interruption.
+ count = ex.bytesTransferred;
+ }
+
+ if (count > 0) {
+ // Consume the bytes that went out.
+ skipInput(count);
+ return count;
+ } else {
+ return 0;
+ }
+
+ }
+
+ //
+ // Utility
+ //
+
+ /**
+ * Reads from <code>in</code> into the given buffers in order, stopping at
+ * EOF or after the first buffer that could not be filled completely.
+ *
+ * @return the total number of bytes read, or -1 if EOF was hit before
+ * anything was read
+ */
+ private int bbread(InputStream in, ByteBuffer[] buffers) throws IOException {
+ int total = 0;
+
+ for (int i = 0; i < buffers.length; i++) {
+ ByteBuffer target = buffers[i];
+ int expected = target.remaining();
+
+ if (expected != 0) {
+ int bytes = bbread(in, target);
+ if (bytes == -1) {
+ return (total == 0) ? -1 : total;
+ }
+
+ total += bytes;
+ if (bytes != expected) {
+ return total;
+ }
+ }
+ }
+
+ return total;
+ }
+
+ /**
+ * Reads from <code>in</code> straight into the backing array of
+ * <code>buffer</code>, at the buffer's position. The buffer's position is
+ * not advanced.
+ *
+ * The emptiness check now comes before <code>buffer.array()</code>: the
+ * empty placeholder returned by expose() is a zero-length direct buffer
+ * without a backing array, so asking it for its array threw an
+ * UnsupportedOperationException whenever no put space was available.
+ *
+ * @return the number of bytes read (the bytes transferred so far if the
+ * read was interrupted), 0 if the buffer has no room, or -1 at EOF
+ */
+ private int bbread(InputStream in, ByteBuffer buffer) throws IOException {
+
+ int len = buffer.remaining();
+ if (len == 0) {
+ return 0;
+ }
+
+ byte[] data = buffer.array();
+ int off = buffer.arrayOffset();
+ int start = buffer.position();
+
+ try {
+ return in.read(data, off + start, len);
+ }
+ catch (InterruptedIOException ex) {
+ return ex.bytesTransferred;
+ }
+ }
+
+ /**
+ * Writes the given buffers to <code>out</code> in order, stopping after
+ * the first buffer that could not be written completely. An IOException
+ * is propagated only if nothing has been written yet; otherwise the
+ * count so far is returned.
+ *
+ * @return the total number of bytes written, or -1 if the first
+ * non-empty buffer reported -1
+ */
+ private int bbwrite(OutputStream out, ByteBuffer[] buffers)
+ throws IOException
+ {
+ int written = 0;
+
+ for (int i = 0; i < buffers.length; i++) {
+ ByteBuffer source = buffers[i];
+ int size = source.remaining();
+ if (size == 0) {
+ continue;
+ }
+
+ int bytes;
+ try {
+ bytes = bbwrite(out, source);
+ }
+ catch (IOException ex) {
+ if (written == 0) {
+ throw ex;
+ }
+ return written;
+ }
+
+ if (bytes == -1) {
+ return (written == 0) ? -1 : written;
+ }
+
+ written += bytes;
+ if (bytes != size) {
+ return written;
+ }
+ }
+
+ return written;
+ }
+
+ /**
+ * Writes the remaining bytes of <code>buffer</code> to <code>out</code>
+ * straight from the buffer's backing array and advances the buffer's
+ * position by the number of bytes written.
+ *
+ * @return the number of bytes written (the bytes transferred so far if
+ * the write was interrupted), or -1 if an EOFException occurred
+ */
+ private int bbwrite(OutputStream out, ByteBuffer buffer) throws IOException {
+ // Requires an array-backed buffer; array() fails for direct buffers.
+ byte[] data = buffer.array();
+ int off = buffer.arrayOffset();
+
+ int start = buffer.position();
+ int len = buffer.remaining();
+
+ try {
+ out.write(data, off + start, len);
+ buffer.position(start + len);
+ return len;
+
+ }
+ catch (InterruptedIOException ex) {
+ // Partial write: advance only past what actually went out.
+ buffer.position(start + ex.bytesTransferred);
+ return ex.bytesTransferred;
+
+ }
+ catch (EOFException ex) {
+ return -1;
+ }
+ }
+
+ /**
+ * Reads from a socket into the buffer, through its channel when it has
+ * one and through its input stream otherwise.
+ */
+ public boolean readFrom(Socket sock) throws IOException
+ {
+ SocketChannel ch = sock.getChannel();
+ return (ch == null)
+ ? readFrom(sock.getInputStream())
+ : readFrom(ch);
+ }
+
+ /**
+ * Reads from <code>inputStream</code> directly into the free space of the
+ * ring. A SocketException is treated like end of stream.
+ *
+ * The debug timing output written to System.out on every call, and the
+ * locals that only served it, have been removed.
+ *
+ * @return true if data was read or the end of the stream was reached (in
+ * which case the put end is closed); false if nothing was read
+ */
+ public boolean readFrom(InputStream inputStream) throws IOException {
+
+ ByteBuffer[] buffers = exposePutSpace();
+ int count;
+
+ try {
+ if (buffers.length == 1) {
+ count = bbread(inputStream, buffers[0]);
+ } else {
+ count = bbread(inputStream, buffers);
+ }
+ }
+ catch (SocketException ex) {
+ count = -1;
+ }
+
+ if (count == -1) {
+ closePutEnd();
+ return true;
+ } else if (count == 0) {
+ return false;
+ } else {
+ // The data is already in place; just account for it.
+ putSkip(count);
+ return true;
+ }
+
+ }
+
+ /**
+ * Reads from <code>chan</code> directly into the free space of the ring,
+ * using a scattering read when that space is split in two. A
+ * ClosedChannelException is treated like end of stream.
+ *
+ * The debug timing output written to System.out on every call, and the
+ * locals that only served it, have been removed.
+ *
+ * @return true if data was read or the end of the stream was reached (in
+ * which case the put end is closed); false if nothing was read
+ */
+ public boolean readFrom(ScatteringByteChannel chan) throws IOException
+ {
+ ByteBuffer[] buffers = exposePutSpace();
+ int count;
+
+ try {
+ if (buffers.length == 1) {
+ count = chan.read(buffers[0]);
+ } else {
+ count = (int) chan.read(buffers);
+ }
+ }
+ catch (ClosedChannelException ex) {
+ count = -1;
+ }
+
+ if (count == -1) {
+ closePutEnd();
+ return true;
+ } else if (count == 0) {
+ return false;
+ } else {
+ // The data is already in place; just account for it.
+ putSkip(count);
+ return true;
+ }
+ }
+
+ /** Returns the name shown by toString(); null unless setName was called. */
+ public String getName() {
+ return name;
+ }
+
+ /** Sets the name shown by toString(). */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/SocketTransportBase.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/SocketTransportBase.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/SocketTransportBase.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/SocketTransportBase.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,303 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+import EDU.oswego.cs.dl.util.concurrent.Semaphore;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+
+import org.apache.geronimo.corba.channel.nio.ParticipationExecutor;
+
+
+public abstract class SocketTransportBase extends Transport {
+
+ // Size passed to allocateReceiveBuffer(); taken from the system property
+ // "org.freeorb.rcv_buffer_size", defaulting to 64 * 1024.
+ static protected final int RCV_BUFFER_SIZE = getIntProperty("org.freeorb.rcv_buffer_size", 64 * 1024);
+
+ // Size passed to allocateSendBuffer(); taken from the system property
+ // "org.freeorb.snd_buffer_size", defaulting to 64 * 1024.
+ static protected final int SND_BUFFER_SIZE = getIntProperty("org.freeorb.snd_buffer_size", 64 * 1024);
+
+ // Receives inputAvailable(this) from the processInput task; may be replaced
+ // through setInputHandler().
+ protected InputHandler handler;
+
+ // Thread currently designated as the input worker, or null when there is none.
+ protected Thread inputWorker;
+
+ // Guards inputWorker. NOTE: this Mutex is deliberately acquired in one thread
+ // and released in another (see processAvailableInput/processInput and
+ // signalResponse/waitForResponse).
+ protected Sync inputWorkerLock = new Mutex();
+
+ // Ring buffer handed out by getInputChannel().
+ protected RingByteBuffer receiveBuffer;
+
+ // Ring buffer handed out by getOutputChannel() and flushed on release.
+ protected RingByteBuffer sendBuffer;
+
+ // Single permit: acquired in getOutputChannel(), released in releaseOutputChannel().
+ protected Semaphore outputWorkerLock = new Semaphore(1);
+
+ // Thread that currently owns the output channel, or null.
+ protected Thread outputWorker;
+
+ // Manager that created this transport; its executor backs the field below.
+ protected TransportManager manager;
+
+ // Runs processInput and parks/wakes threads waiting in waitForResponse().
+ private ParticipationExecutor executor;
+
+ // The socket closed by close().
+ protected Socket sock;
+
+ /**
+  * Stores the collaborators and allocates both ring buffers.
+  * <p/>
+  * The allocate hooks are abstract and therefore run in the subclass before
+  * the subclass constructor body has executed; <code>sock</code>,
+  * <code>handler</code> and <code>manager</code> are assigned first so the
+  * hooks can see them.
+  *
+  * @param manager owner of this transport; supplies the backing executor
+  * @param handler callback notified when input is available
+  * @param sock    the socket this transport wraps
+  */
+ protected SocketTransportBase(TransportManager manager, InputHandler handler, Socket sock) {
+     this.sock = sock;
+     this.handler = handler;
+     this.manager = manager;
+     this.executor = new ParticipationExecutor(manager.getExecutor());
+
+     // receive buffer first, then send buffer
+     this.receiveBuffer = allocateReceiveBuffer(RCV_BUFFER_SIZE);
+     this.sendBuffer = allocateSendBuffer(SND_BUFFER_SIZE);
+ }
+
+
+ // Creates the buffer stored in sendBuffer. Called from the constructor with
+ // SND_BUFFER_SIZE, i.e. before the subclass constructor body has run.
+ protected abstract RingByteBuffer allocateSendBuffer(int bufferSize);
+
+ // Creates the buffer stored in receiveBuffer. Called from the constructor with
+ // RCV_BUFFER_SIZE, i.e. before the subclass constructor body has run.
+ protected abstract RingByteBuffer allocateReceiveBuffer(int bufferSize);
+
+
+ private static int getIntProperty(String string, int defaultValue) {
+ try {
+ return Integer.parseInt(System.getProperty(string, ""));
+ }
+ catch (NumberFormatException ex) {
+ return defaultValue;
+ }
+ }
+
+ /**
+  * Flushes the send buffer and gives the output channel back, but only when
+  * the calling thread is the current output worker; otherwise does nothing.
+  * <p/>
+  * The ownership reset and the semaphore release happen in a
+  * <code>finally</code> block so that a failing flush (checked or unchecked)
+  * cannot leave the output channel locked forever.
+  */
+ public void releaseOutputChannel() {
+     if (outputWorker != Thread.currentThread()) {
+         return;
+     }
+
+     try {
+         sendBuffer.flush();
+     }
+     catch (IOException e) {
+         // best effort: the channel is released even if the flush failed
+         e.printStackTrace();
+     }
+     finally {
+         outputWorker = null;
+         outputWorkerLock.release();
+     }
+ }
+
+ /**
+  * Waits for the output channel to become available and claims it for the
+  * calling thread.
+  * <p/>
+  * The wait is uninterruptible: an interrupt received while waiting does not
+  * abort the acquisition (the previous <code>do/continue/while(false)</code>
+  * construct fell through without holding the semaphore). The interrupt
+  * status is restored before returning.
+  *
+  * @return the output channel of the send buffer
+  * @throws IllegalStateException if another output worker is still recorded
+  *                               after the semaphore was obtained
+  */
+ public OutputChannel getOutputChannel() {
+
+     boolean interrupted = false;
+     while (true) {
+         try {
+             outputWorkerLock.acquire();
+             break;
+         }
+         catch (InterruptedException e) {
+             // remember the interrupt and keep waiting for the permit
+             interrupted = true;
+         }
+     }
+
+     try {
+         assertEquals(outputWorker, null);
+
+         outputWorker = Thread.currentThread();
+         return sendBuffer.getOutputChannel();
+     }
+     finally {
+         if (interrupted) {
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ /**
+  * Returns the input channel, making the calling thread the input worker if
+  * there is none.
+  * <p/>
+  * The lock is acquired uninterruptibly; previously an interrupt made the
+  * method continue without the lock and then release a mutex it did not
+  * hold. The interrupt status is restored before returning.
+  *
+  * @return the input channel of the receive buffer
+  * @throws IllegalStateException if a different thread is the input worker
+  */
+ public InputChannel getInputChannel() {
+     boolean interrupted = false;
+     while (true) {
+         try {
+             inputWorkerLock.acquire();
+             break;
+         }
+         catch (InterruptedException e) {
+             // remember the interrupt and retry the acquisition
+             interrupted = true;
+         }
+     }
+
+     try {
+         if (inputWorker == null) {
+             inputWorker = Thread.currentThread();
+         } else if (inputWorker != Thread.currentThread()) {
+             throw new IllegalStateException(
+                     "only the designated input worker can do that");
+         }
+     }
+     finally {
+         inputWorkerLock.release();
+         if (interrupted) {
+             Thread.currentThread().interrupt();
+         }
+     }
+
+     return receiveBuffer.getInputChannel();
+ }
+
+ /**
+ * Task that dispatches buffered input to the handler. It is submitted to the
+ * executor by processAvailableInput() and unsetInputWorker() while
+ * inputWorkerLock is still held by the submitting thread; run() releases
+ * that lock after installing the executing thread as inputWorker.
+ */
+ protected final Runnable processInput = new Runnable() {
+ public void run() {
+
+ // NOTE: this check sits outside the try block, so if it throws, the lock
+ // taken by the submitter is not released here.
+ assertEquals(inputWorker, null);
+
+ inputWorker = Thread.currentThread();
+ try {
+ // completes the lock hand-off started by the submitting thread
+ inputWorkerLock.release();
+ handler.inputAvailable(SocketTransportBase.this);
+ }
+ catch (Error e) {
+ // handler failures are only logged; the cleanup below still runs
+ e.printStackTrace();
+ }
+ catch (RuntimeException e) {
+ e.printStackTrace();
+ }
+ finally {
+ // give back the output channel if the handler still owns it, then
+ // step down as input worker (which may schedule this task again)
+ releaseOutputChannel();
+ unsetInputWorker();
+ }
+ }
+ };
+
+ /**
+  * To be called when something is added to the input buffer. Starts the
+  * <code>processInput</code> task when nobody is processing input, the
+  * receive buffer is not empty and a handler is installed.
+  * <p/>
+  * On a successful hand-off <code>inputWorkerLock</code> stays held and is
+  * released by <code>processInput</code>. In every other case, including a
+  * failing <code>execute</code>, the lock is released here; previously an
+  * exception from the executor left it held forever.
+  *
+  * @throws InterruptedException if interrupted while waiting for the lock or
+  *                              while submitting the task
+  */
+ protected void processAvailableInput() throws InterruptedException {
+     inputWorkerLock.acquire();
+
+     boolean handedOff = false;
+     try {
+         // is there someone processing input?
+         // if not, then we need to start a new
+         // input processor
+         if (inputWorker == null && !receiveBuffer.isEmpty()
+                 && handler != null)
+         {
+             executor.execute(processInput);
+             handedOff = true;
+         }
+     }
+     finally {
+         if (!handedOff) {
+             inputWorkerLock.release();
+         }
+     }
+ }
+
+
+ /**
+  * Gives up the input channel; simply delegates to
+  * <code>unsetInputWorker()</code>.
+  */
+ public void releaseInputChannel() {
+     this.unsetInputWorker();
+ }
+
+ /**
+  * Makes the calling thread step down as input worker. If input is still
+  * buffered and a handler is installed, a new <code>processInput</code> task
+  * is scheduled and <code>inputWorkerLock</code> is handed over to it;
+  * otherwise the lock is released here.
+  * <p/>
+  * Interrupts are deliberately discarded: the pending interrupt status is
+  * cleared on entry and interrupts received while waiting only cause a
+  * retry. Previously such an interrupt made the method continue without the
+  * lock, or give up the hand-off while still holding it.
+  */
+ void unsetInputWorker() {
+
+     // clear any pending interrupt so the waits below start clean
+     Thread.interrupted();
+
+     while (true) {
+         try {
+             inputWorkerLock.acquire();
+             break;
+         }
+         catch (InterruptedException e) {
+             // retry until the lock is really held
+         }
+     }
+
+     if (inputWorker != Thread.currentThread()) {
+         // response was given to another thread via signalResponse
+         inputWorkerLock.release();
+         return;
+     }
+
+     inputWorker = null;
+
+     if (receiveBuffer.isEmpty() || handler == null) {
+         // we're done with this request and there is
+         // no more input
+         inputWorkerLock.release();
+         return;
+     }
+
+     // we're done with this request, but there
+     // is a new request (partially) available;
+     // the lock stays held and is released by processInput
+     while (true) {
+         try {
+             executor.execute(processInput);
+             break;
+         }
+         catch (InterruptedException e) {
+             // retry: giving up here would leave the lock held with no worker
+         }
+     }
+ }
+
+ // Currently a no-op: the key is ignored and nothing is recorded.
+ void registerResponse(Object key) {
+
+ }
+
+ /**
+  * Gives up the input-worker role (if held) and parks the calling thread in
+  * the participation executor until <code>signalResponse</code> is called
+  * for <code>key</code>. On wake-up the caller becomes the input worker again
+  * and releases the lock that <code>signalResponse</code> acquired on its
+  * behalf.
+  * <p/>
+  * The initial lock acquisition is uninterruptible; previously an interrupt
+  * let the method continue and release a mutex it did not hold. The
+  * interrupt status is restored before returning.
+  *
+  * @param key identifies the awaited response
+  * @return the value returned by <code>executor.participate(key)</code>
+  */
+ public Object waitForResponse(Object key) {
+
+     boolean interrupted = false;
+     while (true) {
+         try {
+             inputWorkerLock.acquire();
+             break;
+         }
+         catch (InterruptedException e) {
+             interrupted = true;
+         }
+     }
+
+     if (inputWorker == Thread.currentThread()) {
+         inputWorker = null;
+     }
+     inputWorkerLock.release();
+
+     try {
+         Object value = executor.participate(key);
+
+         inputWorker = Thread.currentThread();
+         inputWorkerLock.release(); // {22}
+
+         return value;
+     }
+     finally {
+         // restored only at the very end so it cannot disturb participate()
+         if (interrupted) {
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ /**
+  * Hands the input-worker role over to the thread waiting in
+  * <code>waitForResponse(key)</code>. Must be called by the current input
+  * worker.
+  * <p/>
+  * The lock is acquired uninterruptibly; previously an interrupt let the
+  * hand-off proceed without the lock, so the release at {22} freed a mutex
+  * nobody held. The interrupt status is restored before returning.
+  *
+  * @param key   identifies the waiting participant
+  * @param value passed to <code>executor.release(key, value)</code>
+  * @throws IllegalStateException if the caller is not the input worker
+  */
+ public void signalResponse(Object key, Object value) {
+     assertEquals(inputWorker, Thread.currentThread());
+
+     // this lock is released at {22}, when the
+     // relevant participant reacquires control
+     boolean interrupted = false;
+     while (true) {
+         try {
+             inputWorkerLock.acquire();
+             break;
+         }
+         catch (InterruptedException e) {
+             interrupted = true;
+         }
+     }
+
+     try {
+         inputWorker = null;
+         executor.release(key, value);
+     }
+     finally {
+         if (interrupted) {
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+
+ /**
+  * Replaces the handler that is notified when input is available.
+  *
+  * @param handler the new handler; while it is <code>null</code> no input
+  *                processing task is scheduled
+  */
+ public void setInputHandler(final InputHandler handler) {
+     this.handler = handler;
+ }
+
+
+ /**
+  * Internal sanity check based on reference identity (not
+  * <code>equals</code>).
+  *
+  * @throws IllegalStateException if the two references differ
+  */
+ private void assertEquals(Object o1, Object o2) {
+     if (o1 == o2) {
+         return;
+     }
+     throw new IllegalStateException("assertion failed");
+ }
+
+
+ /**
+  * Closes the underlying socket.
+  *
+  * @throws IOException if closing the socket fails
+  */
+ public void close() throws IOException {
+     this.sock.close();
+ }
+
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/Transport.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/Transport.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/Transport.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/Transport.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,53 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel;
+
+import java.io.IOException;
+
+
+/**
+ * Class Transport is the central abstraction in this I/O framework.
+ * <p/>
+ * A <code>Transport</code> has two channels that it can make available,
+ * an input and an output channel. These channels are considered tokens
+ * or capabilities that can be handed only to one thread at a time. Channels
+ * are explicitly relinquished (by calling Channel.relinquish) in order to
+ * return them to the transport, so that other threads may use it.
+ */
+
+public abstract class Transport {
+
+ /**
+ * Get the OutputChannel for this transport.
+ * <p/>
+ * This operation may block if some other thread is currently using the
+ * output channel, and that other thread has not yet released the channel
+ * by calling Channel.relinquish.
+ */
+ abstract public OutputChannel getOutputChannel();
+
+ abstract public InputChannel getInputChannel();
+
+ abstract public Object waitForResponse(Object key);
+
+ public abstract void signalResponse(Object key, Object userData);
+
+ public abstract void close() throws IOException;
+
+ public abstract void setInputHandler(InputHandler handler);
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/TransportManager.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/TransportManager.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/TransportManager.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/TransportManager.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+
+
+ // Factory and lifecycle owner for Transport instances; also exposes the
+ // Executor that transports use to run their work.
+ public interface TransportManager {
+
+ // Creates a transport for the given address with the given input handler.
+ // Both implementations in this package open a connection to addr.
+ Transport createTransport(SocketAddress addr, InputHandler handler) throws IOException;
+
+ // Starts whatever background activity the manager needs.
+ void start() throws InterruptedException;
+
+ // Stops the manager.
+ void shutdown() throws IOException;
+
+ // Executor used by transports created by this manager.
+ Executor getExecutor();
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/ClassicTransportManager.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/ClassicTransportManager.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/ClassicTransportManager.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/ClassicTransportManager.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,63 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel.classic;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+
+import org.apache.geronimo.corba.channel.InputHandler;
+import org.apache.geronimo.corba.channel.Transport;
+import org.apache.geronimo.corba.channel.TransportManager;
+
+
+public class ClassicTransportManager implements TransportManager {
+
+ private Executor executor;
+
+ public ClassicTransportManager(Executor executor) {
+ this.executor = executor;
+ }
+
+ public Transport createTransport(SocketAddress addr, InputHandler handler)
+ throws IOException
+ {
+
+
+ Socket sock = new Socket();
+ sock.connect(addr);
+
+ return new SyncClassicTransport(this, sock, handler);
+ }
+
+ public void start() throws InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void shutdown() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/SyncClassicTransport.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/SyncClassicTransport.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/SyncClassicTransport.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/classic/SyncClassicTransport.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel.classic;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.geronimo.corba.channel.InputHandler;
+import org.apache.geronimo.corba.channel.RingByteBuffer;
+import org.apache.geronimo.corba.channel.SocketTransportBase;
+
+
+ /**
+  * Transport over a blocking socket: the ring buffers move data to and from
+  * the socket synchronously, from inside their full/empty hooks.
+  */
+ public class SyncClassicTransport extends SocketTransportBase {
+
+     public SyncClassicTransport(ClassicTransportManager manager, Socket sock,
+                                 InputHandler handler)
+     {
+         super(manager, handler, sock);
+     }
+
+     /**
+      * @return the socket this transport was created with
+      */
+     public Socket socket() {
+         return sock;
+     }
+
+     /**
+      * Creates the send buffer. When it is full its content is written to
+      * the socket; once it has been closed and drained the socket output is
+      * shut down.
+      */
+     protected RingByteBuffer allocateSendBuffer(int bufferSize) {
+
+         final RingByteBuffer buffer = new RingByteBuffer(bufferSize, false) {
+
+             protected void relinquishOutput() {
+                 releaseOutputChannel();
+             }
+
+             protected void relinquishInput() {
+                 // the send buffer has no input side that could be handed out
+                 throw new InternalError();
+             }
+
+             protected void bufferFullHook(String how) throws IOException {
+                 if (socket().isOutputShutdown()) {
+                     return;
+                 }
+                 writeTo(sock);
+             }
+
+             protected void bufferEmptyHook(String how) {
+                 // stopSenderThread();
+             }
+
+             /**
+              * the send buffer was closed(), and we have sent everything
+              */
+             protected void readEOFHook() {
+                 try {
+                     sock.shutdownOutput();
+                 }
+                 catch (IOException e) {
+                     // TODO Auto-generated catch block
+                     e.printStackTrace();
+                 }
+             }
+
+         };
+         return buffer;
+     }
+
+     /**
+      * Creates the receive buffer. When it runs empty and is still open for
+      * put, more data is read from the socket.
+      */
+     protected RingByteBuffer allocateReceiveBuffer(int bufferSize) {
+         final RingByteBuffer buffer = new RingByteBuffer( bufferSize, false) {
+
+             protected void relinquishInput() {
+                 releaseInputChannel();
+             }
+
+             protected void relinquishOutput() {
+                 // the receive buffer has no output side that could be handed out
+                 throw new InternalError();
+             }
+
+             protected void bufferEmptyHook(String how) throws IOException {
+                 if (isClosedForPut()) {
+                     return;
+                 }
+                 readFrom(sock);
+             }
+
+             protected void bufferFullHook(String how) {
+
+             }
+
+             protected void readEOFHook() {
+                 // the client just read the EOF marker //
+             }
+
+         };
+         return buffer;
+     }
+
+ }
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOSocketTransport.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOSocketTransport.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOSocketTransport.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOSocketTransport.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,202 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel.nio;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.geronimo.corba.channel.InputHandler;
+import org.apache.geronimo.corba.channel.RingByteBuffer;
+import org.apache.geronimo.corba.channel.SocketTransportBase;
+
+
+ /**
+  * Transport over a non-blocking <code>SocketChannel</code>. The ring buffers
+  * never touch the channel themselves; their hooks only add or remove
+  * selector interest, and the actual transfers happen in the
+  * <code>canRead</code>/<code>canWrite</code> callbacks.
+  */
+ public class AsyncNIOSocketTransport extends SocketTransportBase implements
+         SelectionListener
+ {
+
+     public final SocketChannel chan;
+
+     AsyncNIOSocketTransport(AsyncNIOTransportManager manager,
+                             final SocketChannel chan, InputHandler handler)
+     {
+
+         // NOTE: the superclass constructor calls the allocate*Buffer hooks
+         // below before this.chan is assigned; they only reference chan from
+         // inside hook methods.
+         super(manager, handler, chan.socket());
+
+         this.chan = chan;
+     }
+
+     /**
+      * Creates the send buffer: a full buffer requests OP_WRITE, an empty one
+      * withdraws it, and reaching EOF shuts down the socket output.
+      */
+     protected RingByteBuffer allocateSendBuffer(int bufferSize) {
+
+         return new RingByteBuffer(bufferSize, true) {
+
+             public String getName() {
+                 return "send buffer for " + sock.toString();
+             }
+
+             protected void bufferFullHook(String how) {
+                 if (!chan.socket().isOutputShutdown()) {
+                     addInterest(SelectionKey.OP_WRITE, "output buffer full : "
+                             + how);
+                 }
+             }
+
+             protected void bufferEmptyHook(String how) {
+                 removeInterest(SelectionKey.OP_WRITE, "send buffer empty : "
+                         + how);
+             }
+
+             /**
+              * the send buffer was closed(), and we have sent everything:
+              * shut down the output side of the socket
+              */
+             protected void readEOFHook() {
+                 try {
+                     chan.socket().shutdownOutput();
+                 }
+                 catch (IOException e) {
+                     // TODO Auto-generated catch block
+                     e.printStackTrace();
+                 }
+             }
+
+             protected void relinquishInput() {
+                 throw new InternalError();
+             }
+
+             protected void relinquishOutput() {
+                 releaseOutputChannel();
+             }
+
+         };
+     }
+
+
+     /**
+      * Creates the receive buffer: a full buffer withdraws OP_READ, an empty
+      * one that is still open for put requests it again.
+      */
+     protected RingByteBuffer allocateReceiveBuffer(int bufferSize) {
+         return new RingByteBuffer(bufferSize, true) {
+
+             public String getName() {
+                 return "receive buffer for " + sock.toString();
+             }
+
+             protected void bufferFullHook(String how) {
+                 removeInterest(SelectionKey.OP_READ, "receive buffer full : "
+                         + how);
+             }
+
+             protected void bufferEmptyHook(String how) {
+                 if (!isClosedForPut()) {
+                     addInterest(SelectionKey.OP_READ, "input buffer empty : "
+                             + how);
+                 }
+             }
+
+             protected void readEOFHook() {
+                 // the client just read the EOF marker //
+             }
+
+             protected void relinquishInput() {
+                 releaseInputChannel();
+             }
+
+             protected void relinquishOutput() {
+                 throw new InternalError();
+             }
+
+         };
+     }
+
+
+     AsyncNIOTransportManager getNIOManager() {
+         return (AsyncNIOTransportManager) manager;
+     }
+
+     protected void removeInterest(int interest, String why) {
+         getNIOManager().removeInterest(this, interest, why);
+     }
+
+     protected void addInterest(int interest, String why) {
+         getNIOManager().addInterest(this, interest, why);
+     }
+
+     /** Not used: this listener never accepts connections. */
+     public void canAccept() {
+     }
+
+     /**
+      * Completes the pending non-blocking connect. Interest switches from
+      * OP_CONNECT to OP_READ only when <code>finishConnect()</code> reports
+      * that the connection is established; if it returns <code>false</code>
+      * OP_CONNECT stays registered so this callback runs again.
+      */
+     public void canConnect() {
+         try {
+             if (chan.finishConnect()) {
+                 removeInterest(SelectionKey.OP_CONNECT, "connected");
+                 addInterest(SelectionKey.OP_READ, "can connect");
+             }
+         }
+         catch (IOException e) {
+             // TODO Auto-generated catch block
+             e.printStackTrace();
+         }
+     }
+
+     /**
+      * Moves readable bytes into the receive buffer and, when the buffer
+      * reports progress, makes sure an input processor gets started. Read
+      * interest is dropped once the buffer is closed for put.
+      */
+     public void canRead() {
+
+         try {
+             if (receiveBuffer.readFrom(chan)) {
+
+                 if (receiveBuffer.isClosedForPut()) {
+                     removeInterest(SelectionKey.OP_READ, "reached eof");
+                 }
+
+                 processAvailableInput();
+             }
+
+         }
+         catch (IOException e) {
+             // TODO Auto-generated catch block
+             e.printStackTrace();
+         }
+         catch (InterruptedException e) {
+             // TODO Auto-generated catch block
+             e.printStackTrace();
+         }
+     }
+
+
+     /**
+      * Drains the send buffer into the channel. Write interest is dropped
+      * when the buffer is closed and empty, or when the write fails.
+      */
+     public void canWrite() {
+
+         try {
+             sendBuffer.writeTo(chan);
+
+             if (sendBuffer.isClosed() && sendBuffer.isEmpty()) {
+                 removeInterest(SelectionKey.OP_WRITE, "output closed");
+             }
+
+         }
+         catch (IOException e) {
+             removeInterest(SelectionKey.OP_WRITE, "write failed");
+         }
+     }
+
+     /** Currently ignores the notification. */
+     public void channelClosed(ClosedChannelException e) {
+     }
+
+     public SocketChannel channel() {
+         return chan;
+     }
+
+     /**
+      * Closes the socket channel.
+      *
+      * @throws IOException if closing the channel fails
+      */
+     public void close() throws IOException {
+         channel().close();
+     }
+
+ }
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOTransportManager.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOTransportManager.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOTransportManager.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/AsyncNIOTransportManager.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,94 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel.nio;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+
+import org.apache.geronimo.corba.channel.InputHandler;
+import org.apache.geronimo.corba.channel.Transport;
+import org.apache.geronimo.corba.channel.TransportManager;
+
+
+public class AsyncNIOTransportManager implements TransportManager {
+
+ private NIOSelector selector;
+
+ private final SelectorProvider provider;
+
+ private final Executor executor;
+
+ AsyncNIOTransportManager(Executor executor) throws IOException {
+ this(executor, SelectorProvider.provider());
+ }
+
+ AsyncNIOTransportManager(Executor executor, SelectorProvider provider)
+ throws IOException
+ {
+ this.executor = executor;
+ this.provider = provider;
+ this.selector = new NIOSelector(this, provider.openSelector());
+ }
+
+ public synchronized void start() throws InterruptedException {
+ executor.execute(selector);
+ }
+
+ public void shutdown() throws IOException {
+ if (selector.isRunning()) {
+ selector.shutdown();
+ }
+ }
+
+ public Transport createTransport(SocketAddress addr, InputHandler handler) throws IOException {
+
+ SocketChannel ch = provider.openSocketChannel();
+ ch.configureBlocking(false);
+ ch.connect(addr);
+
+ AsyncNIOSocketTransport result = new AsyncNIOSocketTransport(this, ch, handler);
+
+ NIOSelector sel = getSelector();
+
+ sel.register(ch, result);
+ sel.addInterest(ch, SelectionKey.OP_CONNECT, "initial");
+
+ return result;
+ }
+
+ NIOSelector getSelector() {
+ return selector;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void removeInterest(AsyncNIOSocketTransport transport, int interest, String why) {
+ selector.removeInterest(transport.channel(), interest, why);
+ }
+
+ public void addInterest(AsyncNIOSocketTransport transport, int interest, String why) {
+ selector.addInterest(transport.channel(), interest, why);
+ }
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/NIOSelector.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/NIOSelector.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/NIOSelector.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/NIOSelector.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,402 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel.nio;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+
+public class NIOSelector implements Runnable {
+
+ // The wrapped java.nio selector; closed by the command queued in shutdown().
+ final Selector sel;
+
+ // Thread executing run0(); null until the loop has been started. It is used
+ // to decide whether interest changes may be applied directly.
+ Thread selectorThread = null;
+
+ // Queue of pending Command objects; every add and the drain in doCommands()
+ // synchronize on this list.
+ List regs = new ArrayList();
+
+ // The manager that created this selector.
+ private AsyncNIOTransportManager manager;
+
+ /**
+  * @param manager  the transport manager owning this selector
+  * @param selector the java.nio selector to drive
+  */
+ public NIOSelector(AsyncNIOTransportManager manager, Selector selector) {
+     this.manager = manager;
+     this.sel = selector;
+ }
+
+ /**
+  * Wakes up the selector unless the caller is the selector thread itself,
+  * in which case nothing is done.
+  */
+ public void wakeup() {
+     if (Thread.currentThread() == selectorThread) {
+         // do nothing //
+         return;
+     }
+     if (sel != null) {
+         sel.wakeup();
+     }
+ }
+
+ /**
+  * Clears the given interest bits for <code>ch</code>.
+  * <p/>
+  * When called on the selector thread with no commands pending, the change
+  * is applied directly; otherwise a command is queued and the selector is
+  * woken up. A channel that has no key with this selector is ignored on the
+  * fast path, exactly as the queued <code>RemoveInterest</code> command
+  * does, instead of failing with a NullPointerException.
+  *
+  * @param ch       the channel whose interest set is changed
+  * @param interest the <code>SelectionKey.OP_*</code> bits to clear
+  * @param why      free-text reason, used for tracing only
+  */
+ public void removeInterest(SocketChannel ch, int interest, String why) {
+     Selector sel = this.sel;
+     if (sel == null) {
+         return;
+     }
+     if (Thread.currentThread() == selectorThread && regs.isEmpty()) {
+         SelectionKey sk = ch.keyFor(sel);
+         if (sk != null) {
+             sk.interestOps(sk.interestOps() & ~interest);
+
+             System.out.println("FAST REMOVE " + interest(interest) + " : "
+                     + why);
+         }
+
+         return;
+     }
+
+     // same queue-and-wakeup path as every other command
+     addCommand(new RemoveInterest(ch, interest));
+ }
+
+ /**
+  * Sets the given interest bits for <code>ch</code>.
+  * <p/>
+  * When called on the selector thread with no commands pending, the change
+  * is applied directly; otherwise a command is queued and the selector is
+  * woken up. A channel that has no key with this selector is ignored on the
+  * fast path, exactly as the queued <code>AddInterest</code> command does,
+  * instead of failing with a NullPointerException.
+  *
+  * @param ch       the channel whose interest set is changed
+  * @param interest the <code>SelectionKey.OP_*</code> bits to set
+  * @param why      free-text reason, used for tracing only
+  */
+ public void addInterest(SocketChannel ch, int interest, String why) {
+
+     if (Thread.currentThread() == selectorThread && regs.isEmpty()) {
+         SelectionKey sk = ch.keyFor(sel);
+         if (sk != null) {
+             sk.interestOps(sk.interestOps() | interest);
+
+             System.out.println("FAST ADD " + interest(interest) + " : " + why);
+         }
+
+         return;
+     }
+
+     addCommand(new AddInterest(ch, interest, why));
+ }
+
+ private void addCommand(Command reg) {
+ synchronized (regs) {
+ regs.add(reg);
+ }
+
+ sel.wakeup();
+ }
+
+ /**
+  * Queues the registration of <code>ch</code> with this selector, with
+  * <code>listener</code> as the key attachment. The registration itself is
+  * performed later by the selector thread.
+  */
+ public void register(SocketChannel ch, SelectionListener listener)
+         throws ClosedChannelException
+ {
+     addCommand(new Registration(ch, listener));
+ }
+
+ private String interest(int interest) {
+ if (interest == 0) {
+ return "NONE";
+ }
+ StringBuffer sb = new StringBuffer();
+ if ((interest & SelectionKey.OP_ACCEPT) != 0) {
+ sb.append("OP_ACCEPT ");
+ }
+ if ((interest & SelectionKey.OP_CONNECT) != 0) {
+ sb.append("OP_CONNECT ");
+ }
+ if ((interest & SelectionKey.OP_READ) != 0) {
+ sb.append("OP_READ ");
+ }
+ if ((interest & SelectionKey.OP_WRITE) != 0) {
+ sb.append("OP_WRITE ");
+ }
+ return sb.toString();
+ }
+
+ // Unit of work queued in regs and executed by doCommands().
+ abstract static class Command {
+
+ // Performs the command; invoked from doCommands().
+ abstract void exec();
+ }
+
+ /**
+  * Queued command that sets interest bits on a channel's key, if the
+  * channel has a key with this selector.
+  */
+ class AddInterest extends Command {
+
+     SelectableChannel chan;
+
+     int interest;
+
+     private final String why;
+
+     AddInterest(SelectableChannel chan, int interest, String why) {
+         this.chan = chan;
+         this.interest = interest;
+         this.why = why;
+     }
+
+     void exec() {
+
+         System.out.println("add interest : " + interest(interest) + " : "
+                 + why);
+
+         final SelectionKey key = chan.keyFor(sel);
+         if (key == null) {
+             // channel is not registered with this selector: nothing to change
+             return;
+         }
+
+         key.interestOps(interest | key.interestOps());
+
+         System.out.println("interest is: "
+                 + interest(key.interestOps()));
+     }
+
+ }
+
+ /**
+  * Queued command that clears interest bits on a channel's key, if the
+  * channel has a key with this selector.
+  */
+ class RemoveInterest extends Command {
+
+     SelectableChannel chan;
+
+     int interest;
+
+     RemoveInterest(SelectableChannel chan, int interest) {
+         this.chan = chan;
+         this.interest = interest;
+     }
+
+     void exec() {
+         System.out.println("remove interest : " + interest(interest));
+
+         final SelectionKey key = chan.keyFor(sel);
+         if (key == null) {
+             // channel is not registered with this selector: nothing to change
+             return;
+         }
+
+         key.interestOps(key.interestOps() & ~interest);
+
+         System.out.println("interest is: "
+                 + interest(key.interestOps()));
+     }
+
+ }
+
+ /**
+  * Queued command that registers a channel with the selector, with an empty
+  * interest set and the listener as attachment.
+  */
+ class Registration extends Command {
+
+     private final SocketChannel ch;
+
+     private final SelectionListener listener;
+
+     public Registration(SocketChannel ch, SelectionListener result) {
+         this.ch = ch;
+         this.listener = result;
+     }
+
+     void exec() {
+         try {
+             final SelectionKey existing = ch.keyFor(sel);
+
+             if (existing != null) {
+                 // already registered here: only verify the attachment
+                 if (listener != existing.attachment()) {
+                     System.out.println("wrong attachment");
+                 }
+                 return;
+             }
+
+             if (ch.isRegistered()) {
+                 // registered with some other selector
+                 // TODO: problem //
+             } else {
+                 ch.register(sel, 0, listener);
+             }
+         }
+         catch (ClosedChannelException e) {
+             listener.channelClosed(e);
+         }
+     }
+ }
+
+ /**
+  * Entry point for the executor: runs the selector loop and prints "DONE."
+  * when it ends normally. Runtime exceptions and errors escaping the loop
+  * are printed and end the loop.
+  */
+ public void run() {
+
+     try {
+         run0();
+         System.out.println("DONE.");
+     }
+     catch (RuntimeException failure) {
+         failure.printStackTrace();
+     }
+     catch (Error failure) {
+         failure.printStackTrace();
+     }
+ }
+
+ // Scratch array reused by doCommands() to snapshot the pending commands.
+ private Command[] reg = new Command[0];
+
+ // The selector loop. Records the executing thread as selectorThread, then
+ // alternates between select(100), draining queued commands and dispatching
+ // ready keys to their SelectionListener attachments. Returns when the
+ // selector has been closed.
+ public void run0() {
+
+ this.selectorThread = Thread.currentThread();
+
+ // scratch array reused for snapshots of the selected-key set
+ SelectionKey[] sks = new SelectionKey[0];
+
+ // run commands queued before the loop started; one of them may have
+ // closed the selector
+ if (doCommands()) {
+ if (!sel.isOpen()) {
+ return;
+ }
+ }
+
+ while (true) {
+
+ Selector sel = this.sel;
+ if (sel == null) {
+ return;
+ }
+
+ try {
+ // print(sel);
+ // wait at most 100 ms
+ sel.select(100);
+ }
+ catch (ClosedSelectorException ex) {
+ return;
+ }
+ catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ if (sel == null) {
+ return;
+ }
+
+ // if any command ran, the selected keys are discarded and the loop
+ // selects again instead of dispatching them
+ if (doCommands()) {
+
+ if (!sel.isOpen()) {
+ return;
+ }
+
+ Set selectedKeys = sel.selectedKeys();
+ if (selectedKeys != null) {
+ selectedKeys.clear();
+ }
+ continue;
+ }
+
+ Set selectedKeys = sel.selectedKeys();
+ int count = selectedKeys.size();
+ if (count != 0) {
+ // snapshot, then clear the set before the callbacks run; sks may be
+ // longer than count, so only the first count entries are used
+ sks = (SelectionKey[]) selectedKeys.toArray(sks);
+ selectedKeys.clear();
+
+ for (int i = 0; i < count; i++) {
+ SelectionListener listener = (SelectionListener) sks[i]
+ .attachment();
+
+ System.out.println("doing " + interest(sks[i].readyOps())
+ + " on " + sks[i].channel());
+
+ SelectableChannel ch = sks[i].channel();
+ if (!ch.isOpen()) {
+ // channel was closed: notify the listener and drop the key
+ listener.channelClosed(null);
+ sks[i].cancel();
+
+ } else {
+
+ try {
+
+ if (sks[i].isAcceptable()) {
+ listener.canAccept();
+ }
+
+ if (sks[i].isConnectable()) {
+ listener.canConnect();
+ }
+
+ if (sks[i].isReadable()) {
+ listener.canRead();
+ }
+
+ if (sks[i].isWritable()) {
+ listener.canWrite();
+ }
+
+ }
+ catch (CancelledKeyException e) {
+ // ignore //
+ }
+ }
+
+ }
+
+ }
+
+ }
+
+ }
+
+ private void print(Selector sel) {
+
+ if (sel == null) {
+ return;
+ }
+ Set s = sel.keys();
+ SelectionKey[] sk = (SelectionKey[]) s.toArray(new SelectionKey[s
+ .size()]);
+ for (int i = 0; i < sk.length; i++) {
+ System.out.println(sk[i].channel() + " "
+ + interest(sk[i].interestOps()));
+ }
+ System.out.println("===");
+ }
+
+ private boolean doCommands() {
+ int size = 0;
+ synchronized (regs) {
+ size = regs.size();
+
+ if (size == 0) {
+ return false;
+ }
+
+ reg = (Command[]) regs.toArray(reg);
+ regs.clear();
+
+ }
+
+ for (int i = 0; i < size; i++) {
+ Command r = reg[i];
+ r.exec();
+ }
+
+ return true;
+ }
+
+ /**
+  * @return <code>true</code> once the selector loop has been entered and
+  *         the selector is still open
+  */
+ public boolean isRunning() {
+     return selectorThread != null && sel.isOpen();
+ }
+
+ /**
+  * Queues a command that closes the selector; the selector loop ends once
+  * it has executed that command. A failure to close is ignored.
+  */
+ public void shutdown() throws IOException {
+     final Command closeSelector = new Command() {
+
+         void exec() {
+             try {
+                 sel.close();
+             }
+             catch (IOException e) {
+             }
+         }
+
+     };
+     addCommand(closeSelector);
+ }
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/ParticipationExecutor.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/ParticipationExecutor.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/ParticipationExecutor.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/ParticipationExecutor.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,125 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel.nio;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
+import EDU.oswego.cs.dl.util.concurrent.Semaphore;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.SyncMap;
+
+
+public class ParticipationExecutor implements Executor {
+
+ private SyncMap map = new SyncMap(new HashMap(), new Mutex(),
+ new ReentrantLock());
+
+ private final Executor backing;
+
+ public ParticipationExecutor(Executor backing) {
+ this.backing = backing;
+ }
+
+ public void execute(Runnable arg0) throws InterruptedException {
+
+ Participation p = null;
+ if (!map.isEmpty()) {
+ Sync lock = map.writerSync();
+ lock.acquire();
+ try {
+ Set set = map.entrySet();
+ Iterator iter = set.iterator();
+ if (iter.hasNext()) {
+ Map.Entry ent = (Map.Entry) iter.next();
+ p = (Participation) ent.getValue();
+ iter.remove();
+ }
+ }
+ finally {
+ lock.release();
+ }
+ }
+
+ if (p == null) {
+ backing.execute(arg0);
+ } else {
+ p.task = arg0;
+ p.release();
+ }
+
+ }
+
+ static class Participation extends Semaphore {
+
+ Thread participant = Thread.currentThread();
+
+ public Participation() {
+ super(0);
+ }
+
+ Runnable task;
+ public Object value;
+ }
+
+ public Object participate(Object key) {
+
+ Participation p = new Participation();
+ map.put(key, p);
+
+ while (true) {
+ try {
+ p.acquire();
+ }
+ catch (InterruptedException e) {
+ continue;
+ }
+
+ if (p.task == null) {
+ return p.value;
+ } else {
+ try {
+ p.task.run();
+ }
+ catch (RuntimeException ex) {
+ ex.printStackTrace();
+ }
+ catch (Error ex) {
+ ex.printStackTrace();
+ }
+ finally {
+ p.task = null;
+ map.put(key, p);
+ }
+ }
+ }
+ }
+
+ public void release(Object key, Object value) {
+ Participation p = (Participation) map.remove(key);
+ if (p != null) {
+ p.value = value;
+ p.release();
+ }
+ }
+
+}
Added: geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/SelectionListener.java
URL: http://svn.apache.org/viewcvs/geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/SelectionListener.java?rev=329036&view=auto
==============================================================================
--- geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/SelectionListener.java (added)
+++ geronimo/trunk/sandbox/freeorb/geronimo-orb/src/main/java/org/apache/geronimo/corba/channel/nio/SelectionListener.java Thu Oct 27 19:00:06 2005
@@ -0,0 +1,34 @@
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation or its licensors, as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.corba.channel.nio;
+
+import java.nio.channels.ClosedChannelException;
+
+
+/**
+ * Callbacks through which a selector dispatch loop tells a listener what
+ * its selection key is ready for, or that its channel has been closed.
+ * The dispatch loop accompanying this interface checks a key's
+ * isAcceptable, isConnectable, isReadable and isWritable flags in that
+ * order and invokes the matching callback for each flag that is set.
+ */
+public interface SelectionListener {
+
+ /** Invoked when the listener's selection key reports isAcceptable(). */
+ void canAccept();
+
+ /** Invoked when the listener's selection key reports isConnectable(). */
+ void canConnect();
+
+ /** Invoked when the listener's selection key reports isReadable(). */
+ void canRead();
+
+ /** Invoked when the listener's selection key reports isWritable(). */
+ void canWrite();
+
+ /**
+  * Invoked when the channel is found to be closed.
+  *
+  * @param e the exception describing the closure; may be null (the
+  *          dispatch loop accompanying this interface passes null)
+  */
+ void channelClosed(ClosedChannelException e);
+
+}