You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@river.apache.org by Greg Trasuk <tr...@stratuscom.com> on 2015/11/26 15:29:40 UTC

Re: svn commit: r1716613 - in /river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux: Mux.java MuxClient.java MuxInputStream.java MuxServer.java StreamConnectionIO.java

In order to properly review changes, it would be great to know what the problem it is that you’re fixing - could you share?

Cheers,

Greg Trasuk
> On Nov 26, 2015, at 6:56 AM, peter_firmstone@apache.org wrote:
> 
> Author: peter_firmstone
> Date: Thu Nov 26 11:56:32 2015
> New Revision: 1716613
> 
> URL: http://svn.apache.org/viewvc?rev=1716613&view=rev
> Log:
> Commit my local Jeri multiplexer stability improvements to assist with jtreg multiplexer nio tests:
> 
> net.jini.jeri.tcp.outOfThreads.OutOfThreads.java
> net.jini.jeri.tcp.outOfThreads.OutOfThreads2.java
> 
> Modified:
>    river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
>    river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
>    river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
>    river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
>    river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
> 
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java Thu Nov 26 11:56:32 2015
> @@ -34,9 +34,7 @@ import java.nio.charset.CharsetEncoder;
> import java.security.AccessController;
> import java.util.BitSet;
> import java.util.Deque;
> -import java.util.HashMap;
> import java.util.LinkedList;
> -import java.util.Map;
> import java.util.logging.Level;
> import java.util.logging.Logger;
> 
> @@ -107,6 +105,7 @@ abstract class Mux {
> 
> 	public void run() {
> 	    for (int i = 0; i < sessions.length; i++) {
> +		if (sessions[i] != null)
> 		sessions[i].setDown(message, cause);
> 	    }
> 	}
> @@ -135,7 +134,7 @@ abstract class Mux {
>     Throwable muxDownCause;
> 
>     final BitSet busySessions = new BitSet();
> -    final Map<Integer,Session> sessions = new HashMap<Integer,Session>(128);
> +    final Session [] sessions = new Session[MAX_SESSION_ID + 1];
> 
>     private int expectedPingCookie = -1;
> 
> @@ -274,10 +273,15 @@ abstract class Mux {
> 	assert Thread.holdsLock(muxLock);
> 	assert !muxDown;
> 	assert !busySessions.get(sessionID);
> -	assert sessions.get(Integer.valueOf(sessionID)) == null;
> +//	assert sessions.get(Byte.valueOf(sessionID)) == null;
> +	assert sessions[sessionID] == null;
> 
> 	busySessions.set(sessionID);
> -	sessions.put(Integer.valueOf(sessionID), session);
> +//	Throwable t = new Throwable();
> +//	System.out.println("Setting sessionID: "+ sessionID);
> +//	t.printStackTrace(System.out);
> +//	sessions.put(Byte.valueOf(sessionID), session);
> +	sessions[sessionID] = session;
>     }
> 
>     /**
> @@ -285,14 +289,17 @@ abstract class Mux {
>      * This method is intended to be invoked by this class and
>      * subclasses only.
>      *
> -     * This method MAY be invoked while synchronized on muxLock.
> +     * This method MAY be invoked while synchronized on muxLock if failure
> +     * occurs during start up.
>      */
>     final void setDown(final String message, final Throwable cause) {
> +	SessionShutdownTask sst = null;
> 	synchronized (muxLock) {
> 	    if (muxDown) return;
> 	    muxDown = true;
> 	    muxDownMessage = message;
> 	    muxDownCause = cause;
> +	    sst = new SessionShutdownTask(sessions.clone(), message, cause);
> 	    muxLock.notifyAll();
> 	}
> 
> @@ -309,11 +316,8 @@ abstract class Mux {
>              */
> 	boolean needWorker = false;
>             synchronized (sessionShutdownQueue) {
> -                if (!sessions.isEmpty()) {
> -                    sessionShutdownQueue.add(new SessionShutdownTask(
> -                        (Session[]) sessions.values().toArray(
> -                            new Session[sessions.values().size()]),
> -                        message, cause));
> +                if (sst != null) {
> +                    sessionShutdownQueue.add(sst);
>                     needWorker = true;
>                 } else {
>                     needWorker = !sessionShutdownQueue.isEmpty();
> @@ -360,7 +364,7 @@ abstract class Mux {
> 	    }
> 	    assert busySessions.get(sessionID);
> 	    busySessions.clear(sessionID);
> -	    sessions.remove(Integer.valueOf(sessionID));
> +	    sessions[sessionID] = null;
> 	}
>     }
> 
> @@ -1178,8 +1182,7 @@ abstract class Mux {
> 	getSession(sessionID).handleAcknowledgment();
>     }
> 
> -    private void handleData(int sessionID, boolean open, boolean close,
> -			    boolean eof, boolean ackRequired, ByteBuffer data)
> +    private void handleData(int sessionID, boolean open, boolean close, boolean eof, boolean ackRequired, ByteBuffer data)
> 	throws ProtocolException
>     {
> 	if (logger.isLoggable(Level.FINEST)) {
> @@ -1219,7 +1222,7 @@ abstract class Mux {
> 		throw new ProtocolException(
> 		    "inactive sessionID: " + sessionID);
> 	    }
> -	    return (Session) sessions.get(Integer.valueOf(sessionID));
> +	    return sessions[sessionID];
> 	}
>     }
> 
> 
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java Thu Nov 26 11:56:32 2015
> @@ -1,129 +1,129 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - * 
> - *      http://www.apache.org/licenses/LICENSE-2.0
> - * 
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import org.apache.river.action.GetIntegerAction;
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.io.OutputStream;
> -import java.nio.channels.SocketChannel;
> -import java.security.AccessController;
> -import java.util.Collection;
> -import net.jini.jeri.OutboundRequest;
> -
> -/**
> - * A MuxClient controls the client side of multiplexed connection.
> - *
> - * @author Sun Microsystems, Inc.
> - **/
> -public class MuxClient extends Mux {
> -
> -    /** initial inbound ration as client, default is 32768 */
> -    private static final int clientInitialInboundRation =
> -	((Integer) AccessController.doPrivileged(new GetIntegerAction(
> -	    "org.apache.river.jeri.connection.mux.client.initialInboundRation",
> -	    32768))).intValue();
> -
> -    /**
> -     * Initiates the client side of the multiplexed connection over
> -     * the given input/output stream pair.
> -     *
> -     * @param out the output stream of the underlying connection
> -     *
> -     * @param in the input stream of the underlying connection
> -     **/
> -    public MuxClient(OutputStream out, InputStream in) throws IOException {
> -	super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
> -    }
> -
> -    public MuxClient(SocketChannel channel) throws IOException {
> -	super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
> -    }
> -
> -    /**
> -     * Starts a new request over this connection, returning the
> -     * corresponding OutboundRequest object.
> -     *
> -     * @return the OutboundRequest for the newly created request
> -     **/
> -    public OutboundRequest newRequest()	throws IOException {
> -	synchronized (muxLock) {
> -	    if (muxDown) {
> -		IOException ioe = new IOException(muxDownMessage);
> -		ioe.initCause(muxDownCause);
> -		throw ioe;
> -	    }
> -	    int sessionID = busySessions.nextClearBit(0);
> -	    if (sessionID > Mux.MAX_SESSION_ID) {
> -		throw new IOException("no free sessions");
> -	    }
> -
> -	    Session session = new Session(this, sessionID, Session.CLIENT);
> -	    addSession(sessionID, session);
> -	    return session.getOutboundRequest();
> -	}
> -    }
> -
> -    /**
> -     * Returns the current number of requests in progress over this
> -     * connection.
> -     *
> -     * The value is guaranteed to not increase until the next
> -     * invocation of the newRequest method.
> -     *
> -     * @return the number of requests in progress over this connection
> -     *
> -     * @throws IOException if the multiplexed connection is no longer
> -     * active
> -     **/
> -    public int requestsInProgress() throws IOException {
> -	synchronized (muxLock) {
> -	    if (muxDown) {
> -		IOException ioe = new IOException(muxDownMessage);
> -		ioe.initCause(muxDownCause);
> -		throw ioe;
> -	    }
> -	    return busySessions.cardinality();
> -	}
> -    }
> -
> -    /**
> -     * Shuts down this multiplexed connection.  Requests in progress
> -     * will throw IOException for future I/O operations.
> -     *
> -     * @param message reason for shutdown to be included in
> -     * IOExceptions thrown from future I/O operations
> -     **/
> -    public void shutdown(String message) {
> -	synchronized (muxLock) {
> -	    setDown(message, null);
> -	}
> -    }
> -
> -    /**
> -     * Populates the context collection with information representing
> -     * this connection.
> -     *
> -     * This method should be overridden by subclasses to implement the
> -     * desired behavior of the populateContext method for
> -     * OutboundRequest instances generated for this connection.
> -     **/
> -    protected void populateContext(Collection context) {
> -    }
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + * 
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import org.apache.river.action.GetIntegerAction;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.nio.channels.SocketChannel;
> +import java.security.AccessController;
> +import java.util.Collection;
> +import net.jini.jeri.OutboundRequest;
> +
> +/**
> + * A MuxClient controls the client side of multiplexed connection.
> + *
> + * @author Sun Microsystems, Inc.
> + **/
> +public class MuxClient extends Mux {
> +
> +    /** initial inbound ration as client, default is 32768 */
> +    private static final int clientInitialInboundRation =
> +	((Integer) AccessController.doPrivileged(new GetIntegerAction(
> +	    "org.apache.river.jeri.connection.mux.client.initialInboundRation",
> +	    32768))).intValue();
> +
> +    /**
> +     * Initiates the client side of the multiplexed connection over
> +     * the given input/output stream pair.
> +     *
> +     * @param out the output stream of the underlying connection
> +     *
> +     * @param in the input stream of the underlying connection
> +     **/
> +    public MuxClient(OutputStream out, InputStream in) throws IOException {
> +	super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
> +    }
> +
> +    public MuxClient(SocketChannel channel) throws IOException {
> +	super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
> +    }
> +
> +    /**
> +     * Starts a new request over this connection, returning the
> +     * corresponding OutboundRequest object.
> +     *
> +     * @return the OutboundRequest for the newly created request
> +     **/
> +    public OutboundRequest newRequest()	throws IOException {
> +	synchronized (muxLock) {
> +	    if (muxDown) {
> +		IOException ioe = new IOException(muxDownMessage);
> +		ioe.initCause(muxDownCause);
> +		throw ioe;
> +	    }
> +	    byte sessionID = (byte) busySessions.nextClearBit(0);
> +	    if (sessionID > Mux.MAX_SESSION_ID) {
> +		throw new IOException("no free sessions");
> +	    }
> +
> +	    Session session = new Session(this, sessionID, Session.CLIENT);
> +	    addSession(sessionID, session);
> +	    return session.getOutboundRequest();
> +	}
> +    }
> +
> +    /**
> +     * Returns the current number of requests in progress over this
> +     * connection.
> +     *
> +     * The value is guaranteed to not increase until the next
> +     * invocation of the newRequest method.
> +     *
> +     * @return the number of requests in progress over this connection
> +     *
> +     * @throws IOException if the multiplexed connection is no longer
> +     * active
> +     **/
> +    public int requestsInProgress() throws IOException {
> +	synchronized (muxLock) {
> +	    if (muxDown) {
> +		IOException ioe = new IOException(muxDownMessage);
> +		ioe.initCause(muxDownCause);
> +		throw ioe;
> +	    }
> +	    return busySessions.cardinality();
> +	}
> +    }
> +
> +    /**
> +     * Shuts down this multiplexed connection.  Requests in progress
> +     * will throw IOException for future I/O operations.
> +     *
> +     * @param message reason for shutdown to be included in
> +     * IOExceptions thrown from future I/O operations
> +     **/
> +    public void shutdown(String message) {
> +	synchronized (muxLock) {
> +	    setDown(message, null);
> +	}
> +    }
> +
> +    /**
> +     * Populates the context collection with information representing
> +     * this connection.
> +     *
> +     * This method should be overridden by subclasses to implement the
> +     * desired behavior of the populateContext method for
> +     * OutboundRequest instances generated for this connection.
> +     **/
> +    protected void populateContext(Collection context) {
> +    }
> +}
> 
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java Thu Nov 26 11:56:32 2015
> @@ -1,308 +1,308 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - * 
> - *      http://www.apache.org/licenses/LICENSE-2.0
> - * 
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.nio.ByteBuffer;
> -import java.util.Deque;
> -import java.util.LinkedList;
> -
> -/**
> - * Output stream returned by OutboundRequests and InboundRequests for
> - * a session of a multiplexed connection.
> - */
> -class MuxInputStream extends InputStream {
> -    private final Object sessionLock;
> -    private final Session session;
> -    private final Mux mux;
> -    private final Deque<ByteBuffer> inBufQueue;
> -    private IOException sessionDown = null;
> -    private int inBufRemaining = 0;
> -    private int inBufPos = 0;
> -    private boolean inEOF = false;
> -    private boolean inClosed = false;
> -    private boolean sentAcknowledgment = false;
> -
> -    MuxInputStream(Mux mux, Session session, Object sessionLock) {
> -        this.mux = mux;
> -        this.session = session;
> -        this.sessionLock = sessionLock;
> -        this.inBufQueue = new LinkedList<ByteBuffer>();
> -    }
> -
> -    void down(IOException e) {
> -        sessionDown = e;
> -    }
> -
> -    void appendToBufQueue(ByteBuffer data) {
> -        inBufQueue.addLast(data);
> -    }
> -
> -    @Override
> -    public int read() throws IOException {
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> -                if (session.getInState() == Session.IDLE) {
> -                    assert session.getOutState() == Session.IDLE;
> -                    mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> -                    session.setOutState(Session.OPEN);
> -                    session.setInState(Session.OPEN);
> -                }
> -                if (!session.inRationInfinite && session.getInRation() == 0) {
> -                    int inc = mux.initialInboundRation;
> -                    mux.asyncSendIncrementRation(session.sessionID, inc);
> -                    session.setInRation(session.getInRation() + inc);
> -                }
> -                try {
> -                    sessionLock.wait(); // REMIND: timeout?
> -                } catch (InterruptedException e) {
> -                    String message = "request I/O interrupted";
> -                    session.setDown(message, e);
> -                    throw wrap(message, e);
> -                }
> -            }
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            if (inBufRemaining == 0) {
> -                if (inEOF) {
> -                    return -1;
> -                } else {
> -                    if (session.getInState() == Session.TERMINATED) {
> -                        throw new IOException("request aborted by remote endpoint");
> -                    }
> -                    assert sessionDown != null;
> -                    throw sessionDown;
> -                }
> -            }
> -            assert inBufQueue.size() > 0;
> -            int result = -1;
> -            while (result == -1) {
> -                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> -                if (inBufPos < buf.limit()) {
> -                    result = (buf.get() & 0xFF);
> -                    inBufPos++;
> -                    inBufRemaining--;
> -                }
> -                if (inBufPos == buf.limit()) {
> -                    inBufQueue.removeFirst();
> -                    inBufPos = 0;
> -                }
> -            }
> -            if (!session.inRationInfinite) {
> -                checkInboundRation();
> -            }
> -            return result;
> -        }
> -    }
> -
> -    private IOException wrap(String message, Exception e) {
> -        Throwable t;
> -        if (Session.traceSupression()) {
> -            t = e;
> -        } else {
> -            t = e.fillInStackTrace();
> -        }
> -        return new IOException(message, t);
> -    }
> -
> -    @Override
> -    public int read(byte[] b, int off, int len) throws IOException {
> -        if (b == null) {
> -            throw new NullPointerException();
> -        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
> -            throw new IndexOutOfBoundsException();
> -        }
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            } else if (len == 0) {
> -                /*
> -                 * REMIND: What if
> -                 *     - stream is at EOF?
> -                 *     - session was aborted?
> -                 */
> -                return 0;
> -            }
> -            while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> -                if (session.getInState() == Session.IDLE) {
> -                    assert session.getOutState() == Session.IDLE;
> -                    mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> -                    session.setOutState(Session.OPEN);
> -                    session.setInState(Session.OPEN);
> -                }
> -                if (!session.inRationInfinite && session.getInRation() == 0) {
> -                    int inc = mux.initialInboundRation;
> -                    mux.asyncSendIncrementRation(session.sessionID, inc);
> -                    session.setInRation(session.getInRation() + inc);
> -                }
> -                try {
> -                    sessionLock.wait(); // REMIND: timeout?
> -                } catch (InterruptedException e) {
> -                    String message = "request I/O interrupted";
> -                    session.setDown(message, e);
> -                    throw wrap(message, e);
> -                }
> -            }
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            if (inBufRemaining == 0) {
> -                if (inEOF) {
> -                    return -1;
> -                } else {
> -                    if (session.getInState() == Session.TERMINATED) {
> -                        throw new IOException("request aborted by remote endpoint");
> -                    }
> -                    assert sessionDown != null;
> -                    throw sessionDown;
> -                }
> -            }
> -            assert inBufQueue.size() > 0;
> -            int remaining = len;
> -            while (remaining > 0 && inBufRemaining > 0) {
> -                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> -                if (inBufPos < buf.limit()) {
> -                    int toCopy = Math.min(buf.limit() - inBufPos, remaining);
> -                    buf.get(b, off, toCopy);
> -                    inBufPos += toCopy;
> -                    inBufRemaining -= toCopy;
> -                    off += toCopy;
> -                    remaining -= toCopy;
> -                }
> -                if (inBufPos == buf.limit()) {
> -                    inBufQueue.removeFirst();
> -                    inBufPos = 0;
> -                }
> -            }
> -            if (!session.inRationInfinite) {
> -                checkInboundRation();
> -            }
> -            return len - remaining;
> -        }
> -    }
> -
> -    /**
> -     * Sends ration increment, if read drained buffers below
> -     * a certain mark.
> -     *
> -     * This method must NOT be invoked if the inbound ration in
> -     * infinite, and it must ONLY be invoked while synchronized on
> -     * this session's lock.
> -     *
> -     * REMIND: The implementation of this action will be a
> -     * significant area for performance tuning.
> -     */
> -    private void checkInboundRation() {
> -        assert Thread.holdsLock(sessionLock);
> -        assert !session.inRationInfinite;
> -        if (session.getInState() >= Session.FINISHED) {
> -            return;
> -        }
> -        int mark = mux.initialInboundRation / 2;
> -        int used = inBufRemaining + session.getInRation();
> -        if (used <= mark) {
> -            int inc = mux.initialInboundRation - used;
> -            mux.asyncSendIncrementRation(session.sessionID, inc);
> -            session.setInRation(session.getInRation() + inc);
> -        }
> -    }
> -
> -    @Override
> -    public int available() throws IOException {
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                throw new IOException("stream closed");
> -            }
> -            /*
> -             * REMIND: What if
> -             *     - stream is at EOF?
> -             *     - session was aborted?
> -             */
> -            return inBufRemaining;
> -        }
> -    }
> -
> -    @Override
> -    public void close() {
> -        synchronized (sessionLock) {
> -            if (inClosed) {
> -                return;
> -            }
> -            inClosed = true;
> -            inBufQueue.clear(); // allow GC of unread data
> -            if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
> -                mux.asyncSendAcknowledgment(session.sessionID);
> -                sentAcknowledgment = true;
> -                /*
> -                 * If removing this session from the connection's
> -                 * table was delayed in order to be able to send
> -                 * an Acknowledgment message, then take care of
> -                 * removing it now.
> -                 */
> -                if (session.isRemoveLater()) {
> -                    session.setOutState(Session.TERMINATED);
> -                    mux.removeSession(session.sessionID);
> -                    session.setRemoveLater(false);
> -                }
> -            }
> -            sessionLock.notifyAll();
> -        }
> -    }
> -
> -    /**
> -     * @return the sentAcknowledgment
> -     */
> -    boolean isSentAcknowledgment() {
> -        return sentAcknowledgment;
> -    }
> -
> -    /**
> -     * @return the inBufRemaining
> -     */
> -    int getBufRemaining() {
> -        return inBufRemaining;
> -    }
> -
> -    /**
> -     * @return the inClosed
> -     */
> -    boolean isClosed() {
> -        return inClosed;
> -    }
> -
> -    /**
> -     * @param inBufRemaining the inBufRemaining to set
> -     */
> -    void setBufRemaining(int inBufRemaining) {
> -        this.inBufRemaining = inBufRemaining;
> -    }
> -
> -    /**
> -     * @param inEOF the inEOF to set
> -     */
> -    void setEOF(boolean inEOF) {
> -        this.inEOF = inEOF;
> -    }
> -    
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + * 
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.nio.ByteBuffer;
> +import java.util.Deque;
> +import java.util.LinkedList;
> +
> +/**
> + * Output stream returned by OutboundRequests and InboundRequests for
> + * a session of a multiplexed connection.
> + */
> +class MuxInputStream extends InputStream {
> +    private final Object sessionLock;
> +    private final Session session;
> +    private final Mux mux;
> +    private final Deque<ByteBuffer> inBufQueue;
> +    private IOException sessionDown = null;
> +    private int inBufRemaining = 0;
> +    private int inBufPos = 0;
> +    private boolean inEOF = false;
> +    private boolean inClosed = false;
> +    private boolean sentAcknowledgment = false;
> +
> +    MuxInputStream(Mux mux, Session session, Object sessionLock) {
> +        this.mux = mux;
> +        this.session = session;
> +        this.sessionLock = sessionLock;
> +        this.inBufQueue = new LinkedList<ByteBuffer>();
> +    }
> +
> +    void down(IOException e) {
> +        sessionDown = e;
> +    }
> +
> +    void appendToBufQueue(ByteBuffer data) {
> +        inBufQueue.addLast(data);
> +    }
> +
> +    @Override
> +    public int read() throws IOException {
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> +                if (session.getInState() == Session.IDLE) {
> +                    assert session.getOutState() == Session.IDLE;
> +                    mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> +                    session.setOutState(Session.OPEN);
> +                    session.setInState(Session.OPEN);
> +                }
> +                if (!session.inRationInfinite && session.getInRation() == 0) {
> +                    int inc = mux.initialInboundRation;
> +                    mux.asyncSendIncrementRation(session.sessionID, inc);
> +                    session.setInRation(session.getInRation() + inc);
> +                }
> +                try {
> +                    sessionLock.wait(5000L); // REMIND: timeout?
> +                } catch (InterruptedException e) {
> +                    String message = "request I/O interrupted";
> +                    session.setDown(message, e);
> +                    throw wrap(message, e);
> +                }
> +            }
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            if (inBufRemaining == 0) {
> +                if (inEOF) {
> +                    return -1;
> +                } else {
> +                    if (session.getInState() == Session.TERMINATED) {
> +                        throw new IOException("request aborted by remote endpoint");
> +                    }
> +                    assert sessionDown != null;
> +                    throw sessionDown;
> +                }
> +            }
> +            assert inBufQueue.size() > 0;
> +            int result = -1;
> +            while (result == -1) {
> +                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> +                if (inBufPos < buf.limit()) {
> +                    result = (buf.get() & 0xFF);
> +                    inBufPos++;
> +                    inBufRemaining--;
> +                }
> +                if (inBufPos == buf.limit()) {
> +                    inBufQueue.removeFirst();
> +                    inBufPos = 0;
> +                }
> +            }
> +            if (!session.inRationInfinite) {
> +                checkInboundRation();
> +            }
> +            return result;
> +        }
> +    }
> +
> +    private IOException wrap(String message, Exception e) {
> +        Throwable t;
> +        if (Session.traceSupression()) {
> +            t = e;
> +        } else {
> +            t = e.fillInStackTrace();
> +        }
> +        return new IOException(message, t);
> +    }
> +
> +    @Override
> +    public int read(byte[] b, int off, int len) throws IOException {
> +        if (b == null) {
> +            throw new NullPointerException();
> +        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
> +            throw new IndexOutOfBoundsException();
> +        }
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            } else if (len == 0) {
> +                /*
> +                 * REMIND: What if
> +                 *     - stream is at EOF?
> +                 *     - session was aborted?
> +                 */
> +                return 0;
> +            }
> +            while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> +                if (session.getInState() == Session.IDLE) {
> +                    assert session.getOutState() == Session.IDLE;
> +                    mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> +                    session.setOutState(Session.OPEN);
> +                    session.setInState(Session.OPEN);
> +                }
> +                if (!session.inRationInfinite && session.getInRation() == 0) {
> +                    int inc = mux.initialInboundRation;
> +                    mux.asyncSendIncrementRation(session.sessionID, inc);
> +                    session.setInRation(session.getInRation() + inc);
> +                }
> +                try {
> +                    sessionLock.wait(5000L); // REMIND: timeout?
> +                } catch (InterruptedException e) {
> +                    String message = "request I/O interrupted";
> +                    session.setDown(message, e);
> +                    throw wrap(message, e);
> +                }
> +            }
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            if (inBufRemaining == 0) {
> +                if (inEOF) {
> +                    return -1;
> +                } else {
> +                    if (session.getInState() == Session.TERMINATED) {
> +                        throw new IOException("request aborted by remote endpoint");
> +                    }
> +                    assert sessionDown != null;
> +                    throw sessionDown;
> +                }
> +            }
> +            assert inBufQueue.size() > 0;
> +            int remaining = len;
> +            while (remaining > 0 && inBufRemaining > 0) {
> +                ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> +                if (inBufPos < buf.limit()) {
> +                    int toCopy = Math.min(buf.limit() - inBufPos, remaining);
> +                    buf.get(b, off, toCopy);
> +                    inBufPos += toCopy;
> +                    inBufRemaining -= toCopy;
> +                    off += toCopy;
> +                    remaining -= toCopy;
> +                }
> +                if (inBufPos == buf.limit()) {
> +                    inBufQueue.removeFirst();
> +                    inBufPos = 0;
> +                }
> +            }
> +            if (!session.inRationInfinite) {
> +                checkInboundRation();
> +            }
> +            return len - remaining;
> +        }
> +    }
> +
> +    /**
> +     * Sends ration increment, if read drained buffers below
> +     * a certain mark.
> +     *
> +     * This method must NOT be invoked if the inbound ration in
> +     * infinite, and it must ONLY be invoked while synchronized on
> +     * this session's lock.
> +     *
> +     * REMIND: The implementation of this action will be a
> +     * significant area for performance tuning.
> +     */
> +    private void checkInboundRation() {
> +        assert Thread.holdsLock(sessionLock);
> +        assert !session.inRationInfinite;
> +        if (session.getInState() >= Session.FINISHED) {
> +            return;
> +        }
> +        int mark = mux.initialInboundRation / 2;
> +        int used = inBufRemaining + session.getInRation();
> +        if (used <= mark) {
> +            int inc = mux.initialInboundRation - used;
> +            mux.asyncSendIncrementRation(session.sessionID, inc);
> +            session.setInRation(session.getInRation() + inc);
> +        }
> +    }
> +
> +    @Override
> +    public int available() throws IOException {
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                throw new IOException("stream closed");
> +            }
> +            /*
> +             * REMIND: What if
> +             *     - stream is at EOF?
> +             *     - session was aborted?
> +             */
> +            return inBufRemaining;
> +        }
> +    }
> +
> +    @Override
> +    public void close() {
> +        synchronized (sessionLock) {
> +            if (inClosed) {
> +                return;
> +            }
> +            inClosed = true;
> +            inBufQueue.clear(); // allow GC of unread data
> +            if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
> +                mux.asyncSendAcknowledgment(session.sessionID);
> +                sentAcknowledgment = true;
> +                /*
> +                 * If removing this session from the connection's
> +                 * table was delayed in order to be able to send
> +                 * an Acknowledgment message, then take care of
> +                 * removing it now.
> +                 */
> +                if (session.isRemoveLater()) {
> +                    session.setOutState(Session.TERMINATED);
> +                    mux.removeSession(session.sessionID);
> +                    session.setRemoveLater(false);
> +                }
> +            }
> +            sessionLock.notifyAll();
> +        }
> +    }
> +
> +    /**
> +     * @return the sentAcknowledgment
> +     */
> +    boolean isSentAcknowledgment() {
> +        return sentAcknowledgment;
> +    }
> +
> +    /**
> +     * @return the inBufRemaining
> +     */
> +    int getBufRemaining() {
> +        return inBufRemaining;
> +    }
> +
> +    /**
> +     * @return the inClosed
> +     */
> +    boolean isClosed() {
> +        return inClosed;
> +    }
> +
> +    /**
> +     * @param inBufRemaining the inBufRemaining to set
> +     */
> +    void setBufRemaining(int inBufRemaining) {
> +        this.inBufRemaining = inBufRemaining;
> +    }
> +
> +    /**
> +     * @param inEOF the inEOF to set
> +     */
> +    void setEOF(boolean inEOF) {
> +        this.inEOF = inEOF;
> +    }
> +    
> +}
> 
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java Thu Nov 26 11:56:32 2015
> @@ -210,7 +210,7 @@ public class MuxServer extends Mux {
> 		dispatchNewRequest(sessionID);
> 		return;
> 	    } else {
> -		session = (Session) sessions.get(Integer.valueOf(sessionID));
> +		session = sessions[sessionID];
> 		assert session != null;
> 	    }
> 	}
> 
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java Thu Nov 26 11:56:32 2015
> @@ -1,434 +1,434 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements.  See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - * 
> - *      http://www.apache.org/licenses/LICENSE-2.0
> - * 
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import org.apache.river.logging.Levels;
> -import org.apache.river.thread.Executor;
> -import org.apache.river.thread.GetThreadPoolAction;
> -import java.io.EOFException;
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.io.OutputStream;
> -import java.nio.ByteBuffer;
> -import java.nio.channels.ReadableByteChannel;
> -import java.nio.channels.WritableByteChannel;
> -import java.security.AccessController;
> -import java.util.Deque;
> -import java.util.LinkedList;
> -import java.util.logging.Level;
> -import java.util.logging.Logger;
> -
> -/**
> - * StreamConnectionIO implements the ConnectionIO abstraction for a
> - * connection accessible through standard (blocking) I/O streams, i.e.
> - * java.io.OutputStream and java.io.InputStream.
> - *
> - * @author Sun Microsystems, Inc.
> - **/
> -final class StreamConnectionIO extends ConnectionIO {
> -
> -    private static final int RECEIVE_BUFFER_SIZE = 2048;
> -
> -    /**
> -     * pool of threads for executing tasks in system thread group:
> -     * used for I/O (reader and writer) threads
> -     */
> -    private static final Executor systemThreadPool =
> -	(Executor) AccessController.doPrivileged(
> -	    new GetThreadPoolAction(false));
> -
> -    /** mux logger */
> -    private static final Logger logger =
> -	Logger.getLogger("net.jini.jeri.connection.mux");
> -
> -    /** I/O streams for underlying connection */
> -    private final OutputStream out;
> -    private final InputStream in;
> -
> -    /** channels wrapped around underlying I/O streams */
> -    private final WritableByteChannel outChannel;
> -    private final ReadableByteChannel inChannel;
> -
> -    /**
> -     * queue of buffers of data to be sent over connection, interspersed
> -     * with IOFuture objects that need to be notified in sequence
> -     * 
> -     * Synchronised on super.mux.muxLock;
> -     */
> -    private final Deque sendQueue;
> -
> -    
> -    /**
> -     * Creates a new StreamConnectionIO for the connection represented by
> -     * the supplied OutputStream and InputStream pair.
> -     */
> -    StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
> -	super(mux);
> -	this.out = out;
> -//	this.out = new BufferedOutputStream(out);
> -	this.in = in;
> -
> -	outChannel = newChannel(out);
> -	inChannel = newChannel(in);
> -        sendQueue = new LinkedList();
> -    }
> -
> -    /**
> -     * Starts processing connection data.  This method starts
> -     * asynchronous actions to read and write from the connection.
> -     */
> -    @Override
> -    void start() throws IOException {
> -	try {
> -	    systemThreadPool.execute(new Writer(), "mux writer");
> -	    systemThreadPool.execute(new Reader(), "mux reader");
> -	} catch (OutOfMemoryError e) {	// assume out of threads
> -	    try {
> -		logger.log(Level.WARNING,
> -			   "could not create thread for request dispatch", e);
> -	    } catch (Throwable t) {
> -	    }
> -	    throw new IOException("could not create I/O threads", e);
> -	}
> -    }
> -
> -    @Override
> -    void asyncSend(ByteBuffer buffer) {
> -	synchronized (mux.muxLock) {
> -	    if (mux.muxDown) {
> -		return;
> -	    }
> -	    sendQueue.addLast(buffer);
> -	    mux.muxLock.notifyAll();
> -	}
> -    }
> -
> -    @Override
> -    void asyncSend(ByteBuffer first, ByteBuffer second) {
> -	synchronized (mux.muxLock) {
> -	    if (mux.muxDown) {
> -		return;
> -	    }
> -	    sendQueue.addLast(first);
> -	    sendQueue.addLast(second);
> -	    mux.muxLock.notifyAll();
> -	}
> -    }
> -
> -    @Override
> -    IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
> -	synchronized (mux.muxLock) {
> -	    IOFuture future = new IOFuture();
> -	    if (mux.muxDown) {
> -		IOException ioe = new IOException(mux.muxDownMessage);
> -		ioe.initCause(mux.muxDownCause);
> -		future.done(ioe);
> -		return future;
> -	    }
> -	    sendQueue.addLast(first);
> -	    sendQueue.addLast(second);
> -	    sendQueue.addLast(future);
> -	    mux.muxLock.notifyAll();
> -	    return future;
> -	}
> -	/*
> -	 * REMIND: Can/should we implement any sort of
> -	 * priority inversion avoidance scheme here?
> -	 */
> -    }
> -
> -    private class Writer implements Runnable {
> -	Writer() { }
> -
> -        @Override
> -	public void run() {
> -	    Deque localQueue = null;
> -	    try {
> -		while (true) {
> -		    synchronized (mux.muxLock) {
> -			while (!mux.muxDown && sendQueue.isEmpty()) {
> -			    /*
> -			     * REMIND: Should we use a timeout here, to send
> -			     * occasional PING messages during periods of
> -			     * inactivity, to make sure connection is alive?
> -			     */
> -			    mux.muxLock.wait();
> -			    /*
> -			     * Let an interrupt during the wait just kill this
> -			     * thread, because an interrupt during an I/O write
> -			     * would leave it in an unrecoverable state anyway.
> -			     */
> -			}
> -			if (mux.muxDown && sendQueue.isEmpty()) {
> -			    logger.log(Level.FINEST,
> -				       "mux writer thread dying, connection " +
> -				       "down and nothing more to send");
> -			    break;
> -			}
> -                        /* Clone an unshared copy and clear the queue while synchronized */
> -			localQueue = new LinkedList(sendQueue);
> -			sendQueue.clear();
> -		    }
> -
> -		    boolean needToFlush = false;
> -                    ByteBuffer last = null;
> -                    int lastIndex = Integer.MIN_VALUE;
> -		    for  ( int i = 0; !localQueue.isEmpty(); i++) {
> -			Object next = localQueue.getFirst();
> -			if (next instanceof ByteBuffer) {
> -                            ByteBuffer buffer = (ByteBuffer) next;
> -			    outChannel.write((buffer));
> -                            last = buffer;
> -                            lastIndex = i;
> -			    needToFlush = true;
> -			} else {
> -			    assert next instanceof IOFuture;
> -			    if (needToFlush) {
> -				out.flush();
> -				needToFlush = false;
> -			    }
> -                            if (lastIndex == i - 1 && last != null){
> -                                ((IOFuture) next).done(last.position());
> -                            } else {
> -                                ((IOFuture) next).done();
> -                            }
> -			}
> -			localQueue.removeFirst();
> -		    }
> -		    if (needToFlush) {
> -			out.flush();
> -		    }
> -		}
> -	    } catch (InterruptedException e) {
> -		try {
> -		    logger.log(Level.WARNING,
> -			       "mux writer thread dying, interrupted", e);
> -		} catch (Throwable t) {
> -		}
> -		mux.setDown("mux writer thread interrupted", e);
> -	    } catch (IOException e) {
> -		try {
> -		    logger.log(Levels.HANDLED,
> -			       "mux writer thread dying, I/O error", e);
> -		} catch (Throwable t) {
> -		}
> -		mux.setDown("I/O error writing to mux connection: " +
> -			    e.toString(), e);
> -	    } catch (Throwable t) {
> -		try {
> -		    logger.log(Level.WARNING,
> -			"mux writer thread dying, unexpected exception", t);
> -		} catch (Throwable tt) {
> -		}
> -		mux.setDown("unexpected exception in mux writer thread: " +
> -			    t.toString(), t);
> -	    } finally {
> -		synchronized (mux.muxLock) {
> -		    assert mux.muxDown;
> -		    if (localQueue != null) {
> -			drainQueue(localQueue);
> -		    }
> -		    drainQueue(sendQueue);
> -		}
> -		try {
> -		    outChannel.close();
> -		} catch (IOException e) {
> -		}
> -	    }
> -	}
> -    }
> -
> -    private void drainQueue(Deque queue) {
> -	while (!queue.isEmpty()) {
> -	    Object next = queue.removeFirst();
> -	    if (next instanceof IOFuture) {
> -		IOException ioe = new IOException(mux.muxDownMessage);
> -		ioe.initCause(mux.muxDownCause);
> -		((IOFuture) next).done(ioe);
> -	    }
> -	}
> -    }
> -
> -    private class Reader implements Runnable {
> -        /** buffer for reading incoming data from connection */
> -        private final ByteBuffer inputBuffer =
> -            ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);	// ready for reading
> -
> -	Reader() { }
> -
> -	public void run() {
> -	    try {
> -		while (true) {
> -		    int n = inChannel.read(inputBuffer);
> -		    if (n == -1) {
> -			throw new EOFException();
> -		    }
> -		    assert n > 0;	// channel is assumed to be blocking
> -		    mux.processIncomingData(inputBuffer);
> -		    assert inputBuffer.hasRemaining();
> -		}
> -	    } catch (ProtocolException e) {
> -		IOFuture future = null;
> -		synchronized (mux.muxLock) {
> -		    /*
> -		     * If mux connection is already down, then we probably got
> -		     * here because of the receipt of a normal protocol-ending
> -		     * message, like Shutdown or Error, or else something else
> -		     * went wrong anyway.  Otherwise, a real protocol violation
> -		     * was detected, so respond with an Error message before
> -		     * taking down the whole mux connection.
> -		     */
> -		    if (!mux.muxDown) {
> -			try {
> -			    logger.log(Levels.HANDLED,
> -				"mux reader thread dying, protocol error", e);
> -			} catch (Throwable t) {
> -			}
> -			future = mux.futureSendError(e.getMessage());
> -			mux.setDown("protocol violation detected: " +	
> -				    e.getMessage(), null);
> -		    } else {
> -			try {
> -			    logger.log(Level.FINEST,
> -				"mux reader thread dying: " + e.getMessage());
> -			} catch (Throwable t) {
> -			}
> -		    }
> -		}
> -		if (future != null) {
> -		    try {
> -			future.waitUntilDone();
> -		    } catch (IOException ignore) {
> -		    } catch (InterruptedException interrupt) {
> -			Thread.currentThread().interrupt();
> -		    }
> -		}
> -	    } catch (IOException e) {
> -		try {
> -		    logger.log(Levels.HANDLED,
> -			       "mux reader thread dying, I/O error", e);
> -		} catch (Throwable t) {
> -		}
> -		mux.setDown("I/O error reading from mux connection: " +
> -			    e.toString(), e);
> -	    } catch (Throwable t) {
> -		try {
> -		    logger.log(Level.WARNING,
> -			"mux reader thread dying, unexpected exception", t);
> -		} catch (Throwable tt) {
> -		}
> -		mux.setDown("unexpected exception in mux reader thread: " +
> -			    t.toString(), t);
> -	    } finally {
> -		try {
> -		    inChannel.close();
> -		} catch (IOException e) {
> -		}
> -	    }
> -	}
> -    }
> -
> -    /**
> -     * The following two methods are modifications of their
> -     * equivalents in java.nio.channels.Channels with the assumption
> -     * that the supplied byte buffers are backed by arrays, so no
> -     * additional copying is required.
> -     */
> -
> -    public static ReadableByteChannel newChannel(final InputStream in) {
> -	return new ReadableByteChannel() {
> -	    private volatile boolean open = true;
> -
> -            // must be synchronized as per ReadableByteChannel contract
> -            @Override
> -	    public synchronized int read(ByteBuffer dst) throws IOException {
> -		assert dst.hasArray();
> -		byte[] array = dst.array();
> -		int arrayOffset = dst.arrayOffset();
> -
> -		int totalRead = 0;
> -		int bytesRead = 0;
> -		int bytesToRead;
> -		while ((bytesToRead = dst.remaining()) > 0) {
> -		    if ((totalRead > 0) && !(in.available() > 0)) {
> -			break; // block at most once
> -		    }
> -		    int pos = dst.position();
> -		    bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
> -		    if (bytesRead < 0) {
> -			break;
> -		    } else {
> -			dst.position(pos + bytesRead);
> -			totalRead += bytesRead;
> -		    }
> -		}
> -		if ((bytesRead < 0) && (totalRead == 0)) {
> -		    return -1;
> -		}
> -
> -		return totalRead;
> -	    }
> -                
> -            @Override
> -	    public boolean isOpen() {
> -		return open;
> -	    }
> -            
> -            // Blocking as per Channel contract
> -            @Override
> -	    public synchronized void close() throws IOException {
> -		in.close();
> -		open = false;
> -	    }
> -	};
> -    }
> -
> -    public static WritableByteChannel newChannel(final OutputStream out) {
> -	return new WritableByteChannel() {
> -	    private volatile boolean open = true;
> -            
> -            // This method must block while writing as per WritableByteChannel contract.
> -            @Override
> -	    public synchronized int write(ByteBuffer src) throws IOException {
> -                    assert src.hasArray();
> -
> -                    int len = src.remaining();
> -                    if (len > 0) {
> -                        int pos = src.position();
> -                        out.write(src.array(), src.arrayOffset() + pos, len);
> -                        src.position(pos + len);
> -                    }
> -                    return len;
> -                }
> -                
> -            @Override
> -	    public boolean isOpen() {
> -		return open;
> -	    }
> -
> -            // This method must block as per the Channel contract
> -            @Override
> -	    public synchronized void close() throws IOException {
> -		out.close();
> -		open = false;
> -	    }
> -	};
> -    }
> -
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + * 
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + * 
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import org.apache.river.logging.Levels;
> +import org.apache.river.thread.Executor;
> +import org.apache.river.thread.GetThreadPoolAction;
> +import java.io.EOFException;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.ReadableByteChannel;
> +import java.nio.channels.WritableByteChannel;
> +import java.security.AccessController;
> +import java.util.Deque;
> +import java.util.LinkedList;
> +import java.util.logging.Level;
> +import java.util.logging.Logger;
> +
> +/**
> + * StreamConnectionIO implements the ConnectionIO abstraction for a
> + * connection accessible through standard (blocking) I/O streams, i.e.
> + * java.io.OutputStream and java.io.InputStream.
> + *
> + * @author Sun Microsystems, Inc.
> + **/
> +final class StreamConnectionIO extends ConnectionIO {
> +
> +    private static final int RECEIVE_BUFFER_SIZE = 2048;
> +
> +    /**
> +     * pool of threads for executing tasks in system thread group:
> +     * used for I/O (reader and writer) threads
> +     */
> +    private static final Executor systemThreadPool =
> +	(Executor) AccessController.doPrivileged(
> +	    new GetThreadPoolAction(false));
> +
> +    /** mux logger */
> +    private static final Logger logger =
> +	Logger.getLogger("net.jini.jeri.connection.mux");
> +
> +    /** I/O streams for underlying connection */
> +    private final OutputStream out;
> +    private final InputStream in;
> +
> +    /** channels wrapped around underlying I/O streams */
> +    private final WritableByteChannel outChannel;
> +    private final ReadableByteChannel inChannel;
> +
> +    /**
> +     * queue of buffers of data to be sent over connection, interspersed
> +     * with IOFuture objects that need to be notified in sequence
> +     * 
> +     * Synchronised on super.mux.muxLock;
> +     */
> +    private final Deque sendQueue;
> +
> +    
> +    /**
> +     * Creates a new StreamConnectionIO for the connection represented by
> +     * the supplied OutputStream and InputStream pair.
> +     */
> +    StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
> +	super(mux);
> +	this.out = out;
> +//	this.out = new BufferedOutputStream(out);
> +	this.in = in;
> +
> +	outChannel = newChannel(out);
> +	inChannel = newChannel(in);
> +        sendQueue = new LinkedList();
> +    }
> +
> +    /**
> +     * Starts processing connection data.  This method starts
> +     * asynchronous actions to read and write from the connection.
> +     */
> +    @Override
> +    void start() throws IOException {
> +	try {
> +	    systemThreadPool.execute(new Writer(), "mux writer");
> +	    systemThreadPool.execute(new Reader(), "mux reader");
> +	} catch (OutOfMemoryError e) {	// assume out of threads
> +	    try {
> +		logger.log(Level.WARNING,
> +			   "could not create thread for request dispatch", e);
> +	    } catch (Throwable t) {
> +	    }
> +	    throw new IOException("could not create I/O threads", e);
> +	}
> +    }
> +
> +    @Override
> +    void asyncSend(ByteBuffer buffer) {
> +	synchronized (mux.muxLock) {
> +	    if (mux.muxDown) {
> +		return;
> +	    }
> +	    sendQueue.addLast(buffer);
> +	    mux.muxLock.notifyAll();
> +	}
> +    }
> +
> +    @Override
> +    void asyncSend(ByteBuffer first, ByteBuffer second) {
> +	synchronized (mux.muxLock) {
> +	    if (mux.muxDown) {
> +		return;
> +	    }
> +	    sendQueue.addLast(first);
> +	    sendQueue.addLast(second);
> +	    mux.muxLock.notifyAll();
> +	}
> +    }
> +
> +    @Override
> +    IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
> +	synchronized (mux.muxLock) {
> +	    IOFuture future = new IOFuture();
> +	    if (mux.muxDown) {
> +		IOException ioe = new IOException(mux.muxDownMessage);
> +		ioe.initCause(mux.muxDownCause);
> +		future.done(ioe);
> +		return future;
> +	    }
> +	    sendQueue.addLast(first);
> +	    sendQueue.addLast(second);
> +	    sendQueue.addLast(future);
> +	    mux.muxLock.notifyAll();
> +	    return future;
> +	}
> +	/*
> +	 * REMIND: Can/should we implement any sort of
> +	 * priority inversion avoidance scheme here?
> +	 */
> +    }
> +
> +    private class Writer implements Runnable {
> +	Writer() { }
> +
> +        @Override
> +	public void run() {
> +	    Deque localQueue = null;
> +	    try {
> +		while (true) {
> +		    synchronized (mux.muxLock) {
> +			while (!mux.muxDown && sendQueue.isEmpty()) {
> +			    /*
> +			     * REMIND: Should we use a timeout here, to send
> +			     * occasional PING messages during periods of
> +			     * inactivity, to make sure connection is alive?
> +			     */
> +			    mux.muxLock.wait();
> +			    /*
> +			     * Let an interrupt during the wait just kill this
> +			     * thread, because an interrupt during an I/O write
> +			     * would leave it in an unrecoverable state anyway.
> +			     */
> +			}
> +			if (mux.muxDown && sendQueue.isEmpty()) {
> +			    logger.log(Level.FINEST,
> +				       "mux writer thread dying, connection " +
> +				       "down and nothing more to send");
> +			    break;
> +			}
> +                        /* Clone an unshared copy and clear the queue while synchronized */
> +			localQueue = new LinkedList(sendQueue);
> +			sendQueue.clear();
> +		    }
> +
> +		    boolean needToFlush = false;
> +                    ByteBuffer last = null;
> +                    int lastIndex = Integer.MIN_VALUE;
> +		    for  ( int i = 0; !localQueue.isEmpty(); i++) {
> +			Object next = localQueue.getFirst();
> +			if (next instanceof ByteBuffer) {
> +                            ByteBuffer buffer = (ByteBuffer) next;
> +			    outChannel.write((buffer));
> +                            last = buffer;
> +                            lastIndex = i;
> +			    needToFlush = true;
> +			} else {
> +			    assert next instanceof IOFuture;
> +			    if (needToFlush) {
> +				out.flush();
> +				needToFlush = false;
> +			    }
> +                            if (lastIndex == i - 1 && last != null){
> +                                ((IOFuture) next).done(last.position());
> +                            } else {
> +                                ((IOFuture) next).done();
> +                            }
> +			}
> +			localQueue.removeFirst();
> +		    }
> +		    if (needToFlush) {
> +			out.flush();
> +		    }
> +		}
> +	    } catch (InterruptedException e) {
> +		try {
> +		    logger.log(Level.WARNING,
> +			       "mux writer thread dying, interrupted", e);
> +		} catch (Throwable t) {
> +		}
> +		mux.setDown("mux writer thread interrupted", e);
> +	    } catch (IOException e) {
> +		try {
> +		    logger.log(Levels.HANDLED,
> +			       "mux writer thread dying, I/O error", e);
> +		} catch (Throwable t) {
> +		}
> +		mux.setDown("I/O error writing to mux connection: " +
> +			    e.toString(), e);
> +	    } catch (Throwable t) {
> +		try {
> +		    logger.log(Level.WARNING,
> +			"mux writer thread dying, unexpected exception", t);
> +		} catch (Throwable tt) {
> +		}
> +		mux.setDown("unexpected exception in mux writer thread: " +
> +			    t.toString(), t);
> +	    } finally {
> +		synchronized (mux.muxLock) {
> +		    assert mux.muxDown;
> +		    if (localQueue != null) {
> +			drainQueue(localQueue);
> +		    }
> +		    drainQueue(sendQueue);
> +		}
> +		try {
> +		    outChannel.close();
> +		} catch (IOException e) {
> +		}
> +	    }
> +	}
> +    }
> +
> +    private void drainQueue(Deque queue) {
> +	while (!queue.isEmpty()) {
> +	    Object next = queue.removeFirst();
> +	    if (next instanceof IOFuture) {
> +		IOException ioe = new IOException(mux.muxDownMessage);
> +		ioe.initCause(mux.muxDownCause);
> +		((IOFuture) next).done(ioe);
> +	    }
> +	}
> +    }
> +
> +    private class Reader implements Runnable {
> +        /** buffer for reading incoming data from connection */
> +        private final ByteBuffer inputBuffer =
> +            ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);	// ready for reading
> +
> +	Reader() { }
> +
> +	public void run() {
> +	    try {
> +		while (true) {
> +		    int n = inChannel.read(inputBuffer);
> +		    if (n == -1) {
> +			throw new EOFException();
> +		    }
> +		    assert n > 0;	// channel is assumed to be blocking
> +		    mux.processIncomingData(inputBuffer);
> +		    assert inputBuffer.hasRemaining();
> +		}
> +	    } catch (ProtocolException e) {
> +		IOFuture future = null;
> +		synchronized (mux.muxLock) {
> +		    /*
> +		     * If mux connection is already down, then we probably got
> +		     * here because of the receipt of a normal protocol-ending
> +		     * message, like Shutdown or Error, or else something else
> +		     * went wrong anyway.  Otherwise, a real protocol violation
> +		     * was detected, so respond with an Error message before
> +		     * taking down the whole mux connection.
> +		     */
> +		    if (!mux.muxDown) {
> +			try {
> +			    logger.log(Levels.HANDLED,
> +				"mux reader thread dying, protocol error", e);
> +			} catch (Throwable t) {
> +			}
> +			future = mux.futureSendError(e.getMessage());
> +			mux.setDown("protocol violation detected: " +	
> +				    e.getMessage(), null);
> +		    } else {
> +			try {
> +			    logger.log(Level.FINEST,
> +				"mux reader thread dying: " + e.getMessage());
> +			} catch (Throwable t) {
> +			}
> +		    }
> +		}
> +		if (future != null) {
> +		    try {
> +			future.waitUntilDone();
> +		    } catch (IOException ignore) {
> +		    } catch (InterruptedException interrupt) {
> +			Thread.currentThread().interrupt();
> +		    }
> +		}
> +	    } catch (IOException e) {
> +		try {
> +		    logger.log(Levels.HANDLED,
> +			       "mux reader thread dying, I/O error", e);
> +		} catch (Throwable t) {
> +		}
> +		mux.setDown("I/O error reading from mux connection: " +
> +			    e.toString(), e);
> +	    } catch (Throwable t) {
> +		try {
> +		    logger.log(Level.WARNING,
> +			"mux reader thread dying, unexpected exception", t);
> +		} catch (Throwable tt) {
> +		}
> +		mux.setDown("unexpected exception in mux reader thread: " +
> +			    t.toString(), t);
> +	    } finally {
> +		try {
> +		    inChannel.close();
> +		} catch (IOException e) {
> +		}
> +	    }
> +	}
> +    }
> +
> +    /**
> +     * The following two methods are modifications of their
> +     * equivalents in java.nio.channels.Channels with the assumption
> +     * that the supplied byte buffers are backed by arrays, so no
> +     * additional copying is required.
> +     */
> +
> +    public static ReadableByteChannel newChannel(final InputStream in) {
> +	return new ReadableByteChannel() {
> +	    private boolean open = true;
> +
> +            // must be synchronized as per ReadableByteChannel contract
> +            @Override
> +	    public synchronized int read(ByteBuffer dst) throws IOException {
> +		assert dst.hasArray();
> +		byte[] array = dst.array();
> +		int arrayOffset = dst.arrayOffset();
> +
> +		int totalRead = 0;
> +		int bytesRead = 0;
> +		int bytesToRead;
> +		while ((bytesToRead = dst.remaining()) > 0) {
> +		    if ((totalRead > 0) && !(in.available() > 0)) {
> +			break; // block at most once
> +		    }
> +		    int pos = dst.position();
> +		    bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
> +		    if (bytesRead < 0) {
> +			break;
> +		    } else {
> +			dst.position(pos + bytesRead);
> +			totalRead += bytesRead;
> +		    }
> +		}
> +		if ((bytesRead < 0) && (totalRead == 0)) {
> +		    return -1;
> +		}
> +
> +		return totalRead;
> +	    }
> +                
> +            @Override
> +	    public synchronized boolean isOpen() {
> +		return open;
> +	    }
> +            
> +            // Blocking as per Channel contract
> +            @Override
> +	    public synchronized void close() throws IOException {
> +		in.close();
> +		open = false;
> +	    }
> +	};
> +    }
> +
> +    public static WritableByteChannel newChannel(final OutputStream out) {
> +	return new WritableByteChannel() {
> +	    private volatile boolean open = true;
> +            
> +            // This method must block while writing as per WritableByteChannel contract.
> +            @Override
> +	    public synchronized int write(ByteBuffer src) throws IOException {
> +                    assert src.hasArray();
> +
> +                    int len = src.remaining();
> +                    if (len > 0) {
> +                        int pos = src.position();
> +                        out.write(src.array(), src.arrayOffset() + pos, len);
> +                        src.position(pos + len);
> +                    }
> +                    return len;
> +                }
> +                
> +            @Override
> +	    public boolean isOpen() {
> +		return open;
> +	    }
> +
> +            // This method must block as per the Channel contract
> +            @Override
> +	    public synchronized void close() throws IOException {
> +		out.close();
> +		open = false;
> +	    }
> +	};
> +    }
> +
> +}
> 
>