You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/05/18 10:02:56 UTC

svn commit: r1595569 [1/2] - in /river/jtsk/skunk/qa_refactor/trunk/src: com/sun/jini/jeri/internal/mux/ com/sun/jini/thread/ net/jini/discovery/ net/jini/jeri/ net/jini/jeri/connection/ net/jini/jeri/tcp/

Author: peter_firmstone
Date: Sun May 18 08:02:56 2014
New Revision: 1595569

URL: http://svn.apache.org/r1595569
Log:
Fix data race in StreamConnectionIO, caused by ByteBuffer's being shared between calling threads and mux writer thread, other minor improvements included.

Note this fix exposes latent data races in Outrigger and ServiceDiscoveryManager.

Added:
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxInputStream.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxOutputStream.java
Modified:
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/IOFuture.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/SocketChannelConnectionIO.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/StreamConnectionIO.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupDiscovery.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/BasicObjectEndpoint.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/IOFuture.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/IOFuture.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/IOFuture.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/IOFuture.java Sun May 18 08:02:56 2014
@@ -30,6 +30,8 @@ import java.io.IOException;
 final class IOFuture {
 
     private boolean done = false;
+    boolean data = false;
+    private int position = -1;
     private IOException exception = null;
 
     IOFuture() { }
@@ -50,8 +52,9 @@ final class IOFuture {
      *
      * @throws	InterruptedException if the current thread was
      * interrupted while waiting for the I/O to complete.
+     * @return true if data remaining.
      */
-    synchronized void waitUntilDone()
+    synchronized boolean waitUntilDone()
 	throws IOException, InterruptedException
     {
 	while (!done) {
@@ -61,16 +64,34 @@ final class IOFuture {
 	    exception.fillInStackTrace();
 	    throw exception;
 	}
+        return data;
     }
-
+    
+    synchronized int getPosition(){
+        return position;
+    }
+    
     /**
      * Signals that this I/O operation has completed successfully.
      */
     synchronized void done() {
 	assert !done;
+        data = false;
 	done = true;
 	notifyAll();
     }
+    
+    /**
+     * Signals that this I/O operation has remaining data.
+     * @param position 
+     */
+    synchronized void done(int position){
+        assert !done;
+        done = true;
+        data = true;
+        this.position = position;
+        notifyAll();
+    }
 
     /**
      * Signals that this I/O operation has failed (with details of the

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java Sun May 18 08:02:56 2014
@@ -33,10 +33,10 @@ import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CharsetEncoder;
 import java.security.AccessController;
 import java.util.BitSet;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Queue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -89,7 +89,7 @@ abstract class Mux {
 	    new GetThreadPoolAction(false));
 
     /** session shutdown tasks to be executed asynchronously */
-    private static final Queue<Runnable> sessionShutdownQueue = new LinkedList<Runnable>();
+    private static final Deque<Runnable> sessionShutdownQueue = new LinkedList<Runnable>();
 
     private static class SessionShutdownTask implements Runnable {
 	private final Session[] sessions;
@@ -179,7 +179,7 @@ abstract class Mux {
 	this.connectionIO = new SocketChannelConnectionIO(this, channel);
 	directBuffersUseful = true;
     }
-
+    
     /**
      * Time in milliseconds for client-side connections to wait for the server
      * to acknowledge an opening handshake. The default value is 15000
@@ -206,28 +206,27 @@ abstract class Mux {
      * see uninitialized state.
      */
     public void start() throws IOException {
-        synchronized (readStateLock){
-            if (role == CLIENT) {
-                readState = READ_SERVER_CONNECTION_HEADER;
-            } else {
-                assert role == SERVER;
-                readState = READ_CLIENT_CONNECTION_HEADER;
-            }
+        if (role == CLIENT) {
+            readState = READ_SERVER_CONNECTION_HEADER;
+        } else {
+            assert role == SERVER;
+            readState = READ_CLIENT_CONNECTION_HEADER;
+        }
+
+        try {
+            connectionIO.start();
+        } catch (IOException e) {
+            setDown("I/O error starting connection", e);
+            throw e;
         }
-	try {
-	    connectionIO.start();
-	} catch (IOException e) {
-	    setDown("I/O error starting connection", e);
-	    throw e;
-	}
 
-	if (role == CLIENT) {
+        if (role == CLIENT) {
+            asyncSendClientConnectionHeader();
 	    synchronized (muxLock) {
-                asyncSendClientConnectionHeader();
 		long now = System.currentTimeMillis();
 		long endTime = now + this.startTimeout;
 		while (!muxDown && !clientConnectionReady) {
-		    if (now >= endTime) {
+		    if (now > endTime) {
 			setDown("timeout waiting for server to respond to handshake", null);
 		    } else {
                         try {
@@ -289,13 +288,14 @@ abstract class Mux {
      * This method MAY be invoked while synchronized on muxLock.
      */
     final void setDown(final String message, final Throwable cause) {
-        boolean needWorker = false;
 	synchronized (muxLock) {
 	    if (muxDown) return;
 	    muxDown = true;
 	    muxDownMessage = message;
 	    muxDownCause = cause;
 	    muxLock.notifyAll();
+	}
+
             /*
              * The following should be safe because we just left the
              * synchonized block, and after setting the muxDown latch
@@ -307,7 +307,7 @@ abstract class Mux {
              * because individual session locks must never be acquired
              * while holding muxLock.
              */
-
+	boolean needWorker = false;
             synchronized (sessionShutdownQueue) {
                 if (!sessions.isEmpty()) {
                     sessionShutdownQueue.add(new SessionShutdownTask(
@@ -319,7 +319,6 @@ abstract class Mux {
                     needWorker = !sessionShutdownQueue.isEmpty();
                 }
             }
-        }
 	if (needWorker) {
 	    try {
 		systemThreadPool.execute(new Runnable() {
@@ -328,7 +327,7 @@ abstract class Mux {
 			    Runnable task;
 			    synchronized (sessionShutdownQueue) {
 				if (sessionShutdownQueue.isEmpty()) break;
-				task = sessionShutdownQueue.remove();
+				task = sessionShutdownQueue.removeFirst();
 			    }
 			    task.run();
 			}
@@ -675,7 +674,8 @@ abstract class Mux {
      * The returned IOFuture object can be used to wait until the write has
      * definitely completed (or will definitely not complete due to some
      * failure).  After the write has completed, the buffer's position will
-     * have been incremented to its limit (which will not have changed).
+     * have been incremented to its limit (which will not have changed), the
+     * position may be obtained by calling @link{IOFuture#getPosition()}.
      */
     final IOFuture futureSendData(int op, int sessionID, ByteBuffer data) {
 	assert (op & 0xE1) == Data;	// verify operation code
@@ -704,7 +704,7 @@ abstract class Mux {
      * current read state lock and variables
      */
     private final Object readStateLock = new Object();
-    private int readState;
+    private volatile int readState;
     private int currentOp;
     private int currentSessionID;
     private int currentLengthRemaining;
@@ -719,27 +719,19 @@ abstract class Mux {
 	    do {
 		switch (readState) {
 		  case READ_CLIENT_CONNECTION_HEADER:
-		    if (!readClientConnectionHeader(buffer)) {
-			break stateLoop;
-		    }
+		    if (!readClientConnectionHeader(buffer)) break stateLoop;
 		    break;
 
 		  case READ_SERVER_CONNECTION_HEADER:
-		    if (!readServerConnectionHeader(buffer)) {
-			break stateLoop;
-		    }
+		    if (!readServerConnectionHeader(buffer)) break stateLoop;
 		    break;
 
 		  case READ_MESSAGE_HEADER:
-		    if (!readMessageHeader(buffer)) {
-			break stateLoop;
-		    }
+		    if (!readMessageHeader(buffer)) break stateLoop;
 		    break;
 
 		  case READ_MESSAGE_BODY:
-		    if (!readMessageBody(buffer)) {
-			break stateLoop;
-		    }
+		    if (!readMessageBody(buffer)) break stateLoop;
 		    break;
 
 		  default:
@@ -755,6 +747,7 @@ abstract class Mux {
 	throws ProtocolException
     {
 	assert role == SERVER;
+        assert Thread.holdsLock(readStateLock);
 
 	validatePartialMagicNumber(buffer);
 	if (buffer.remaining() < 8) {
@@ -794,7 +787,8 @@ abstract class Mux {
 	throws ProtocolException
     {
 	assert role == CLIENT;
-
+        assert Thread.holdsLock(readStateLock);
+        
 	validatePartialMagicNumber(buffer);
 
 	if (buffer.remaining() < 8) {
@@ -848,6 +842,7 @@ abstract class Mux {
     private boolean readMessageHeader(ByteBuffer buffer)
 	throws ProtocolException
     {
+        assert Thread.holdsLock(readStateLock);
 	if (buffer.remaining() < 4) {
 	    return false;		// wait for complete header to arrive
 	}
@@ -1012,6 +1007,7 @@ abstract class Mux {
     private boolean readMessageBody(ByteBuffer buffer)
 	throws ProtocolException
     {
+        assert Thread.holdsLock(readStateLock);
 	assert currentLengthRemaining > 0;
 	assert currentDataBuffer == null ||
 	    currentDataBuffer.remaining() == currentLengthRemaining;
@@ -1194,13 +1190,14 @@ abstract class Mux {
 	    data.get(bytes);
 	    data.reset();
 	    logger.log(Level.FINEST,
-		"Data: sessionID=" + sessionID +
-		(open ? ",open" : "") +
-		(close ? ",close" : "") +
-		(eof ? ",eof" : "") +
-		(ackRequired ? ",ackRequired" : "") +
-		",length=" + length +
-		(length > 0 ? ",data=\n" + encoder.encode(bytes) : ""));
+                    "Data: sessionID={0}{1}{2}{3}{4},length={5}{6}",
+                    new Object[]{sessionID,
+                        open ? ",open" : "",
+                        close ? ",close" : "",
+                        eof ? ",eof" : "",
+                        ackRequired ? ",ackRequired" : "",
+                        length, 
+                        length > 0 ? ",data=\n" + encoder.encode(bytes) : ""});
 	}
 
 	if (!eof && (close || ackRequired)) {

Added: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxInputStream.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxInputStream.java?rev=1595569&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxInputStream.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxInputStream.java Sun May 18 08:02:56 2014
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sun.jini.jeri.internal.mux;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Output stream returned by OutboundRequests and InboundRequests for
+ * a session of a multiplexed connection.
+ */
+class MuxInputStream extends InputStream {
+    private final Object sessionLock;
+    private final Session session;
+    private final Mux mux;
+    private final Deque<ByteBuffer> inBufQueue;
+    private IOException sessionDown = null;
+    private int inBufRemaining = 0;
+    private int inBufPos = 0;
+    private boolean inEOF = false;
+    private boolean inClosed = false;
+    private boolean sentAcknowledgment = false;
+
+    MuxInputStream(Mux mux, Session session, Object sessionLock) {
+        this.mux = mux;
+        this.session = session;
+        this.sessionLock = sessionLock;
+        this.inBufQueue = new ConcurrentLinkedDeque<ByteBuffer>();
+    }
+
+    void down(IOException e) {
+        sessionDown = e;
+    }
+
+    void appendToBufQueue(ByteBuffer data) {
+        inBufQueue.addLast(data);
+    }
+
+    @Override
+    public int read() throws IOException {
+        synchronized (sessionLock) {
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
+                if (session.getInState() == Session.IDLE) {
+                    assert session.getOutState() == Session.IDLE;
+                    mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
+                    session.setOutState(Session.OPEN);
+                    session.setInState(Session.OPEN);
+                }
+                if (!session.inRationInfinite && session.getInRation() == 0) {
+                    int inc = mux.initialInboundRation;
+                    mux.asyncSendIncrementRation(session.sessionID, inc);
+                    session.setInRation(session.getInRation() + inc);
+                }
+                try {
+                    sessionLock.wait(); // REMIND: timeout?
+                } catch (InterruptedException e) {
+                    String message = "request I/O interrupted";
+                    session.setDown(message, e);
+                    throw wrap(message, e);
+                }
+            }
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            if (inBufRemaining == 0) {
+                if (inEOF) {
+                    return -1;
+                } else {
+                    if (session.getInState() == Session.TERMINATED) {
+                        throw new IOException("request aborted by remote endpoint");
+                    }
+                    assert sessionDown != null;
+                    throw sessionDown;
+                }
+            }
+            assert inBufQueue.size() > 0;
+            int result = -1;
+            while (result == -1) {
+                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
+                if (inBufPos < buf.limit()) {
+                    result = (buf.get() & 0xFF);
+                    inBufPos++;
+                    inBufRemaining--;
+                }
+                if (inBufPos == buf.limit()) {
+                    inBufQueue.removeFirst();
+                    inBufPos = 0;
+                }
+            }
+            if (!session.inRationInfinite) {
+                checkInboundRation();
+            }
+            return result;
+        }
+    }
+
+    private IOException wrap(String message, Exception e) {
+        Throwable t;
+        if (Session.traceSupression()) {
+            t = e;
+        } else {
+            t = e.fillInStackTrace();
+        }
+        return new IOException(message, t);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        }
+        synchronized (sessionLock) {
+            if (inClosed) {
+                throw new IOException("stream closed");
+            } else if (len == 0) {
+                /*
+                 * REMIND: What if
+                 *     - stream is at EOF?
+                 *     - session was aborted?
+                 */
+                return 0;
+            }
+            while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
+                if (session.getInState() == Session.IDLE) {
+                    assert session.getOutState() == Session.IDLE;
+                    mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
+                    session.setOutState(Session.OPEN);
+                    session.setInState(Session.OPEN);
+                }
+                if (!session.inRationInfinite && session.getInRation() == 0) {
+                    int inc = mux.initialInboundRation;
+                    mux.asyncSendIncrementRation(session.sessionID, inc);
+                    session.setInRation(session.getInRation() + inc);
+                }
+                try {
+                    sessionLock.wait(); // REMIND: timeout?
+                } catch (InterruptedException e) {
+                    String message = "request I/O interrupted";
+                    session.setDown(message, e);
+                    throw wrap(message, e);
+                }
+            }
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            if (inBufRemaining == 0) {
+                if (inEOF) {
+                    return -1;
+                } else {
+                    if (session.getInState() == Session.TERMINATED) {
+                        throw new IOException("request aborted by remote endpoint");
+                    }
+                    assert sessionDown != null;
+                    throw sessionDown;
+                }
+            }
+            assert inBufQueue.size() > 0;
+            int remaining = len;
+            while (remaining > 0 && inBufRemaining > 0) {
+                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
+                if (inBufPos < buf.limit()) {
+                    int toCopy = Math.min(buf.limit() - inBufPos, remaining);
+                    buf.get(b, off, toCopy);
+                    inBufPos += toCopy;
+                    inBufRemaining -= toCopy;
+                    off += toCopy;
+                    remaining -= toCopy;
+                }
+                if (inBufPos == buf.limit()) {
+                    inBufQueue.removeFirst();
+                    inBufPos = 0;
+                }
+            }
+            if (!session.inRationInfinite) {
+                checkInboundRation();
+            }
+            return len - remaining;
+        }
+    }
+
+    /**
+     * Sends ration increment, if read drained buffers below
+     * a certain mark.
+     *
+     * This method must NOT be invoked if the inbound ration in
+     * infinite, and it must ONLY be invoked while synchronized on
+     * this session's lock.
+     *
+     * REMIND: The implementation of this action will be a
+     * significant area for performance tuning.
+     */
+    private void checkInboundRation() {
+        assert Thread.holdsLock(sessionLock);
+        assert !session.inRationInfinite;
+        if (session.getInState() >= Session.FINISHED) {
+            return;
+        }
+        int mark = mux.initialInboundRation / 2;
+        int used = inBufRemaining + session.getInRation();
+        if (used <= mark) {
+            int inc = mux.initialInboundRation - used;
+            mux.asyncSendIncrementRation(session.sessionID, inc);
+            session.setInRation(session.getInRation() + inc);
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        synchronized (sessionLock) {
+            if (inClosed) {
+                throw new IOException("stream closed");
+            }
+            /*
+             * REMIND: What if
+             *     - stream is at EOF?
+             *     - session was aborted?
+             */
+            return inBufRemaining;
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (sessionLock) {
+            if (inClosed) {
+                return;
+            }
+            inClosed = true;
+            inBufQueue.clear(); // allow GC of unread data
+            if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
+                mux.asyncSendAcknowledgment(session.sessionID);
+                sentAcknowledgment = true;
+                /*
+                 * If removing this session from the connection's
+                 * table was delayed in order to be able to send
+                 * an Acknowledgment message, then take care of
+                 * removing it now.
+                 */
+                if (session.isRemoveLater()) {
+                    session.setOutState(Session.TERMINATED);
+                    mux.removeSession(session.sessionID);
+                    session.setRemoveLater(false);
+                }
+            }
+            sessionLock.notifyAll();
+        }
+    }
+
+    /**
+     * @return the sentAcknowledgment
+     */
+    boolean isSentAcknowledgment() {
+        return sentAcknowledgment;
+    }
+
+    /**
+     * @return the inBufRemaining
+     */
+    int getBufRemaining() {
+        return inBufRemaining;
+    }
+
+    /**
+     * @return the inClosed
+     */
+    boolean isClosed() {
+        return inClosed;
+    }
+
+    /**
+     * @param inBufRemaining the inBufRemaining to set
+     */
+    void setBufRemaining(int inBufRemaining) {
+        this.inBufRemaining = inBufRemaining;
+    }
+
+    /**
+     * @param inEOF the inEOF to set
+     */
+    void setEOF(boolean inEOF) {
+        this.inEOF = inEOF;
+    }
+    
+}

Added: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxOutputStream.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxOutputStream.java?rev=1595569&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxOutputStream.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/MuxOutputStream.java Sun May 18 08:02:56 2014
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sun.jini.jeri.internal.mux;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+
+/**
+ * Output stream returned by OutboundRequests and InboundRequests for
+ * a session of a multiplexed connection.
+ */
+class MuxOutputStream extends OutputStream {
+    private final ByteBuffer buffer;
+    private final Object sessionLock;
+    private final Session session;
+    private final Mux mux;
+    private boolean fakeOKtoWrite = false; // REMIND
+    private IOException sessionDown = null;
+
+    MuxOutputStream(Mux mux, Session session, Object sessionLock) {
+        this.sessionLock = sessionLock;
+        this.session = session;
+        this.mux = mux;
+        this.buffer = mux.directBuffersUseful() 
+                ? ByteBuffer.allocateDirect(mux.maxFragmentSize) 
+                : ByteBuffer.allocate(mux.maxFragmentSize);
+    }
+
+    void abort() {
+        fakeOKtoWrite = false;
+    }
+
+    void handleClose() {
+        fakeOKtoWrite = true;
+    }
+
+    void down(IOException e) {
+        sessionDown = e;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        if (!buffer.hasRemaining()) {
+            writeBuffer(false);
+        } else {
+            synchronized (sessionLock) {
+                // REMIND: necessary?
+                ensureOpen();
+            }
+        }
+        buffer.put((byte) b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException();
+        } else if (len == 0) {
+            synchronized (sessionLock) {
+                ensureOpen();
+            }
+            return;
+        }
+        while (len > 0) {
+            int avail = buffer.remaining();
+            if (len <= avail) {
+                synchronized (sessionLock) {
+                    ensureOpen();
+                }
+                buffer.put(b, off, len);
+                return;
+            }
+            buffer.put(b, off, avail);
+            off += avail;
+            len -= avail;
+            writeBuffer(false);
+        }
+    }
+
+    /** Flush method causes deadlock */
+//    @Override
+//    public void flush() throws IOException {
+//        synchronized (sessionLock) {
+//            ensureOpen();
+//        }
+//        while (buffer.hasRemaining()) {
+//            writeBuffer(false);
+//        }
+//    }
+
+    @Override
+    public void close() throws IOException {
+        if (Session.logger.isLoggable(Level.FINEST)) {
+            Session.logger.log(Level.FINEST, "STACK TRACE", new Throwable("STACK TRACE"));
+        }
+        synchronized (sessionLock) {
+            ensureOpen();
+        }
+        while (!writeBuffer(true)) {
+        }
+    }
+
+    /**
+     *
+     * This method must ONLY be invoked while synchronized on
+     * this session's lock.
+     */
+    private void ensureOpen() throws IOException {
+        assert Thread.holdsLock(sessionLock);
+        /*
+         * While we're faking that the session is still OK when it really
+         * isn't (see above comments), return silently from here.
+         */
+        if (fakeOKtoWrite) {
+            return;
+        }
+        int outState = session.getOutState();
+        if (outState > Session.OPEN) {
+            if (outState == Session.FINISHED) {
+                throw new IOException("stream closed");
+            } else {
+                throw new IOException("session terminated");
+            }
+        } else if (sessionDown != null) {
+            throw sessionDown;
+        }
+    }
+
+    /**
+     * Writes as much of the contents of this stream's output buffer
+     * as is allowed by the current output ration.  Upon normal return,
+     * at least one byte will have been transferred from the buffer to
+     * the multiplexed connection output queue, and the buffer will have
+     * been compacted, ready to be filled at the current position.
+     *
+     * Returns true if closeIfComplete and session was marked EOF (with
+     * complete buffer written); if true, stream's output buffer should
+     * no longer be accessed (because this method will not wait for
+     * actual writing of the message).
+     */
+    private boolean writeBuffer(boolean closeIfComplete) throws IOException {
+        boolean hasData;
+        int origLimit;
+        buffer.flip();
+        origLimit = buffer.limit();
+        int toSend;
+        IOFuture future = null;
+        boolean eofSent = false;
+        synchronized (sessionLock) {
+            while (buffer.remaining() > 0 
+                    && !session.outRationInfinite 
+                    && session.getOutRation() < 1 
+                    && sessionDown == null 
+                    && session.getOutState() == Session.OPEN) 
+            {
+                try {
+                    sessionLock.wait(); // REMIND: timeout?
+                } catch (InterruptedException e) {
+                    String message = "request I/O interrupted";
+                    session.setDown(message, e);
+                    throw new IOException(message, e);
+                }
+            }
+            ensureOpen();
+            assert buffer.remaining() == 0 || session.outRationInfinite || session.getOutRation() > 0 || fakeOKtoWrite;
+            /*
+             * If we're just faking that the session is OK when it really
+             * isn't, then we need to stop the writing from proceeding
+             * past this barrier-- and if a close was requested, then
+             * satisfy it right away.
+             */
+            if (fakeOKtoWrite) {
+                assert session.role == Session.CLIENT 
+                        && session.getInState() == Session.TERMINATED;
+                if (closeIfComplete) fakeOKtoWrite = false;
+                buffer.position(origLimit);
+                buffer.compact();
+                return closeIfComplete;
+            }
+            boolean complete;
+            if (session.outRationInfinite || buffer.remaining() <= session.getOutRation()) {
+                toSend = buffer.remaining();
+                complete = true;
+            } else {
+                toSend = session.getOutRation();
+                buffer.limit(toSend);
+                complete = false;
+            }
+            if (!session.outRationInfinite) {
+                session.setOutRation(session.getOutRation() - toSend);
+            }
+            session.setPartialDeliveryStatus(true);
+            boolean open = session.getOutState() == Session.IDLE;
+            boolean eof = closeIfComplete && complete;
+            boolean close = session.role == Session.SERVER && eof 
+                    && session.getInState() > Session.OPEN;
+            boolean ackRequired = session.role == Session.SERVER 
+                    && eof && session.ackListeners();
+            int op = Mux.Data | (open ? Mux.Data_open : 0) 
+                    | (eof ? Mux.Data_eof : 0) | (close ? Mux.Data_close : 0)
+                    | (ackRequired ? Mux.Data_ackRequired : 0);
+            /*
+             * If we are the server-side, send even the final Data message
+             * for this session synchronously with this method, so that the
+             * VM will not exit before it gets delivered.  Otherwise, let
+             * final Data messages (those with eof true) be sent after this
+             * method completes.
+             *
+             * Buffers are duplicated to avoid a data race that occurred in
+             * StreamConnectionIO.  IOFuture now provides the buffer's position
+             * after sending.
+             */
+            if (!eof || session.role == Session.SERVER) {
+                future = mux.futureSendData(op, session.sessionID, buffer.duplicate());
+            } else {
+                mux.asyncSendData(op, session.sessionID, buffer.duplicate());
+            }
+
+            if (session.getOutState() == Session.IDLE) {
+                session.setOutState(Session.OPEN);
+                session.setInState(Session.OPEN);
+            }
+            if (eof) {
+                eofSent = true;
+                session.setOutState(close ? Session.TERMINATED : Session.FINISHED);
+                if (ackRequired) {
+                    session.setSentAckRequired(true);
+                }
+                sessionLock.notifyAll();
+            }
+        }
+        if (future != null) {
+            /* StreamConnectionIO uses a dedicated thread for sending buffers, 
+             * but synchronization is only used to get buffers for processing,
+             * no locks are held while sending, for this reason the state of the
+             * buffers position must be obtained from IOFuture, previously, 
+             * reading buffer position depended on a data race.
+             */
+            hasData = waitForIO(future);
+            if (hasData) {
+                buffer.position(future.getPosition()).limit(origLimit);
+                buffer.compact();
+            } else {
+                buffer.clear();
+            }
+        } else {
+            buffer.clear();
+        }
+        return eofSent;
+    }
+
+    /**
+     *
+     * This method must NOT be invoked while synchronized on
+     * this session's lock.
+     */
+    private boolean waitForIO(IOFuture future) throws IOException {
+        assert !Thread.holdsLock(sessionLock);
+        try {
+            return future.waitUntilDone();
+        } catch (InterruptedException e) {
+            String message = "request I/O interrupted";
+            session.setDown(message, e);
+            throw new IOException(message, e);
+        } catch (IOException e) {
+            session.setDown(e.getMessage(), e.getCause());
+            throw e;
+        }
+    }
+    
+}

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java Sun May 18 08:02:56 2014
@@ -28,8 +28,6 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import net.jini.core.constraint.InvocationConstraints;
@@ -50,10 +48,10 @@ final class Session {
     static final int CLIENT = 0;
     static final int SERVER = 1;
 
-    private static final int IDLE	= 0;
-    private static final int OPEN	= 1;
-    private static final int FINISHED	= 2;
-    private static final int TERMINATED	= 3;
+    static final int IDLE	= 0;
+    static final int OPEN	= 1;
+    static final int FINISHED	= 2;
+    static final int TERMINATED	= 3;
     private static final String[] stateNames = {
 	"idle", "open", "finished", "terminated"
     };
@@ -66,11 +64,12 @@ final class Session {
      * This is not optimised, because exception conditions
      * are exceptional.
      */
-    private static boolean traceSupression(){
+    static boolean traceSupression(){
         try {
             return AccessController.doPrivileged(
                 new PrivilegedAction<Boolean>() 
                 {
+                    @Override
                     public Boolean run() {
                         return Boolean.getBoolean("com.sun.jini.jeri.server.suppressStackTraces");
                     }
@@ -90,58 +89,53 @@ final class Session {
 	    new GetThreadPoolAction(false));
 
     /** mux logger */
-    private static final Logger logger =
+    static final Logger logger =
 	Logger.getLogger("net.jini.jeri.connection.mux");
 
     private final Mux mux;
-    private final int sessionID;
-    private final int role;
+    final int sessionID;
+    final int role;
 
-    private final OutputStream out;
-    private final InputStream in;
+    private final MuxOutputStream out;
+    private final MuxInputStream in;
 
     /** lock guarding all mutable instance state (below) */
-    private final Object sessionLock = new Object();
-
-    private boolean sessionDown = false;
-    private String sessionDownMessage;
-    private Throwable sessionDownCause;
+    private final Object sessionLock;
+    private boolean sessionDown;
 
     private int outState;
     private int outRation;
-    private final boolean outRationInfinite;
-    private boolean partialDeliveryStatus = false;
+    final boolean outRationInfinite;
+    private boolean partialDeliveryStatus;
 
     private int inState;
     private int inRation;
-    private final boolean inRationInfinite;
-    private int inBufRemaining = 0;
-    private final LinkedList inBufQueue = new LinkedList();
-    private int inBufPos = 0;
-    private boolean inEOF = false;
-    private boolean inClosed = false;
-
-    private boolean fakeOKtoWrite = false;		// REMIND
-    private boolean removeLater = false;		// REMIND
-
-    private boolean receivedAckRequired = false;
-    private boolean sentAcknowledgment = false;
-
-    private final Collection<AcknowledgmentSource.Listener> ackListeners = new ArrayList<AcknowledgmentSource.Listener>(3);
-    private boolean sentAckRequired = false;
-    private boolean receivedAcknowledgment = false;
+    final boolean inRationInfinite;
+		
+    private boolean removeLater;		// REMIND
+
+    private boolean receivedAckRequired;
+
+    private final Collection<AcknowledgmentSource.Listener> ackListeners;
+    private boolean sentAckRequired;
+    private boolean receivedAcknowledgment;
 
     /**
      *
      */
     Session(Mux mux, int sessionID, int role) {
+        this.receivedAcknowledgment = false;
+        this.sentAckRequired = false;
+        this.receivedAckRequired = false;
+        this.removeLater = false;
+        this.partialDeliveryStatus = false;
+        this.sessionDown = false;
+        this.ackListeners = new ArrayList<AcknowledgmentSource.Listener>(3);
+        this.sessionLock = new Object();
 	this.mux = mux;
 	this.sessionID = sessionID;
 	this.role = role;
-
-	out = new MuxOutputStream();
-	in = new MuxInputStream();
-
+        
 	outState = (role == CLIENT ? IDLE : OPEN);
 	outRation = mux.initialOutboundRation;
 	outRationInfinite = (outRation == 0);
@@ -149,6 +143,8 @@ final class Session {
 	inState = (role == CLIENT ? IDLE : OPEN);
 	inRation = mux.initialInboundRation;
 	inRationInfinite = (inRation == 0);
+        out = new MuxOutputStream(mux, this, sessionLock);
+	in = new MuxInputStream(mux, this, sessionLock);
     }
 
     /**
@@ -157,9 +153,11 @@ final class Session {
     OutboundRequest getOutboundRequest() {
 	assert role == CLIENT;
 	return new OutboundRequest() {
+            @Override
 	    public void populateContext(Collection context) {
 		((MuxClient) mux).populateContext(context);
 	    }
+            @Override
 	    public InvocationConstraints getUnfulfilledConstraints() {
 		/*
 		 * NYI: We currently have no request-specific hook
@@ -168,13 +166,17 @@ final class Session {
 		 */
 		throw new AssertionError();
 	    }
+            @Override
 	    public OutputStream getRequestOutputStream() { return out; }
+            @Override
 	    public InputStream getResponseInputStream() { return in; }
+            @Override
 	    public boolean getDeliveryStatus() {
 		synchronized (sessionLock) {
 		    return partialDeliveryStatus;
 		}
 	    }
+            @Override
 	    public void abort() { Session.this.abort(); }
 	};
     }
@@ -185,17 +187,21 @@ final class Session {
     InboundRequest getInboundRequest() {
 	assert role == SERVER;
 	return new InboundRequest() {
+            @Override
 	    public void checkPermissions() {
 		((MuxServer) mux).checkPermissions();
 	    }
+            @Override
 	    public InvocationConstraints
 		checkConstraints(InvocationConstraints constraints)
 		throws UnsupportedConstraintException
 	    {
 		return ((MuxServer) mux).checkConstraints(constraints);
 	    }
+            @Override
 	    public void populateContext(Collection context) {
 		context.add(new AcknowledgmentSource() {
+                    @Override
 		    public boolean addAcknowledgmentListener(
 			AcknowledgmentSource.Listener listener)
 		    {
@@ -203,7 +209,7 @@ final class Session {
 			    throw new NullPointerException();
 			}
 			synchronized (sessionLock) {
-			    if (outState < FINISHED) {
+			    if (getOutState() < FINISHED) {
 				ackListeners.add(listener);
 				return true;
 			    } else {
@@ -214,8 +220,11 @@ final class Session {
 		});
 		((MuxServer) mux).populateContext(context);
 	    }
+            @Override
 	    public InputStream getRequestInputStream() { return in; }
+            @Override
 	    public OutputStream getResponseOutputStream() { return out; }
+            @Override
 	    public void abort() { Session.this.abort(); }
 	};
     }
@@ -228,15 +237,16 @@ final class Session {
 	    if (!sessionDown) {
 		if (logger.isLoggable(Level.FINEST)) {
 		    logger.log(Level.FINEST,
-			"outState=" + stateNames[outState] +
-			",inState=" + stateNames[inState] +
-			",role=" + (role == CLIENT ? "CLIENT" : "SERVER"));
+                            "outState={0},inState={1},role={2}",
+                            new Object[]{stateNames[getOutState()],
+                                stateNames[inState],
+                                role == CLIENT ? "CLIENT" : "SERVER"});
 		}
 
-		if (outState == IDLE) {
+		if (getOutState() == IDLE) {
 		    mux.removeSession(sessionID);
-		} else if (outState < TERMINATED) {
-		    if (role == SERVER && outState == FINISHED) {
+		} else if (getOutState() < TERMINATED) {
+		    if (role == SERVER && getOutState() == FINISHED) {
 			/*
 			 * In this case, send Close rather than Abort, so that
 			 * a client that still hasn't finished writing will not
@@ -259,7 +269,7 @@ final class Session {
 	     * After the application has invoked abort() on the request, we
 	     * must no longer try to "fake" an OK session.
 	     */
-	    fakeOKtoWrite = false;
+            out.abort();
 
 	    /*
 	     * If removing this session from the connection's table
@@ -270,7 +280,7 @@ final class Session {
 	     * future Acknowledgment message will be sent.
 	     */
 	    if (removeLater) {
-		if (outState < TERMINATED) {
+		if (getOutState() < TERMINATED) {
 		    setOutState(TERMINATED);
 		}
 		mux.removeSession(sessionID);
@@ -286,8 +296,9 @@ final class Session {
 	synchronized (sessionLock) {
 	    if (!sessionDown) {
 		sessionDown = true;
-		sessionDownMessage = message;
-		sessionDownCause = cause;
+                IOException ex = new IOException(message, cause);
+                out.down(ex);
+                in.down(ex);
 		sessionLock.notifyAll();
 	    }
 	}
@@ -306,7 +317,7 @@ final class Session {
 		if (outRation + increment < outRation) {
 		    throw new ProtocolException("ration overflow");
 		}
-		if (outState == OPEN) {
+		if (getOutState() == OPEN) {
 		    if (increment > 0) {
 			if (outRation == 0) {
 			    sessionLock.notifyAll();
@@ -345,7 +356,7 @@ final class Session {
 	     * that no late Acknowledgment message gets sent after the
 	     * session has been removed.
 	     */
-	    if (outState < TERMINATED) {
+	    if (getOutState() < TERMINATED) {
 		mux.asyncSendAbort(Mux.Abort | (role == SERVER ?
 						Mux.Abort_partial : 0),
 				   sessionID, null);
@@ -375,7 +386,7 @@ final class Session {
 		throw new ProtocolException("Close on " +
 		    stateNames[inState] + " session: " + sessionID);
 	    }
-	    if (outState < FINISHED) {
+	    if (getOutState() < FINISHED) {
 		/*
 		 * From a protocol perspective, we need to terminate the
 		 * session at this point (because we're not finished, but
@@ -387,7 +398,7 @@ final class Session {
 		 * (temporarily) fake that the session is still in OK
 		 * shape (but not send any more data for it).
 		 */
-		fakeOKtoWrite = true;
+                out.handleClose();
 		mux.asyncSendAbort(Mux.Abort, sessionID, null);
 		setOutState(TERMINATED);
 		/*
@@ -412,8 +423,8 @@ final class Session {
 	     * connection's table now, to prevent the sessionID being
 	     * reused before the Acknowledgment message is sent.
 	     */
-	    if (outState == TERMINATED ||
-		!receivedAckRequired || sentAcknowledgment)
+	    if (getOutState() == TERMINATED ||
+		!receivedAckRequired || in.isSentAcknowledgment())
 	    {
 		mux.removeSession(sessionID);
 	    } else {
@@ -435,7 +446,7 @@ final class Session {
 		throw new ProtocolException("Acknowledgment on " +
 		    stateNames[inState] + " session: " + sessionID);
 	    }
-	    if (outState < FINISHED) {
+	    if (getOutState() < FINISHED) {
 		throw new ProtocolException(
 		    "acknowledgment received before EOF sent");
 	    }
@@ -475,14 +486,14 @@ final class Session {
 	    if (!inRationInfinite && length > inRation) {
 		throw new ProtocolException("input ration exceeded");
 	    }
-	    if (!inClosed && outState < TERMINATED) {
+	    if (!in.isClosed() && getOutState() < TERMINATED) {
 		if (length > 0) {
-		    if (inBufRemaining == 0) {
+		    if (in.getBufRemaining() == 0) {
 			sessionLock.notifyAll();
 			notified = true;
 		    }
-		    inBufQueue.addLast(data);
-		    inBufRemaining += length;
+		    in.appendToBufQueue(data);
+		    in.setBufRemaining(in.getBufRemaining() + length);
 		    if (!inRationInfinite) {
 			inRation -= length;
 		    }
@@ -490,7 +501,7 @@ final class Session {
 	    }
 
 	    if (eof) {
-		inEOF = true;
+		in.setEOF(true);
 		setInState(FINISHED);
 		if (!notified) {
 		    sessionLock.notifyAll();
@@ -515,7 +526,7 @@ final class Session {
     void handleOpen() throws ProtocolException {
 	assert role == SERVER;
 	synchronized (sessionLock) {
-	    if (inState < FINISHED || outState < TERMINATED) {
+	    if (inState < FINISHED || getOutState() < TERMINATED) {
 		throw new ProtocolException(
                     inState < FINISHED ?
 		    ("Data/open on " +
@@ -536,7 +547,8 @@ final class Session {
     /**
      *
      */
-    private void setOutState(int newState) {
+    void setOutState(int newState) {
+        assert Thread.holdsLock(sessionLock);
 	assert newState > outState;
 	outState = newState;
     }
@@ -544,534 +556,138 @@ final class Session {
     /**
      *
      */
-    private void setInState(int newState) {
+    void setInState(int newState) {
+        assert Thread.holdsLock(sessionLock);
 	assert newState > inState;
 	inState = newState;
     }
+    
+    boolean ackListeners(){
+        assert Thread.holdsLock(sessionLock);
+        return !ackListeners.isEmpty();
+    }
 
     private void notifyAcknowledgmentListeners(final boolean received) {
 	if (!ackListeners.isEmpty()) {
-	    systemThreadPool.execute(new Runnable() {
-		public void run() {
-		    Iterator iter = ackListeners.iterator();
-		    while (iter.hasNext()) {
-			AcknowledgmentSource.Listener listener =
-			    (AcknowledgmentSource.Listener) iter.next();
-			listener.acknowledgmentReceived(received);
-		    }
-		}
-	    }, "Mux ack notifier");
+	    systemThreadPool.execute(
+                    new NotifyAcknowledgementListeners(ackListeners, received),
+                    "Mux ack notifier");
 	}
     }
+    
+    private static class NotifyAcknowledgementListeners implements Runnable {
+        final Collection<AcknowledgmentSource.Listener> ackListeners;
+        final boolean received;
+        
+        NotifyAcknowledgementListeners(
+                Collection<AcknowledgmentSource.Listener> ackListeners,
+                boolean received)
+        {
+            this.ackListeners = new ArrayList<AcknowledgmentSource.Listener>(ackListeners);
+            this.received = received;
+        }
+        
+        @Override
+        public void run() {
+            for (AcknowledgmentSource.Listener listener : ackListeners) {
+                listener.acknowledgmentReceived(received);
+            }
+        }
+    }
 
     /**
-     * Output stream returned by OutboundRequests and InboundRequests for
-     * a session of a multiplexed connection.
+     * @return the outState
      */
-    private class MuxOutputStream extends OutputStream {
-
-	private ByteBuffer buffer = mux.directBuffersUseful() ?
-	    ByteBuffer.allocateDirect(mux.maxFragmentSize) :
-	    ByteBuffer.allocate(mux.maxFragmentSize);
-
-	MuxOutputStream() { }
-
-	public synchronized void write(int b) throws IOException {
-	    if (!buffer.hasRemaining()) {
-		writeBuffer(false);
-	    } else {
-		synchronized (sessionLock) {	// REMIND: necessary?
-		    ensureOpen();
-		}
-	    }
-	    buffer.put((byte) b);
-	}
-
-	public synchronized void write(byte[] b, int off, int len)
-	    throws IOException
-	{
-	    if (b == null) {
-		throw new NullPointerException();
-	    } else if ((off < 0) || (off > b.length) || (len < 0) ||
-		       ((off + len) > b.length) || ((off + len) < 0))
-	    {
-		throw new IndexOutOfBoundsException();
-	    } else if (len == 0) {
-		synchronized (sessionLock) {
-		    ensureOpen();
-		}
-		return;
-	    }
-
-	    while (len > 0) {
-		int avail = buffer.remaining();
-		if (len <= avail) {
-		    synchronized (sessionLock) {
-			ensureOpen();
-		    }
-		    buffer.put(b, off, len);
-		    return;
-		}
-
-		buffer.put(b, off, avail);
-		off += avail;
-		len -= avail;
-		writeBuffer(false);
-	    }
-	}
-
-	public synchronized void flush() throws IOException {
-//	    synchronized (sessionLock) {
-//		ensureOpen();
-//	    }
-//
-//	    while (buffer.hasRemaining()) {
-//		writeBuffer(false);
-//	    }
-	}
-
-	public synchronized void close() throws IOException {
-	    if (logger.isLoggable(Level.FINEST)) {
-		logger.log(Level.FINEST,
-			   "STACK TRACE", new Throwable("STACK TRACE"));
-	    }
-
-	    synchronized (sessionLock) {
-		ensureOpen();
-	    }
-
-	    while (!writeBuffer(true)) { }
-	}
-
-	/**
-	 *
-	 * This method must ONLY be invoked while synchronized on
-	 * this session's lock.
-	 */
-	private void ensureOpen() throws IOException {
-	    assert Thread.holdsLock(sessionLock);
-
-	    /*
-	     * While we're faking that the session is still OK when it really
-	     * isn't (see above comments), return silently from here.
-	     */
-	    if (fakeOKtoWrite) {
-		return;
-	    }
-
-	    if (outState > OPEN) {
-		if (outState == FINISHED) {
-		    throw new IOException("stream closed");
-		} else {
-		    throw new IOException("session terminated");
-		}
-	    } else if (sessionDown) {
-		IOException ioe = new IOException(sessionDownMessage);
-		if (sessionDownCause != null) {
-		    ioe.initCause(sessionDownCause);
-		}
-		throw ioe;
-	    }
-	}
-
-	/**
-	 * Writes as much of the contents of this stream's output buffer
-	 * as is allowed by the current output ration.  Upon normal return,
-	 * at least one byte will have been transferred from the buffer to
-	 * the multiplexed connection output queue, and the buffer will have
-	 * been compacted, ready to be filled at the current position.
-	 *
-	 * Returns true if closeIfComplete and session was marked EOF (with
-	 * complete buffer written); if true, stream's output buffer should
-	 * no longer be accessed (because this method will not wait for
-	 * actual writing of the message).
-	 */
-	private boolean writeBuffer(boolean closeIfComplete)
-	    throws IOException
-	{
-	    buffer.flip();
-	    int origLimit = buffer.limit();
-
-	    int toSend;
-	    IOFuture future = null;
-	    boolean eofSent = false;
-	    synchronized (sessionLock) {
-		while (buffer.remaining() > 0 &&
-		       !outRationInfinite && outRation < 1 &&
-		       !sessionDown && outState == OPEN)
-		{
-		    try {
-			sessionLock.wait();	// REMIND: timeout?
-		    } catch (InterruptedException e) {
-			String message = "request I/O interrupted";
-			setDown(message, e);
-			IOException ioe = new IOException(message);
-			ioe.initCause(e);
-			throw ioe;
-		    }
-		}
-		ensureOpen();
-		assert buffer.remaining() == 0 || outRationInfinite ||
-		    outRation > 0 || fakeOKtoWrite;
-
-		/*
-		 * If we're just faking that the session is OK when it really
-		 * isn't, then we need to stop the writing from proceeding
-		 * past this barrier-- and if a close was requested, then
-		 * satisfy it right away.
-		 */
-		if (fakeOKtoWrite) {
-		    assert role == CLIENT && inState == TERMINATED;
-		    if (closeIfComplete) {
-			fakeOKtoWrite = false;
-		    }
-		    buffer.position(origLimit);
-		    buffer.compact();
-		    return closeIfComplete;
-		}
-
-		boolean complete;
-		if (outRationInfinite || buffer.remaining() <= outRation) {
-		    toSend = buffer.remaining();
-		    complete = true;
-		} else {
-		    toSend = outRation;
-		    buffer.limit(toSend);
-		    complete = false;
-		}
-
-		if (!outRationInfinite) {
-		    outRation -= toSend;
-		}
-		partialDeliveryStatus = true;
-
-		boolean open = outState == IDLE;
-		boolean eof = closeIfComplete && complete;
-		boolean close = role == SERVER && eof && inState > OPEN;
-		boolean ackRequired = role == SERVER && eof &&
-		    (!ackListeners.isEmpty());
-
-		int op = Mux.Data |
-		    (open ? Mux.Data_open : 0) |
-		    (eof ? Mux.Data_eof : 0) |
-		    (close ? Mux.Data_close : 0) |
-		    (ackRequired ? Mux.Data_ackRequired : 0);
-
-		/*
-		 * If we are the server-side, send even the final Data message
-		 * for this session synchronously with this method, so that the
-		 * VM will not exit before it gets delivered.  Otherwise, let
-		 * final Data messages (those with eof true) be sent after this
-		 * method completes.
-		 */
-		if (!eof || role == SERVER) {
-		    future = mux.futureSendData(op, sessionID, buffer);
-		} else {
-		    mux.asyncSendData(op, sessionID, buffer);
-		}
-
-		if (outState == IDLE) {
-		    setOutState(OPEN);
-		    setInState(OPEN);
-		}
-
-		if (eof) {
-		    eofSent = true;
-		    setOutState(close ? TERMINATED : FINISHED);
-		    if (ackRequired) {
-			sentAckRequired = true;
-		    }
-		    sessionLock.notifyAll();
-		}
-	    }
-
-	    if (future != null) {
-		waitForIO(future);
-		buffer.limit(origLimit);		// REMIND: finally?
-		buffer.compact();
-	    }
-
-	    return eofSent;
-	}
-
-	/**
-	 *
-	 * This method must NOT be invoked while synchronized on
-	 * this session's lock.
-	 */
-	private void waitForIO(IOFuture future) throws IOException {
-	    assert !Thread.holdsLock(sessionLock);
-
-	    try {
-		future.waitUntilDone();
-	    } catch (InterruptedException e) {
-		String message = "request I/O interrupted";
-		setDown(message, e);
-		IOException ioe = new IOException(message);
-		ioe.initCause(e);
-		throw ioe;
-	    } catch (IOException e) {
-		setDown(e.getMessage(), e.getCause());
-		throw e;
-	    }
-	}
+    int getOutState() {
+        assert Thread.holdsLock(sessionLock);
+        return outState;
     }
 
     /**
-     * Output stream returned by OutboundRequests and InboundRequests for
-     * a session of a multiplexed connection.
+     * @return the outRation
      */
-    private class MuxInputStream extends InputStream {
-
-	MuxInputStream() { }
-
-	public int read() throws IOException {
-	    synchronized (sessionLock) {
-		if (inClosed) {
-		    throw new IOException("stream closed");
-		}
-
-		while (inBufRemaining == 0 &&
-		       !sessionDown && inState <= OPEN && !inClosed)
-		{
-		    if (inState == IDLE) {
-			assert outState == IDLE;
-			mux.asyncSendData(Mux.Data | Mux.Data_open,
-					  sessionID, null);
-			setOutState(OPEN);
-			setInState(OPEN);
-		    }
-		    if (!inRationInfinite && inRation == 0) {
-			int inc = mux.initialInboundRation;
-			mux.asyncSendIncrementRation(sessionID, inc);
-			inRation += inc;
-		    }
-		    try {
-			sessionLock.wait();	// REMIND: timeout?
-		    } catch (InterruptedException e) {
-			String message = "request I/O interrupted";
-			setDown(message, e);
-			throw wrap(message, e);
-		    }
-		}
-
-		if (inClosed) {
-		    throw new IOException("stream closed");
-		}
-
-		if (inBufRemaining == 0) {
-		    if (inEOF) {
-			return -1;
-		    } else {
-			if (inState == TERMINATED) {
-			    throw new IOException(
-				"request aborted by remote endpoint");
-			}
-			assert sessionDown;
-			IOException ioe = new IOException(sessionDownMessage);
-			if (sessionDownCause != null) {
-			    ioe.initCause(sessionDownCause);
-			}
-			throw ioe;
-		    }
-		}
-
-		assert inBufQueue.size() > 0;
-		int result = -1;
-		while (result == -1) {
-		    ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
-		    if (inBufPos < buf.limit()) {
-			result = (buf.get() & 0xFF);
-			inBufPos++;
-			inBufRemaining--;
-		    }
-		    if (inBufPos == buf.limit()) {
-			inBufQueue.removeFirst();
-			inBufPos = 0;
-		    }
-		}
-
-		if (!inRationInfinite) {
-		    checkInboundRation();
-		}
-
-		return result;
-	    }
-	}
-        
-        private IOException wrap(String message, Exception e){
-            Throwable t = null;
-            if (traceSupression()){
-                t = e;
-            } else {
-                t = e.fillInStackTrace();
-            }
-            return new IOException(message, t);
-        }
-
-	public int read(byte b[], int off, int len) throws IOException {
-	    if (b == null) {
-		throw new NullPointerException();
-	    } else if ((off < 0) || (off > b.length) || (len < 0) ||
-		       ((off + len) > b.length) || ((off + len) < 0))
-	    {
-		throw new IndexOutOfBoundsException();
-	    }
-
-	    synchronized (sessionLock) {
-		if (inClosed) {
-		    throw new IOException("stream closed");
-		} else if (len == 0) {
-		    /*
-		     * REMIND: What if
-		     *     - stream is at EOF?
-		     *     - session was aborted?
-		     */
-		    return 0;
-		}
-
-		while (inBufRemaining == 0 &&
-		       !sessionDown && inState <= OPEN && !inClosed)
-		{
-		    if (inState == IDLE) {
-			assert outState == IDLE;
-			mux.asyncSendData(Mux.Data | Mux.Data_open,
-					  sessionID, null);
-			setOutState(OPEN);
-			setInState(OPEN);
-		    }
-		    if (!inRationInfinite && inRation == 0) {
-			int inc = mux.initialInboundRation;
-			mux.asyncSendIncrementRation(sessionID, inc);
-			inRation += inc;
-		    }
-		    try {
-			sessionLock.wait();	// REMIND: timeout?
-		    } catch (InterruptedException e) {
-			String message = "request I/O interrupted";
-			setDown(message, e);
-			throw wrap(message, e);
-		    }
-		}
+    int getOutRation() {
+        assert Thread.holdsLock(sessionLock);
+        return outRation;
+    }
 
-		if (inClosed) {
-		    throw new IOException("stream closed");
-		}
+    /**
+     * @param outRation the outRation to set
+     */
+    void setOutRation(int outRation) {
+        assert Thread.holdsLock(sessionLock);
+        this.outRation = outRation;
+    }
 
-		if (inBufRemaining == 0) {
-		    if (inEOF) {
-			return -1;
-		    } else {
-			if (inState == TERMINATED) {
-			    throw new IOException(
-				"request aborted by remote endpoint");
-			}
-			assert sessionDown;
-			IOException ioe = new IOException(sessionDownMessage);
-			if (sessionDownCause != null) {
-			    ioe.initCause(sessionDownCause);
-			}
-			throw ioe;
-		    }
-		}
+    /**
+     * @return the inState
+     */
+    int getInState() {
+        assert Thread.holdsLock(sessionLock);
+        return inState;
+    }
 
-		assert inBufQueue.size() > 0;
-		int remaining = len;
-		while (remaining > 0 && inBufRemaining > 0) {
-		    ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
-		    if (inBufPos < buf.limit()) {
-			int toCopy = Math.min(buf.limit() - inBufPos,
-					      remaining);
-			buf.get(b, off, toCopy);
-			inBufPos += toCopy;
-			inBufRemaining -= toCopy;
-			off += toCopy;
-			remaining -= toCopy;
-		    }
-		    if (inBufPos == buf.limit()) {
-			inBufQueue.removeFirst();
-			inBufPos = 0;
-		    }
-		}
+    /**
+     * @param partialDeliveryStatus the partialDeliveryStatus to set
+     */
+    void setPartialDeliveryStatus(boolean partialDeliveryStatus) {
+        assert Thread.holdsLock(sessionLock);
+        this.partialDeliveryStatus = partialDeliveryStatus;
+    }
 
-		if (!inRationInfinite) {
-		    checkInboundRation();
-		}
+    /**
+     * @return the sentAckRequired
+     */
+    boolean isSentAckRequired() {
+        assert Thread.holdsLock(sessionLock);
+        return sentAckRequired;
+    }
 
-		return len - remaining;
-	    }
-	}
+    /**
+     * @param sentAckRequired the sentAckRequired to set
+     */
+    void setSentAckRequired(boolean sentAckRequired) {
+        assert Thread.holdsLock(sessionLock);
+        this.sentAckRequired = sentAckRequired;
+    }
 
-	/**
-	 * Sends ration increment, if read drained buffers below
-	 * a certain mark.
-	 *
-	 * This method must NOT be invoked if the inbound ration in
-	 * infinite, and it must ONLY be invoked while synchronized on
-	 * this session's lock.
-	 *
-	 * REMIND: The implementation of this action will be a
-	 * significant area for performance tuning.
-	 */
-	private void checkInboundRation() {
-	    assert Thread.holdsLock(sessionLock);
-	    assert !inRationInfinite;
-
-	    if (inState >= FINISHED) {
-		return;
-	    }
-	    int mark = mux.initialInboundRation / 2;
-	    int used = inBufRemaining + inRation;
-	    if (used <= mark) {
-		int inc = mux.initialInboundRation - used;
-		mux.asyncSendIncrementRation(sessionID, inc);
-		inRation += inc;
-	    }
-	}
+    /**
+     * @return the inRation
+     */
+    int getInRation() {
+        assert Thread.holdsLock(sessionLock);
+        return inRation;
+    }
 
-	public int available() throws IOException {
-	    synchronized (sessionLock) {
-		if (inClosed) {
-		    throw new IOException("stream closed");
-		}
-		/*
-		 * REMIND: What if
-		 *     - stream is at EOF?
-		 *     - session was aborted?
-		 */
-		return inBufRemaining;
-	    }
-	}
+    /**
+     * @param inRation the inRation to set
+     */
+    void setInRation(int inRation) {
+        assert Thread.holdsLock(sessionLock);
+        this.inRation = inRation;
+    }
 
-	public void close() {
-	    synchronized (sessionLock) {
-		if (inClosed) {
-		    return;
-		}
-		
-		inClosed = true;
-		inBufQueue.clear();		// allow GC of unread data
+    /**
+     * @return the removeLater
+     */
+    boolean isRemoveLater() {
+        assert Thread.holdsLock(sessionLock);
+        return removeLater;
+    }
 
-		if (role == CLIENT && !sentAcknowledgment &&
-		    receivedAckRequired && outState < TERMINATED)
-		{
-		    mux.asyncSendAcknowledgment(sessionID);
-		    sentAcknowledgment = true;
-		    /*
-		     * If removing this session from the connection's
-		     * table was delayed in order to be able to send
-		     * an Acknowledgment message, then take care of
-		     * removing it now.
-		     */
-		    if (removeLater) {
-			setOutState(TERMINATED);
-			mux.removeSession(sessionID);
-			removeLater = false;
-		    }
-		}
+    /**
+     * @param removeLater the removeLater to set
+     */
+    void setRemoveLater(boolean removeLater) {
+        assert Thread.holdsLock(sessionLock);
+        this.removeLater = removeLater;
+    }
 
-		sessionLock.notifyAll();
-	    }
-	}
+    /**
+     * @return the receivedAckRequired
+     */
+    boolean isReceivedAckRequired() {
+        assert Thread.holdsLock(sessionLock);
+        return receivedAckRequired;
     }
 }

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/SocketChannelConnectionIO.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/SocketChannelConnectionIO.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/SocketChannelConnectionIO.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/SocketChannelConnectionIO.java Sun May 18 08:02:56 2014
@@ -25,8 +25,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -83,13 +85,13 @@ final class SocketChannelConnectionIO ex
     /**
      * queue of buffers of data to be sent over connection
      */
-    private final LinkedList sendQueue = new LinkedList();
+    private final Deque sendQueue = new LinkedList();
 
     /**
      * queue of alternating buffers (that are in sendQueue) and IOFuture
      * objects that need to be notified when those buffers are written
      */
-    private final LinkedList notifyQueue = new LinkedList();
+    private final Deque notifyQueue = new LinkedList();
 
     /** buffer for reading incoming data from connection */
     private final ByteBuffer inputBuffer =
@@ -143,6 +145,7 @@ final class SocketChannelConnectionIO ex
 	}
     }
 
+    @Override
     void asyncSend(ByteBuffer first, ByteBuffer second) {
 	synchronized (mux.muxLock) {
 	    if (mux.muxDown) {
@@ -191,6 +194,7 @@ final class SocketChannelConnectionIO ex
 	}
     }
 
+    @Override
     IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
 	synchronized (mux.muxLock) {
 	    IOFuture future = new IOFuture();
@@ -226,7 +230,7 @@ final class SocketChannelConnectionIO ex
 			notifyQueue.addLast(second);
 			notifyQueue.addLast(future);
 		    } else {
-			future.done();
+			future.done(second.position());
 		    }
 		} else {
 		    sendQueue.addLast(first);
@@ -313,7 +317,7 @@ final class SocketChannelConnectionIO ex
 			} else {
 			    throw e;
 			}
-		    }
+		    }        
 		    for (int i = 0; i < len; i++) {
 			ByteBuffer bb = bufs[i];
 			assert bb == sendQueue.getFirst();
@@ -325,7 +329,7 @@ final class SocketChannelConnectionIO ex
 				notifyQueue.removeFirst();
 				IOFuture future =
 				    (IOFuture) notifyQueue.removeFirst();
-				future.done();
+				future.done(bb.position());
 			    }
 			} else {
 			    key.renewInterestMask(SelectionKey.OP_WRITE);// ###
@@ -455,6 +459,7 @@ final class SocketChannelConnectionIO ex
     }
 
     private class Handler implements SelectionManager.SelectionHandler {
+        @Override
 	public void handleSelection(int readyMask, SelectionManager.Key key) {
 	    if ((readyMask & SelectionKey.OP_WRITE) != 0) {
 		handleWriteReady();

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/StreamConnectionIO.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/StreamConnectionIO.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/StreamConnectionIO.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/StreamConnectionIO.java Sun May 18 08:02:56 2014
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.security.AccessController;
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -70,7 +71,7 @@ final class StreamConnectionIO extends C
      * 
      * Synchronised on super.mux.muxLock;
      */
-    private final LinkedList sendQueue;
+    private final Deque sendQueue;
 
     
     /**
@@ -92,6 +93,7 @@ final class StreamConnectionIO extends C
      * Starts processing connection data.  This method starts
      * asynchronous actions to read and write from the connection.
      */
+    @Override
     void start() throws IOException {
 	try {
 	    systemThreadPool.execute(new Writer(), "mux writer");
@@ -102,13 +104,11 @@ final class StreamConnectionIO extends C
 			   "could not create thread for request dispatch", e);
 	    } catch (Throwable t) {
 	    }
-	    // treat as an "expected" I/O error
-	    IOException ioe = new IOException("could not create I/O threads");
-	    ioe.initCause(e);
-	    throw ioe;
+	    throw new IOException("could not create I/O threads", e);
 	}
     }
 
+    @Override
     void asyncSend(ByteBuffer buffer) {
 	synchronized (mux.muxLock) {
 	    if (mux.muxDown) {
@@ -119,6 +119,7 @@ final class StreamConnectionIO extends C
 	}
     }
 
+    @Override
     void asyncSend(ByteBuffer first, ByteBuffer second) {
 	synchronized (mux.muxLock) {
 	    if (mux.muxDown) {
@@ -130,6 +131,7 @@ final class StreamConnectionIO extends C
 	}
     }
 
+    @Override
     IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
 	synchronized (mux.muxLock) {
 	    IOFuture future = new IOFuture();
@@ -154,12 +156,13 @@ final class StreamConnectionIO extends C
     private class Writer implements Runnable {
 	Writer() { }
 
+        @Override
 	public void run() {
-	    LinkedList localQueue = null;
+	    Deque localQueue = null;
 	    try {
 		while (true) {
 		    synchronized (mux.muxLock) {
-			while (!mux.muxDown && sendQueue.size() == 0) {
+			while (!mux.muxDown && sendQueue.isEmpty()) {
 			    /*
 			     * REMIND: Should we use a timeout here, to send
 			     * occasional PING messages during periods of
@@ -172,22 +175,27 @@ final class StreamConnectionIO extends C
 			     * would leave it in an unrecoverable state anyway.
 			     */
 			}
-			if (mux.muxDown && sendQueue.size() == 0) {
+			if (mux.muxDown && sendQueue.isEmpty()) {
 			    logger.log(Level.FINEST,
 				       "mux writer thread dying, connection " +
 				       "down and nothing more to send");
 			    break;
 			}
                         /* Clone an unshared copy and clear the queue while synchronized */
-			localQueue = (LinkedList) sendQueue.clone();
+			localQueue = new LinkedList(sendQueue);
 			sendQueue.clear();
 		    }
 
 		    boolean needToFlush = false;
-		    while (!localQueue.isEmpty()) {
+                    ByteBuffer last = null;
+                    int lastIndex = Integer.MIN_VALUE;
+		    for  ( int i = 0; !localQueue.isEmpty(); i++) {
 			Object next = localQueue.getFirst();
 			if (next instanceof ByteBuffer) {
-			    outChannel.write((ByteBuffer) next);
+                            ByteBuffer buffer = (ByteBuffer) next;
+			    outChannel.write((buffer));
+                            last = buffer;
+                            lastIndex = i;
 			    needToFlush = true;
 			} else {
 			    assert next instanceof IOFuture;
@@ -195,7 +203,11 @@ final class StreamConnectionIO extends C
 				out.flush();
 				needToFlush = false;
 			    }
-			    ((IOFuture) next).done();
+                            if (lastIndex == i - 1 && last != null){
+                                ((IOFuture) next).done(last.position());
+                            } else {
+                                ((IOFuture) next).done();
+                            }
 			}
 			localQueue.removeFirst();
 		    }
@@ -242,7 +254,7 @@ final class StreamConnectionIO extends C
 	}
     }
 
-    private void drainQueue(LinkedList queue) {
+    private void drainQueue(Deque queue) {
 	while (!queue.isEmpty()) {
 	    Object next = queue.removeFirst();
 	    if (next instanceof IOFuture) {
@@ -344,6 +356,7 @@ final class StreamConnectionIO extends C
 	    private volatile boolean open = true;
 
             // must be synchronized as per ReadableByteChannel contract
+            @Override
 	    public synchronized int read(ByteBuffer dst) throws IOException {
 		assert dst.hasArray();
 		byte[] array = dst.array();
@@ -372,11 +385,13 @@ final class StreamConnectionIO extends C
 		return totalRead;
 	    }
                 
+            @Override
 	    public boolean isOpen() {
 		return open;
 	    }
             
             // Blocking as per Channel contract
+            @Override
 	    public synchronized void close() throws IOException {
 		in.close();
 		open = false;
@@ -389,23 +404,26 @@ final class StreamConnectionIO extends C
 	    private volatile boolean open = true;
             
             // This method must block while writing as per WritableByteChannel contract.
+            @Override
 	    public synchronized int write(ByteBuffer src) throws IOException {
-		assert src.hasArray();
+                    assert src.hasArray();
 
-		int len = src.remaining();
-		if (len > 0) {
-		    int pos = src.position();
-		    out.write(src.array(), src.arrayOffset() + pos, len);
-		    src.position(pos + len);
-		}
-		return len;
-	    }
+                    int len = src.remaining();
+                    if (len > 0) {
+                        int pos = src.position();
+                        out.write(src.array(), src.arrayOffset() + pos, len);
+                        src.position(pos + len);
+                    }
+                    return len;
+                }
                 
+            @Override
 	    public boolean isOpen() {
 		return open;
 	    }
 
             // This method must block as per the Channel contract
+            @Override
 	    public synchronized void close() throws IOException {
 		out.close();
 		open = false;

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java Sun May 18 08:02:56 2014
@@ -83,20 +83,26 @@ final class ThreadPool implements Execut
      * immediately, however:
      *
      * 1. When ThreadPool creates threads too aggressively, stress tests in the 
-     * qa suite create too many threads and hangs because tasks that need to 
+     * qa suite create too many threads and hang because tasks that need to 
      * respond within a required time cannot.  
      * 
      * 2. Conversely when thread creation takes too long, Javaspace tests that 
      * rely on event propagation to cancel a LeasedResource find that lease still 
      * available after lease expiry.
      * 
+     * 3. If no threads are available when JERI needs to start a Mux connection,
+     * then a mux writer cannot initiate a client connection, for this reason, a
+     * new thread must be created if no waiting threads are available to the caller.
+     * 
      * ThreadPool must degrade gracefully when a system is under significant
      * load, but it must also execute tasks as soon as possible.
      * 
-     * To address these issues, a SynchronousQueue has been selected, it has
+     * To address these issues, a SynchronousQueue was originally selected, it has
      * no storage capacity, it hands tasks directly from the calling thread to
-     * the task thread.  Consider TransferBlockingQueue when Java 6 is no
-     * longer supported.
+     * the task thread, however contention can cause more threads than necessary
+     * to be created, a LinkedBlockingQueue eliminates or reduces contention 
+     * between caller and worker threads, preventing unnecessary thread creation. 
+     * Consider TransferBlockingQueue when Java 6 is no longer supported.
      * 
      * Pool threads block waiting until a task is available or idleTimeout
      * occurs after which the pool thread dies, client threads block waiting 
@@ -111,10 +117,7 @@ final class ThreadPool implements Execut
      * AccessControlContext of the calling thread, to avoid privilege escalation.
      */
     private final AtomicInteger workerCount;
-    private final AtomicInteger waitingThreads;
-    private final int delayFactor;
-    private final int numberOfThreads;
-    private final int availableProcessors;
+    private final AtomicInteger availableThreads;
     private volatile boolean shutdown = false;
     
     ThreadPool(ThreadGroup threadGroup){
@@ -129,19 +132,16 @@ final class ThreadPool implements Execut
 	this.threadGroup = threadGroup;
         queue = new LinkedBlockingQueue<Runnable>();
         workerCount = new AtomicInteger();
-        waitingThreads = new AtomicInteger();
-        this.delayFactor = delayFactor;
-        availableProcessors = Runtime.getRuntime().availableProcessors();
-        numberOfThreads = availableProcessors * 4;
+        availableThreads = new AtomicInteger();
         
 //         Thread not started until after constructor completes
 //         this escaping occurs safely.
         Runtime.getRuntime().addShutdownHook(new Thread ("ThreadPool destroy"){
             public void run (){
                 try {
-                    // Allow two seconds prior to shutdown for other
+                    // Allow three seconds prior to shutdown for other
                     // processes to complete.
-                    Thread.sleep(2000L);
+                    Thread.sleep(3000L);
                 } catch (InterruptedException ex) {
                     Thread.currentThread().interrupt();
                 }
@@ -160,25 +160,25 @@ final class ThreadPool implements Execut
         if (runnable == null) return;
         if (shutdown) throw new RejectedExecutionException("ThreadPool shutdown");
 	Runnable task = new Task(runnable, name);
-        boolean accepted = false;
         /* Startup ramps up very quickly because there are no waiting
-         * threads.
+         * threads available.
          * 
          * Tasks must not be allowed to build up in the queue, in case
          * of dependencies.
          */
-        int workers = workerCount.get();
-        if (workers < numberOfThreads
-                || (waitingThreads.get() < 1 && workers < queue.size() / 7)) { // need more threads or we get contention.
+        if ( availableThreads.get() < 1 ) { // need more threads.
             if (shutdown) {
                 throw new RejectedExecutionException("ThreadPool shutdown");
             }
             Thread t = AccessController.doPrivileged(
                     new NewThreadAction(threadGroup, new Worker(task), name, false));
             t.start();
-            accepted = true;
         } else {
-            accepted = queue.offer(task);
+            try {
+                queue.put(task);
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
@@ -254,11 +254,11 @@ final class ThreadPool implements Execut
                      */
                     try {
                         task = null;
-                        waitingThreads.incrementAndGet();
+                        availableThreads.incrementAndGet();
                         try {
                         task = queue.poll(idleTimeout, TimeUnit.MILLISECONDS);
                         } finally {
-                            waitingThreads.decrementAndGet();
+                            availableThreads.decrementAndGet();
                         }
                         thread.setName(NewThreadAction.NAME_PREFIX + task);
                         if (task != null) {

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java Sun May 18 08:02:56 2014
@@ -53,7 +53,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,13 +62,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupDiscovery.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupDiscovery.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupDiscovery.java Sun May 18 08:02:56 2014
@@ -17,73 +17,11 @@
  */
 package net.jini.discovery;
 
-import com.sun.jini.config.Config;
-import com.sun.jini.discovery.Discovery;
-import com.sun.jini.discovery.DiscoveryConstraints;
-import com.sun.jini.discovery.DiscoveryProtocolException;
-import com.sun.jini.discovery.EncodeIterator;
-import com.sun.jini.discovery.MulticastAnnouncement;
-import com.sun.jini.discovery.MulticastRequest;
-import com.sun.jini.discovery.UnicastResponse;
-import com.sun.jini.discovery.internal.MultiIPDiscovery;
-import com.sun.jini.logging.Levels;
-import com.sun.jini.logging.LogUtil;
-import com.sun.jini.thread.WakeupManager;
-import com.sun.jini.thread.WakeupManager.Ticket;
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.DatagramPacket;
-import java.net.MulticastSocket;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.rmi.RemoteException;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
 import java.util.logging.Logger;
-import java.util.logging.LogRecord;
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationException;
-import net.jini.config.EmptyConfiguration;
-import net.jini.config.NoSuchEntryException;
-import net.jini.constraint.BasicMethodConstraints;
-import net.jini.core.constraint.InvocationConstraints;
 import net.jini.core.constraint.MethodConstraints;
-import net.jini.core.discovery.LookupLocator;
-import net.jini.core.lookup.ServiceID;
-import net.jini.core.lookup.ServiceRegistrar;
-import net.jini.io.UnsupportedConstraintException;
-import net.jini.security.BasicProxyPreparer;
-import net.jini.security.ProxyPreparer;
-import net.jini.security.Security;
-import net.jini.security.SecurityContext;
 
 /**
  * This class is a helper utility class that encapsulates the functionality 

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/BasicObjectEndpoint.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/BasicObjectEndpoint.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/BasicObjectEndpoint.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/BasicObjectEndpoint.java Sun May 18 08:02:56 2014
@@ -209,7 +209,8 @@ public final class BasicObjectEndpoint
      * REMIND: We'd really like to use a weak *identity* hash map here--
      * does the lack of equals() security here create a risk?
      */
-    private static final Map streamBatches = new WeakHashMap(11);
+    private static final Map<ObjectInputStream,DgcBatchContext> streamBatches 
+            = new WeakHashMap<ObjectInputStream,DgcBatchContext>(11);
 
     /**
      * The endpoint to send remote call requests to.