You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@river.apache.org by Greg Trasuk <tr...@stratuscom.com> on 2015/11/26 15:29:40 UTC
Re: svn commit: r1716613 - in /river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux: Mux.java MuxClient.java MuxInputStream.java MuxServer.java StreamConnectionIO.java
In order to properly review changes, it would be great to know what the problem it is that you’re fixing - could you share?
Cheers,
Greg Trasuk
> On Nov 26, 2015, at 6:56 AM, peter_firmstone@apache.org wrote:
>
> Author: peter_firmstone
> Date: Thu Nov 26 11:56:32 2015
> New Revision: 1716613
>
> URL: http://svn.apache.org/viewvc?rev=1716613&view=rev
> Log:
> Commit my local Jeri multiplexer stability improvements to assist with jtreg multiplexer nio tests:
>
> net.jini.jeri.tcp.outOfThreads.OutOfThreads.java
> net.jini.jeri.tcp.outOfThreads.OutOfThreads2.java
>
> Modified:
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
> river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
>
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/Mux.java Thu Nov 26 11:56:32 2015
> @@ -34,9 +34,7 @@ import java.nio.charset.CharsetEncoder;
> import java.security.AccessController;
> import java.util.BitSet;
> import java.util.Deque;
> -import java.util.HashMap;
> import java.util.LinkedList;
> -import java.util.Map;
> import java.util.logging.Level;
> import java.util.logging.Logger;
>
> @@ -107,6 +105,7 @@ abstract class Mux {
>
> public void run() {
> for (int i = 0; i < sessions.length; i++) {
> + if (sessions[i] != null)
> sessions[i].setDown(message, cause);
> }
> }
> @@ -135,7 +134,7 @@ abstract class Mux {
> Throwable muxDownCause;
>
> final BitSet busySessions = new BitSet();
> - final Map<Integer,Session> sessions = new HashMap<Integer,Session>(128);
> + final Session [] sessions = new Session[MAX_SESSION_ID + 1];
>
> private int expectedPingCookie = -1;
>
> @@ -274,10 +273,15 @@ abstract class Mux {
> assert Thread.holdsLock(muxLock);
> assert !muxDown;
> assert !busySessions.get(sessionID);
> - assert sessions.get(Integer.valueOf(sessionID)) == null;
> +// assert sessions.get(Byte.valueOf(sessionID)) == null;
> + assert sessions[sessionID] == null;
>
> busySessions.set(sessionID);
> - sessions.put(Integer.valueOf(sessionID), session);
> +// Throwable t = new Throwable();
> +// System.out.println("Setting sessionID: "+ sessionID);
> +// t.printStackTrace(System.out);
> +// sessions.put(Byte.valueOf(sessionID), session);
> + sessions[sessionID] = session;
> }
>
> /**
> @@ -285,14 +289,17 @@ abstract class Mux {
> * This method is intended to be invoked by this class and
> * subclasses only.
> *
> - * This method MAY be invoked while synchronized on muxLock.
> + * This method MAY be invoked while synchronized on muxLock if failure
> + * occurs during start up.
> */
> final void setDown(final String message, final Throwable cause) {
> + SessionShutdownTask sst = null;
> synchronized (muxLock) {
> if (muxDown) return;
> muxDown = true;
> muxDownMessage = message;
> muxDownCause = cause;
> + sst = new SessionShutdownTask(sessions.clone(), message, cause);
> muxLock.notifyAll();
> }
>
> @@ -309,11 +316,8 @@ abstract class Mux {
> */
> boolean needWorker = false;
> synchronized (sessionShutdownQueue) {
> - if (!sessions.isEmpty()) {
> - sessionShutdownQueue.add(new SessionShutdownTask(
> - (Session[]) sessions.values().toArray(
> - new Session[sessions.values().size()]),
> - message, cause));
> + if (sst != null) {
> + sessionShutdownQueue.add(sst);
> needWorker = true;
> } else {
> needWorker = !sessionShutdownQueue.isEmpty();
> @@ -360,7 +364,7 @@ abstract class Mux {
> }
> assert busySessions.get(sessionID);
> busySessions.clear(sessionID);
> - sessions.remove(Integer.valueOf(sessionID));
> + sessions[sessionID] = null;
> }
> }
>
> @@ -1178,8 +1182,7 @@ abstract class Mux {
> getSession(sessionID).handleAcknowledgment();
> }
>
> - private void handleData(int sessionID, boolean open, boolean close,
> - boolean eof, boolean ackRequired, ByteBuffer data)
> + private void handleData(int sessionID, boolean open, boolean close, boolean eof, boolean ackRequired, ByteBuffer data)
> throws ProtocolException
> {
> if (logger.isLoggable(Level.FINEST)) {
> @@ -1219,7 +1222,7 @@ abstract class Mux {
> throw new ProtocolException(
> "inactive sessionID: " + sessionID);
> }
> - return (Session) sessions.get(Integer.valueOf(sessionID));
> + return sessions[sessionID];
> }
> }
>
>
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxClient.java Thu Nov 26 11:56:32 2015
> @@ -1,129 +1,129 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import org.apache.river.action.GetIntegerAction;
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.io.OutputStream;
> -import java.nio.channels.SocketChannel;
> -import java.security.AccessController;
> -import java.util.Collection;
> -import net.jini.jeri.OutboundRequest;
> -
> -/**
> - * A MuxClient controls the client side of multiplexed connection.
> - *
> - * @author Sun Microsystems, Inc.
> - **/
> -public class MuxClient extends Mux {
> -
> - /** initial inbound ration as client, default is 32768 */
> - private static final int clientInitialInboundRation =
> - ((Integer) AccessController.doPrivileged(new GetIntegerAction(
> - "org.apache.river.jeri.connection.mux.client.initialInboundRation",
> - 32768))).intValue();
> -
> - /**
> - * Initiates the client side of the multiplexed connection over
> - * the given input/output stream pair.
> - *
> - * @param out the output stream of the underlying connection
> - *
> - * @param in the input stream of the underlying connection
> - **/
> - public MuxClient(OutputStream out, InputStream in) throws IOException {
> - super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
> - }
> -
> - public MuxClient(SocketChannel channel) throws IOException {
> - super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
> - }
> -
> - /**
> - * Starts a new request over this connection, returning the
> - * corresponding OutboundRequest object.
> - *
> - * @return the OutboundRequest for the newly created request
> - **/
> - public OutboundRequest newRequest() throws IOException {
> - synchronized (muxLock) {
> - if (muxDown) {
> - IOException ioe = new IOException(muxDownMessage);
> - ioe.initCause(muxDownCause);
> - throw ioe;
> - }
> - int sessionID = busySessions.nextClearBit(0);
> - if (sessionID > Mux.MAX_SESSION_ID) {
> - throw new IOException("no free sessions");
> - }
> -
> - Session session = new Session(this, sessionID, Session.CLIENT);
> - addSession(sessionID, session);
> - return session.getOutboundRequest();
> - }
> - }
> -
> - /**
> - * Returns the current number of requests in progress over this
> - * connection.
> - *
> - * The value is guaranteed to not increase until the next
> - * invocation of the newRequest method.
> - *
> - * @return the number of requests in progress over this connection
> - *
> - * @throws IOException if the multiplexed connection is no longer
> - * active
> - **/
> - public int requestsInProgress() throws IOException {
> - synchronized (muxLock) {
> - if (muxDown) {
> - IOException ioe = new IOException(muxDownMessage);
> - ioe.initCause(muxDownCause);
> - throw ioe;
> - }
> - return busySessions.cardinality();
> - }
> - }
> -
> - /**
> - * Shuts down this multiplexed connection. Requests in progress
> - * will throw IOException for future I/O operations.
> - *
> - * @param message reason for shutdown to be included in
> - * IOExceptions thrown from future I/O operations
> - **/
> - public void shutdown(String message) {
> - synchronized (muxLock) {
> - setDown(message, null);
> - }
> - }
> -
> - /**
> - * Populates the context collection with information representing
> - * this connection.
> - *
> - * This method should be overridden by subclasses to implement the
> - * desired behavior of the populateContext method for
> - * OutboundRequest instances generated for this connection.
> - **/
> - protected void populateContext(Collection context) {
> - }
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import org.apache.river.action.GetIntegerAction;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.nio.channels.SocketChannel;
> +import java.security.AccessController;
> +import java.util.Collection;
> +import net.jini.jeri.OutboundRequest;
> +
> +/**
> + * A MuxClient controls the client side of multiplexed connection.
> + *
> + * @author Sun Microsystems, Inc.
> + **/
> +public class MuxClient extends Mux {
> +
> + /** initial inbound ration as client, default is 32768 */
> + private static final int clientInitialInboundRation =
> + ((Integer) AccessController.doPrivileged(new GetIntegerAction(
> + "org.apache.river.jeri.connection.mux.client.initialInboundRation",
> + 32768))).intValue();
> +
> + /**
> + * Initiates the client side of the multiplexed connection over
> + * the given input/output stream pair.
> + *
> + * @param out the output stream of the underlying connection
> + *
> + * @param in the input stream of the underlying connection
> + **/
> + public MuxClient(OutputStream out, InputStream in) throws IOException {
> + super(out, in, Mux.CLIENT, clientInitialInboundRation, 1024);
> + }
> +
> + public MuxClient(SocketChannel channel) throws IOException {
> + super(channel, Mux.CLIENT, clientInitialInboundRation, 1024);
> + }
> +
> + /**
> + * Starts a new request over this connection, returning the
> + * corresponding OutboundRequest object.
> + *
> + * @return the OutboundRequest for the newly created request
> + **/
> + public OutboundRequest newRequest() throws IOException {
> + synchronized (muxLock) {
> + if (muxDown) {
> + IOException ioe = new IOException(muxDownMessage);
> + ioe.initCause(muxDownCause);
> + throw ioe;
> + }
> + byte sessionID = (byte) busySessions.nextClearBit(0);
> + if (sessionID > Mux.MAX_SESSION_ID) {
> + throw new IOException("no free sessions");
> + }
> +
> + Session session = new Session(this, sessionID, Session.CLIENT);
> + addSession(sessionID, session);
> + return session.getOutboundRequest();
> + }
> + }
> +
> + /**
> + * Returns the current number of requests in progress over this
> + * connection.
> + *
> + * The value is guaranteed to not increase until the next
> + * invocation of the newRequest method.
> + *
> + * @return the number of requests in progress over this connection
> + *
> + * @throws IOException if the multiplexed connection is no longer
> + * active
> + **/
> + public int requestsInProgress() throws IOException {
> + synchronized (muxLock) {
> + if (muxDown) {
> + IOException ioe = new IOException(muxDownMessage);
> + ioe.initCause(muxDownCause);
> + throw ioe;
> + }
> + return busySessions.cardinality();
> + }
> + }
> +
> + /**
> + * Shuts down this multiplexed connection. Requests in progress
> + * will throw IOException for future I/O operations.
> + *
> + * @param message reason for shutdown to be included in
> + * IOExceptions thrown from future I/O operations
> + **/
> + public void shutdown(String message) {
> + synchronized (muxLock) {
> + setDown(message, null);
> + }
> + }
> +
> + /**
> + * Populates the context collection with information representing
> + * this connection.
> + *
> + * This method should be overridden by subclasses to implement the
> + * desired behavior of the populateContext method for
> + * OutboundRequest instances generated for this connection.
> + **/
> + protected void populateContext(Collection context) {
> + }
> +}
>
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxInputStream.java Thu Nov 26 11:56:32 2015
> @@ -1,308 +1,308 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.nio.ByteBuffer;
> -import java.util.Deque;
> -import java.util.LinkedList;
> -
> -/**
> - * Output stream returned by OutboundRequests and InboundRequests for
> - * a session of a multiplexed connection.
> - */
> -class MuxInputStream extends InputStream {
> - private final Object sessionLock;
> - private final Session session;
> - private final Mux mux;
> - private final Deque<ByteBuffer> inBufQueue;
> - private IOException sessionDown = null;
> - private int inBufRemaining = 0;
> - private int inBufPos = 0;
> - private boolean inEOF = false;
> - private boolean inClosed = false;
> - private boolean sentAcknowledgment = false;
> -
> - MuxInputStream(Mux mux, Session session, Object sessionLock) {
> - this.mux = mux;
> - this.session = session;
> - this.sessionLock = sessionLock;
> - this.inBufQueue = new LinkedList<ByteBuffer>();
> - }
> -
> - void down(IOException e) {
> - sessionDown = e;
> - }
> -
> - void appendToBufQueue(ByteBuffer data) {
> - inBufQueue.addLast(data);
> - }
> -
> - @Override
> - public int read() throws IOException {
> - synchronized (sessionLock) {
> - if (inClosed) {
> - throw new IOException("stream closed");
> - }
> - while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> - if (session.getInState() == Session.IDLE) {
> - assert session.getOutState() == Session.IDLE;
> - mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> - session.setOutState(Session.OPEN);
> - session.setInState(Session.OPEN);
> - }
> - if (!session.inRationInfinite && session.getInRation() == 0) {
> - int inc = mux.initialInboundRation;
> - mux.asyncSendIncrementRation(session.sessionID, inc);
> - session.setInRation(session.getInRation() + inc);
> - }
> - try {
> - sessionLock.wait(); // REMIND: timeout?
> - } catch (InterruptedException e) {
> - String message = "request I/O interrupted";
> - session.setDown(message, e);
> - throw wrap(message, e);
> - }
> - }
> - if (inClosed) {
> - throw new IOException("stream closed");
> - }
> - if (inBufRemaining == 0) {
> - if (inEOF) {
> - return -1;
> - } else {
> - if (session.getInState() == Session.TERMINATED) {
> - throw new IOException("request aborted by remote endpoint");
> - }
> - assert sessionDown != null;
> - throw sessionDown;
> - }
> - }
> - assert inBufQueue.size() > 0;
> - int result = -1;
> - while (result == -1) {
> - ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> - if (inBufPos < buf.limit()) {
> - result = (buf.get() & 0xFF);
> - inBufPos++;
> - inBufRemaining--;
> - }
> - if (inBufPos == buf.limit()) {
> - inBufQueue.removeFirst();
> - inBufPos = 0;
> - }
> - }
> - if (!session.inRationInfinite) {
> - checkInboundRation();
> - }
> - return result;
> - }
> - }
> -
> - private IOException wrap(String message, Exception e) {
> - Throwable t;
> - if (Session.traceSupression()) {
> - t = e;
> - } else {
> - t = e.fillInStackTrace();
> - }
> - return new IOException(message, t);
> - }
> -
> - @Override
> - public int read(byte[] b, int off, int len) throws IOException {
> - if (b == null) {
> - throw new NullPointerException();
> - } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
> - throw new IndexOutOfBoundsException();
> - }
> - synchronized (sessionLock) {
> - if (inClosed) {
> - throw new IOException("stream closed");
> - } else if (len == 0) {
> - /*
> - * REMIND: What if
> - * - stream is at EOF?
> - * - session was aborted?
> - */
> - return 0;
> - }
> - while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> - if (session.getInState() == Session.IDLE) {
> - assert session.getOutState() == Session.IDLE;
> - mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> - session.setOutState(Session.OPEN);
> - session.setInState(Session.OPEN);
> - }
> - if (!session.inRationInfinite && session.getInRation() == 0) {
> - int inc = mux.initialInboundRation;
> - mux.asyncSendIncrementRation(session.sessionID, inc);
> - session.setInRation(session.getInRation() + inc);
> - }
> - try {
> - sessionLock.wait(); // REMIND: timeout?
> - } catch (InterruptedException e) {
> - String message = "request I/O interrupted";
> - session.setDown(message, e);
> - throw wrap(message, e);
> - }
> - }
> - if (inClosed) {
> - throw new IOException("stream closed");
> - }
> - if (inBufRemaining == 0) {
> - if (inEOF) {
> - return -1;
> - } else {
> - if (session.getInState() == Session.TERMINATED) {
> - throw new IOException("request aborted by remote endpoint");
> - }
> - assert sessionDown != null;
> - throw sessionDown;
> - }
> - }
> - assert inBufQueue.size() > 0;
> - int remaining = len;
> - while (remaining > 0 && inBufRemaining > 0) {
> - ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> - if (inBufPos < buf.limit()) {
> - int toCopy = Math.min(buf.limit() - inBufPos, remaining);
> - buf.get(b, off, toCopy);
> - inBufPos += toCopy;
> - inBufRemaining -= toCopy;
> - off += toCopy;
> - remaining -= toCopy;
> - }
> - if (inBufPos == buf.limit()) {
> - inBufQueue.removeFirst();
> - inBufPos = 0;
> - }
> - }
> - if (!session.inRationInfinite) {
> - checkInboundRation();
> - }
> - return len - remaining;
> - }
> - }
> -
> - /**
> - * Sends ration increment, if read drained buffers below
> - * a certain mark.
> - *
> - * This method must NOT be invoked if the inbound ration in
> - * infinite, and it must ONLY be invoked while synchronized on
> - * this session's lock.
> - *
> - * REMIND: The implementation of this action will be a
> - * significant area for performance tuning.
> - */
> - private void checkInboundRation() {
> - assert Thread.holdsLock(sessionLock);
> - assert !session.inRationInfinite;
> - if (session.getInState() >= Session.FINISHED) {
> - return;
> - }
> - int mark = mux.initialInboundRation / 2;
> - int used = inBufRemaining + session.getInRation();
> - if (used <= mark) {
> - int inc = mux.initialInboundRation - used;
> - mux.asyncSendIncrementRation(session.sessionID, inc);
> - session.setInRation(session.getInRation() + inc);
> - }
> - }
> -
> - @Override
> - public int available() throws IOException {
> - synchronized (sessionLock) {
> - if (inClosed) {
> - throw new IOException("stream closed");
> - }
> - /*
> - * REMIND: What if
> - * - stream is at EOF?
> - * - session was aborted?
> - */
> - return inBufRemaining;
> - }
> - }
> -
> - @Override
> - public void close() {
> - synchronized (sessionLock) {
> - if (inClosed) {
> - return;
> - }
> - inClosed = true;
> - inBufQueue.clear(); // allow GC of unread data
> - if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
> - mux.asyncSendAcknowledgment(session.sessionID);
> - sentAcknowledgment = true;
> - /*
> - * If removing this session from the connection's
> - * table was delayed in order to be able to send
> - * an Acknowledgment message, then take care of
> - * removing it now.
> - */
> - if (session.isRemoveLater()) {
> - session.setOutState(Session.TERMINATED);
> - mux.removeSession(session.sessionID);
> - session.setRemoveLater(false);
> - }
> - }
> - sessionLock.notifyAll();
> - }
> - }
> -
> - /**
> - * @return the sentAcknowledgment
> - */
> - boolean isSentAcknowledgment() {
> - return sentAcknowledgment;
> - }
> -
> - /**
> - * @return the inBufRemaining
> - */
> - int getBufRemaining() {
> - return inBufRemaining;
> - }
> -
> - /**
> - * @return the inClosed
> - */
> - boolean isClosed() {
> - return inClosed;
> - }
> -
> - /**
> - * @param inBufRemaining the inBufRemaining to set
> - */
> - void setBufRemaining(int inBufRemaining) {
> - this.inBufRemaining = inBufRemaining;
> - }
> -
> - /**
> - * @param inEOF the inEOF to set
> - */
> - void setEOF(boolean inEOF) {
> - this.inEOF = inEOF;
> - }
> -
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.nio.ByteBuffer;
> +import java.util.Deque;
> +import java.util.LinkedList;
> +
> +/**
> + * Output stream returned by OutboundRequests and InboundRequests for
> + * a session of a multiplexed connection.
> + */
> +class MuxInputStream extends InputStream {
> + private final Object sessionLock;
> + private final Session session;
> + private final Mux mux;
> + private final Deque<ByteBuffer> inBufQueue;
> + private IOException sessionDown = null;
> + private int inBufRemaining = 0;
> + private int inBufPos = 0;
> + private boolean inEOF = false;
> + private boolean inClosed = false;
> + private boolean sentAcknowledgment = false;
> +
> + MuxInputStream(Mux mux, Session session, Object sessionLock) {
> + this.mux = mux;
> + this.session = session;
> + this.sessionLock = sessionLock;
> + this.inBufQueue = new LinkedList<ByteBuffer>();
> + }
> +
> + void down(IOException e) {
> + sessionDown = e;
> + }
> +
> + void appendToBufQueue(ByteBuffer data) {
> + inBufQueue.addLast(data);
> + }
> +
> + @Override
> + public int read() throws IOException {
> + synchronized (sessionLock) {
> + if (inClosed) {
> + throw new IOException("stream closed");
> + }
> + while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> + if (session.getInState() == Session.IDLE) {
> + assert session.getOutState() == Session.IDLE;
> + mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> + session.setOutState(Session.OPEN);
> + session.setInState(Session.OPEN);
> + }
> + if (!session.inRationInfinite && session.getInRation() == 0) {
> + int inc = mux.initialInboundRation;
> + mux.asyncSendIncrementRation(session.sessionID, inc);
> + session.setInRation(session.getInRation() + inc);
> + }
> + try {
> + sessionLock.wait(5000L); // REMIND: timeout?
> + } catch (InterruptedException e) {
> + String message = "request I/O interrupted";
> + session.setDown(message, e);
> + throw wrap(message, e);
> + }
> + }
> + if (inClosed) {
> + throw new IOException("stream closed");
> + }
> + if (inBufRemaining == 0) {
> + if (inEOF) {
> + return -1;
> + } else {
> + if (session.getInState() == Session.TERMINATED) {
> + throw new IOException("request aborted by remote endpoint");
> + }
> + assert sessionDown != null;
> + throw sessionDown;
> + }
> + }
> + assert inBufQueue.size() > 0;
> + int result = -1;
> + while (result == -1) {
> + ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> + if (inBufPos < buf.limit()) {
> + result = (buf.get() & 0xFF);
> + inBufPos++;
> + inBufRemaining--;
> + }
> + if (inBufPos == buf.limit()) {
> + inBufQueue.removeFirst();
> + inBufPos = 0;
> + }
> + }
> + if (!session.inRationInfinite) {
> + checkInboundRation();
> + }
> + return result;
> + }
> + }
> +
> + private IOException wrap(String message, Exception e) {
> + Throwable t;
> + if (Session.traceSupression()) {
> + t = e;
> + } else {
> + t = e.fillInStackTrace();
> + }
> + return new IOException(message, t);
> + }
> +
> + @Override
> + public int read(byte[] b, int off, int len) throws IOException {
> + if (b == null) {
> + throw new NullPointerException();
> + } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
> + throw new IndexOutOfBoundsException();
> + }
> + synchronized (sessionLock) {
> + if (inClosed) {
> + throw new IOException("stream closed");
> + } else if (len == 0) {
> + /*
> + * REMIND: What if
> + * - stream is at EOF?
> + * - session was aborted?
> + */
> + return 0;
> + }
> + while (inBufRemaining == 0 && sessionDown == null && session.getInState() <= Session.OPEN && !inClosed) {
> + if (session.getInState() == Session.IDLE) {
> + assert session.getOutState() == Session.IDLE;
> + mux.asyncSendData(Mux.Data | Mux.Data_open, session.sessionID, null);
> + session.setOutState(Session.OPEN);
> + session.setInState(Session.OPEN);
> + }
> + if (!session.inRationInfinite && session.getInRation() == 0) {
> + int inc = mux.initialInboundRation;
> + mux.asyncSendIncrementRation(session.sessionID, inc);
> + session.setInRation(session.getInRation() + inc);
> + }
> + try {
> + sessionLock.wait(5000L); // REMIND: timeout?
> + } catch (InterruptedException e) {
> + String message = "request I/O interrupted";
> + session.setDown(message, e);
> + throw wrap(message, e);
> + }
> + }
> + if (inClosed) {
> + throw new IOException("stream closed");
> + }
> + if (inBufRemaining == 0) {
> + if (inEOF) {
> + return -1;
> + } else {
> + if (session.getInState() == Session.TERMINATED) {
> + throw new IOException("request aborted by remote endpoint");
> + }
> + assert sessionDown != null;
> + throw sessionDown;
> + }
> + }
> + assert inBufQueue.size() > 0;
> + int remaining = len;
> + while (remaining > 0 && inBufRemaining > 0) {
> + ByteBuffer buf = (ByteBuffer) inBufQueue.getFirst();
> + if (inBufPos < buf.limit()) {
> + int toCopy = Math.min(buf.limit() - inBufPos, remaining);
> + buf.get(b, off, toCopy);
> + inBufPos += toCopy;
> + inBufRemaining -= toCopy;
> + off += toCopy;
> + remaining -= toCopy;
> + }
> + if (inBufPos == buf.limit()) {
> + inBufQueue.removeFirst();
> + inBufPos = 0;
> + }
> + }
> + if (!session.inRationInfinite) {
> + checkInboundRation();
> + }
> + return len - remaining;
> + }
> + }
> +
> + /**
> + * Sends ration increment, if read drained buffers below
> + * a certain mark.
> + *
> + * This method must NOT be invoked if the inbound ration in
> + * infinite, and it must ONLY be invoked while synchronized on
> + * this session's lock.
> + *
> + * REMIND: The implementation of this action will be a
> + * significant area for performance tuning.
> + */
> + private void checkInboundRation() {
> + assert Thread.holdsLock(sessionLock);
> + assert !session.inRationInfinite;
> + if (session.getInState() >= Session.FINISHED) {
> + return;
> + }
> + int mark = mux.initialInboundRation / 2;
> + int used = inBufRemaining + session.getInRation();
> + if (used <= mark) {
> + int inc = mux.initialInboundRation - used;
> + mux.asyncSendIncrementRation(session.sessionID, inc);
> + session.setInRation(session.getInRation() + inc);
> + }
> + }
> +
> + @Override
> + public int available() throws IOException {
> + synchronized (sessionLock) {
> + if (inClosed) {
> + throw new IOException("stream closed");
> + }
> + /*
> + * REMIND: What if
> + * - stream is at EOF?
> + * - session was aborted?
> + */
> + return inBufRemaining;
> + }
> + }
> +
> + @Override
> + public void close() {
> + synchronized (sessionLock) {
> + if (inClosed) {
> + return;
> + }
> + inClosed = true;
> + inBufQueue.clear(); // allow GC of unread data
> + if (session.role == Session.CLIENT && !sentAcknowledgment && session.isReceivedAckRequired() && session.getOutState() < Session.TERMINATED) {
> + mux.asyncSendAcknowledgment(session.sessionID);
> + sentAcknowledgment = true;
> + /*
> + * If removing this session from the connection's
> + * table was delayed in order to be able to send
> + * an Acknowledgment message, then take care of
> + * removing it now.
> + */
> + if (session.isRemoveLater()) {
> + session.setOutState(Session.TERMINATED);
> + mux.removeSession(session.sessionID);
> + session.setRemoveLater(false);
> + }
> + }
> + sessionLock.notifyAll();
> + }
> + }
> +
> + /**
> + * @return the sentAcknowledgment
> + */
> + boolean isSentAcknowledgment() {
> + return sentAcknowledgment;
> + }
> +
> + /**
> + * @return the inBufRemaining
> + */
> + int getBufRemaining() {
> + return inBufRemaining;
> + }
> +
> + /**
> + * @return the inClosed
> + */
> + boolean isClosed() {
> + return inClosed;
> + }
> +
> + /**
> + * @param inBufRemaining the inBufRemaining to set
> + */
> + void setBufRemaining(int inBufRemaining) {
> + this.inBufRemaining = inBufRemaining;
> + }
> +
> + /**
> + * @param inEOF the inEOF to set
> + */
> + void setEOF(boolean inEOF) {
> + this.inEOF = inEOF;
> + }
> +
> +}
>
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/MuxServer.java Thu Nov 26 11:56:32 2015
> @@ -210,7 +210,7 @@ public class MuxServer extends Mux {
> dispatchNewRequest(sessionID);
> return;
> } else {
> - session = (Session) sessions.get(Integer.valueOf(sessionID));
> + session = sessions[sessionID];
> assert session != null;
> }
> }
>
> Modified: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java
> URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java?rev=1716613&r1=1716612&r2=1716613&view=diff
> ==============================================================================
> --- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java (original)
> +++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/jeri/internal/mux/StreamConnectionIO.java Thu Nov 26 11:56:32 2015
> @@ -1,434 +1,434 @@
> -/*
> - * Licensed to the Apache Software Foundation (ASF) under one
> - * or more contributor license agreements. See the NOTICE file
> - * distributed with this work for additional information
> - * regarding copyright ownership. The ASF licenses this file
> - * to you under the Apache License, Version 2.0 (the
> - * "License"); you may not use this file except in compliance
> - * with the License. You may obtain a copy of the License at
> - *
> - * http://www.apache.org/licenses/LICENSE-2.0
> - *
> - * Unless required by applicable law or agreed to in writing, software
> - * distributed under the License is distributed on an "AS IS" BASIS,
> - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> - * See the License for the specific language governing permissions and
> - * limitations under the License.
> - */
> -
> -package org.apache.river.jeri.internal.mux;
> -
> -import org.apache.river.logging.Levels;
> -import org.apache.river.thread.Executor;
> -import org.apache.river.thread.GetThreadPoolAction;
> -import java.io.EOFException;
> -import java.io.IOException;
> -import java.io.InputStream;
> -import java.io.OutputStream;
> -import java.nio.ByteBuffer;
> -import java.nio.channels.ReadableByteChannel;
> -import java.nio.channels.WritableByteChannel;
> -import java.security.AccessController;
> -import java.util.Deque;
> -import java.util.LinkedList;
> -import java.util.logging.Level;
> -import java.util.logging.Logger;
> -
> -/**
> - * StreamConnectionIO implements the ConnectionIO abstraction for a
> - * connection accessible through standard (blocking) I/O streams, i.e.
> - * java.io.OutputStream and java.io.InputStream.
> - *
> - * @author Sun Microsystems, Inc.
> - **/
> -final class StreamConnectionIO extends ConnectionIO {
> -
> - private static final int RECEIVE_BUFFER_SIZE = 2048;
> -
> - /**
> - * pool of threads for executing tasks in system thread group:
> - * used for I/O (reader and writer) threads
> - */
> - private static final Executor systemThreadPool =
> - (Executor) AccessController.doPrivileged(
> - new GetThreadPoolAction(false));
> -
> - /** mux logger */
> - private static final Logger logger =
> - Logger.getLogger("net.jini.jeri.connection.mux");
> -
> - /** I/O streams for underlying connection */
> - private final OutputStream out;
> - private final InputStream in;
> -
> - /** channels wrapped around underlying I/O streams */
> - private final WritableByteChannel outChannel;
> - private final ReadableByteChannel inChannel;
> -
> - /**
> - * queue of buffers of data to be sent over connection, interspersed
> - * with IOFuture objects that need to be notified in sequence
> - *
> - * Synchronised on super.mux.muxLock;
> - */
> - private final Deque sendQueue;
> -
> -
> - /**
> - * Creates a new StreamConnectionIO for the connection represented by
> - * the supplied OutputStream and InputStream pair.
> - */
> - StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
> - super(mux);
> - this.out = out;
> -// this.out = new BufferedOutputStream(out);
> - this.in = in;
> -
> - outChannel = newChannel(out);
> - inChannel = newChannel(in);
> - sendQueue = new LinkedList();
> - }
> -
> - /**
> - * Starts processing connection data. This method starts
> - * asynchronous actions to read and write from the connection.
> - */
> - @Override
> - void start() throws IOException {
> - try {
> - systemThreadPool.execute(new Writer(), "mux writer");
> - systemThreadPool.execute(new Reader(), "mux reader");
> - } catch (OutOfMemoryError e) { // assume out of threads
> - try {
> - logger.log(Level.WARNING,
> - "could not create thread for request dispatch", e);
> - } catch (Throwable t) {
> - }
> - throw new IOException("could not create I/O threads", e);
> - }
> - }
> -
> - @Override
> - void asyncSend(ByteBuffer buffer) {
> - synchronized (mux.muxLock) {
> - if (mux.muxDown) {
> - return;
> - }
> - sendQueue.addLast(buffer);
> - mux.muxLock.notifyAll();
> - }
> - }
> -
> - @Override
> - void asyncSend(ByteBuffer first, ByteBuffer second) {
> - synchronized (mux.muxLock) {
> - if (mux.muxDown) {
> - return;
> - }
> - sendQueue.addLast(first);
> - sendQueue.addLast(second);
> - mux.muxLock.notifyAll();
> - }
> - }
> -
> - @Override
> - IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
> - synchronized (mux.muxLock) {
> - IOFuture future = new IOFuture();
> - if (mux.muxDown) {
> - IOException ioe = new IOException(mux.muxDownMessage);
> - ioe.initCause(mux.muxDownCause);
> - future.done(ioe);
> - return future;
> - }
> - sendQueue.addLast(first);
> - sendQueue.addLast(second);
> - sendQueue.addLast(future);
> - mux.muxLock.notifyAll();
> - return future;
> - }
> - /*
> - * REMIND: Can/should we implement any sort of
> - * priority inversion avoidance scheme here?
> - */
> - }
> -
> - private class Writer implements Runnable {
> - Writer() { }
> -
> - @Override
> - public void run() {
> - Deque localQueue = null;
> - try {
> - while (true) {
> - synchronized (mux.muxLock) {
> - while (!mux.muxDown && sendQueue.isEmpty()) {
> - /*
> - * REMIND: Should we use a timeout here, to send
> - * occasional PING messages during periods of
> - * inactivity, to make sure connection is alive?
> - */
> - mux.muxLock.wait();
> - /*
> - * Let an interrupt during the wait just kill this
> - * thread, because an interrupt during an I/O write
> - * would leave it in an unrecoverable state anyway.
> - */
> - }
> - if (mux.muxDown && sendQueue.isEmpty()) {
> - logger.log(Level.FINEST,
> - "mux writer thread dying, connection " +
> - "down and nothing more to send");
> - break;
> - }
> - /* Clone an unshared copy and clear the queue while synchronized */
> - localQueue = new LinkedList(sendQueue);
> - sendQueue.clear();
> - }
> -
> - boolean needToFlush = false;
> - ByteBuffer last = null;
> - int lastIndex = Integer.MIN_VALUE;
> - for ( int i = 0; !localQueue.isEmpty(); i++) {
> - Object next = localQueue.getFirst();
> - if (next instanceof ByteBuffer) {
> - ByteBuffer buffer = (ByteBuffer) next;
> - outChannel.write((buffer));
> - last = buffer;
> - lastIndex = i;
> - needToFlush = true;
> - } else {
> - assert next instanceof IOFuture;
> - if (needToFlush) {
> - out.flush();
> - needToFlush = false;
> - }
> - if (lastIndex == i - 1 && last != null){
> - ((IOFuture) next).done(last.position());
> - } else {
> - ((IOFuture) next).done();
> - }
> - }
> - localQueue.removeFirst();
> - }
> - if (needToFlush) {
> - out.flush();
> - }
> - }
> - } catch (InterruptedException e) {
> - try {
> - logger.log(Level.WARNING,
> - "mux writer thread dying, interrupted", e);
> - } catch (Throwable t) {
> - }
> - mux.setDown("mux writer thread interrupted", e);
> - } catch (IOException e) {
> - try {
> - logger.log(Levels.HANDLED,
> - "mux writer thread dying, I/O error", e);
> - } catch (Throwable t) {
> - }
> - mux.setDown("I/O error writing to mux connection: " +
> - e.toString(), e);
> - } catch (Throwable t) {
> - try {
> - logger.log(Level.WARNING,
> - "mux writer thread dying, unexpected exception", t);
> - } catch (Throwable tt) {
> - }
> - mux.setDown("unexpected exception in mux writer thread: " +
> - t.toString(), t);
> - } finally {
> - synchronized (mux.muxLock) {
> - assert mux.muxDown;
> - if (localQueue != null) {
> - drainQueue(localQueue);
> - }
> - drainQueue(sendQueue);
> - }
> - try {
> - outChannel.close();
> - } catch (IOException e) {
> - }
> - }
> - }
> - }
> -
> - private void drainQueue(Deque queue) {
> - while (!queue.isEmpty()) {
> - Object next = queue.removeFirst();
> - if (next instanceof IOFuture) {
> - IOException ioe = new IOException(mux.muxDownMessage);
> - ioe.initCause(mux.muxDownCause);
> - ((IOFuture) next).done(ioe);
> - }
> - }
> - }
> -
> - private class Reader implements Runnable {
> - /** buffer for reading incoming data from connection */
> - private final ByteBuffer inputBuffer =
> - ByteBuffer.allocate(RECEIVE_BUFFER_SIZE); // ready for reading
> -
> - Reader() { }
> -
> - public void run() {
> - try {
> - while (true) {
> - int n = inChannel.read(inputBuffer);
> - if (n == -1) {
> - throw new EOFException();
> - }
> - assert n > 0; // channel is assumed to be blocking
> - mux.processIncomingData(inputBuffer);
> - assert inputBuffer.hasRemaining();
> - }
> - } catch (ProtocolException e) {
> - IOFuture future = null;
> - synchronized (mux.muxLock) {
> - /*
> - * If mux connection is already down, then we probably got
> - * here because of the receipt of a normal protocol-ending
> - * message, like Shutdown or Error, or else something else
> - * went wrong anyway. Otherwise, a real protocol violation
> - * was detected, so respond with an Error message before
> - * taking down the whole mux connection.
> - */
> - if (!mux.muxDown) {
> - try {
> - logger.log(Levels.HANDLED,
> - "mux reader thread dying, protocol error", e);
> - } catch (Throwable t) {
> - }
> - future = mux.futureSendError(e.getMessage());
> - mux.setDown("protocol violation detected: " +
> - e.getMessage(), null);
> - } else {
> - try {
> - logger.log(Level.FINEST,
> - "mux reader thread dying: " + e.getMessage());
> - } catch (Throwable t) {
> - }
> - }
> - }
> - if (future != null) {
> - try {
> - future.waitUntilDone();
> - } catch (IOException ignore) {
> - } catch (InterruptedException interrupt) {
> - Thread.currentThread().interrupt();
> - }
> - }
> - } catch (IOException e) {
> - try {
> - logger.log(Levels.HANDLED,
> - "mux reader thread dying, I/O error", e);
> - } catch (Throwable t) {
> - }
> - mux.setDown("I/O error reading from mux connection: " +
> - e.toString(), e);
> - } catch (Throwable t) {
> - try {
> - logger.log(Level.WARNING,
> - "mux reader thread dying, unexpected exception", t);
> - } catch (Throwable tt) {
> - }
> - mux.setDown("unexpected exception in mux reader thread: " +
> - t.toString(), t);
> - } finally {
> - try {
> - inChannel.close();
> - } catch (IOException e) {
> - }
> - }
> - }
> - }
> -
> - /**
> - * The following two methods are modifications of their
> - * equivalents in java.nio.channels.Channels with the assumption
> - * that the supplied byte buffers are backed by arrays, so no
> - * additional copying is required.
> - */
> -
> - public static ReadableByteChannel newChannel(final InputStream in) {
> - return new ReadableByteChannel() {
> - private volatile boolean open = true;
> -
> - // must be synchronized as per ReadableByteChannel contract
> - @Override
> - public synchronized int read(ByteBuffer dst) throws IOException {
> - assert dst.hasArray();
> - byte[] array = dst.array();
> - int arrayOffset = dst.arrayOffset();
> -
> - int totalRead = 0;
> - int bytesRead = 0;
> - int bytesToRead;
> - while ((bytesToRead = dst.remaining()) > 0) {
> - if ((totalRead > 0) && !(in.available() > 0)) {
> - break; // block at most once
> - }
> - int pos = dst.position();
> - bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
> - if (bytesRead < 0) {
> - break;
> - } else {
> - dst.position(pos + bytesRead);
> - totalRead += bytesRead;
> - }
> - }
> - if ((bytesRead < 0) && (totalRead == 0)) {
> - return -1;
> - }
> -
> - return totalRead;
> - }
> -
> - @Override
> - public boolean isOpen() {
> - return open;
> - }
> -
> - // Blocking as per Channel contract
> - @Override
> - public synchronized void close() throws IOException {
> - in.close();
> - open = false;
> - }
> - };
> - }
> -
> - public static WritableByteChannel newChannel(final OutputStream out) {
> - return new WritableByteChannel() {
> - private volatile boolean open = true;
> -
> - // This method must block while writing as per WritableByteChannel contract.
> - @Override
> - public synchronized int write(ByteBuffer src) throws IOException {
> - assert src.hasArray();
> -
> - int len = src.remaining();
> - if (len > 0) {
> - int pos = src.position();
> - out.write(src.array(), src.arrayOffset() + pos, len);
> - src.position(pos + len);
> - }
> - return len;
> - }
> -
> - @Override
> - public boolean isOpen() {
> - return open;
> - }
> -
> - // This method must block as per the Channel contract
> - @Override
> - public synchronized void close() throws IOException {
> - out.close();
> - open = false;
> - }
> - };
> - }
> -
> -}
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements. See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership. The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.river.jeri.internal.mux;
> +
> +import org.apache.river.logging.Levels;
> +import org.apache.river.thread.Executor;
> +import org.apache.river.thread.GetThreadPoolAction;
> +import java.io.EOFException;
> +import java.io.IOException;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.nio.ByteBuffer;
> +import java.nio.channels.ReadableByteChannel;
> +import java.nio.channels.WritableByteChannel;
> +import java.security.AccessController;
> +import java.util.Deque;
> +import java.util.LinkedList;
> +import java.util.logging.Level;
> +import java.util.logging.Logger;
> +
> +/**
> + * StreamConnectionIO implements the ConnectionIO abstraction for a
> + * connection accessible through standard (blocking) I/O streams, i.e.
> + * java.io.OutputStream and java.io.InputStream.
> + *
> + * @author Sun Microsystems, Inc.
> + **/
> +final class StreamConnectionIO extends ConnectionIO {
> +
> + private static final int RECEIVE_BUFFER_SIZE = 2048;
> +
> + /**
> + * pool of threads for executing tasks in system thread group:
> + * used for I/O (reader and writer) threads
> + */
> + private static final Executor systemThreadPool =
> + (Executor) AccessController.doPrivileged(
> + new GetThreadPoolAction(false));
> +
> + /** mux logger */
> + private static final Logger logger =
> + Logger.getLogger("net.jini.jeri.connection.mux");
> +
> + /** I/O streams for underlying connection */
> + private final OutputStream out;
> + private final InputStream in;
> +
> + /** channels wrapped around underlying I/O streams */
> + private final WritableByteChannel outChannel;
> + private final ReadableByteChannel inChannel;
> +
> + /**
> + * queue of buffers of data to be sent over connection, interspersed
> + * with IOFuture objects that need to be notified in sequence
> + *
> + * Synchronised on super.mux.muxLock;
> + */
> + private final Deque sendQueue;
> +
> +
> + /**
> + * Creates a new StreamConnectionIO for the connection represented by
> + * the supplied OutputStream and InputStream pair.
> + */
> + StreamConnectionIO(Mux mux, OutputStream out, InputStream in) {
> + super(mux);
> + this.out = out;
> +// this.out = new BufferedOutputStream(out);
> + this.in = in;
> +
> + outChannel = newChannel(out);
> + inChannel = newChannel(in);
> + sendQueue = new LinkedList();
> + }
> +
> + /**
> + * Starts processing connection data. This method starts
> + * asynchronous actions to read and write from the connection.
> + */
> + @Override
> + void start() throws IOException {
> + try {
> + systemThreadPool.execute(new Writer(), "mux writer");
> + systemThreadPool.execute(new Reader(), "mux reader");
> + } catch (OutOfMemoryError e) { // assume out of threads
> + try {
> + logger.log(Level.WARNING,
> + "could not create thread for request dispatch", e);
> + } catch (Throwable t) {
> + }
> + throw new IOException("could not create I/O threads", e);
> + }
> + }
> +
> + @Override
> + void asyncSend(ByteBuffer buffer) {
> + synchronized (mux.muxLock) {
> + if (mux.muxDown) {
> + return;
> + }
> + sendQueue.addLast(buffer);
> + mux.muxLock.notifyAll();
> + }
> + }
> +
> + @Override
> + void asyncSend(ByteBuffer first, ByteBuffer second) {
> + synchronized (mux.muxLock) {
> + if (mux.muxDown) {
> + return;
> + }
> + sendQueue.addLast(first);
> + sendQueue.addLast(second);
> + mux.muxLock.notifyAll();
> + }
> + }
> +
> + @Override
> + IOFuture futureSend(ByteBuffer first, ByteBuffer second) {
> + synchronized (mux.muxLock) {
> + IOFuture future = new IOFuture();
> + if (mux.muxDown) {
> + IOException ioe = new IOException(mux.muxDownMessage);
> + ioe.initCause(mux.muxDownCause);
> + future.done(ioe);
> + return future;
> + }
> + sendQueue.addLast(first);
> + sendQueue.addLast(second);
> + sendQueue.addLast(future);
> + mux.muxLock.notifyAll();
> + return future;
> + }
> + /*
> + * REMIND: Can/should we implement any sort of
> + * priority inversion avoidance scheme here?
> + */
> + }
> +
> + private class Writer implements Runnable {
> + Writer() { }
> +
> + @Override
> + public void run() {
> + Deque localQueue = null;
> + try {
> + while (true) {
> + synchronized (mux.muxLock) {
> + while (!mux.muxDown && sendQueue.isEmpty()) {
> + /*
> + * REMIND: Should we use a timeout here, to send
> + * occasional PING messages during periods of
> + * inactivity, to make sure connection is alive?
> + */
> + mux.muxLock.wait();
> + /*
> + * Let an interrupt during the wait just kill this
> + * thread, because an interrupt during an I/O write
> + * would leave it in an unrecoverable state anyway.
> + */
> + }
> + if (mux.muxDown && sendQueue.isEmpty()) {
> + logger.log(Level.FINEST,
> + "mux writer thread dying, connection " +
> + "down and nothing more to send");
> + break;
> + }
> + /* Clone an unshared copy and clear the queue while synchronized */
> + localQueue = new LinkedList(sendQueue);
> + sendQueue.clear();
> + }
> +
> + boolean needToFlush = false;
> + ByteBuffer last = null;
> + int lastIndex = Integer.MIN_VALUE;
> + for ( int i = 0; !localQueue.isEmpty(); i++) {
> + Object next = localQueue.getFirst();
> + if (next instanceof ByteBuffer) {
> + ByteBuffer buffer = (ByteBuffer) next;
> + outChannel.write((buffer));
> + last = buffer;
> + lastIndex = i;
> + needToFlush = true;
> + } else {
> + assert next instanceof IOFuture;
> + if (needToFlush) {
> + out.flush();
> + needToFlush = false;
> + }
> + if (lastIndex == i - 1 && last != null){
> + ((IOFuture) next).done(last.position());
> + } else {
> + ((IOFuture) next).done();
> + }
> + }
> + localQueue.removeFirst();
> + }
> + if (needToFlush) {
> + out.flush();
> + }
> + }
> + } catch (InterruptedException e) {
> + try {
> + logger.log(Level.WARNING,
> + "mux writer thread dying, interrupted", e);
> + } catch (Throwable t) {
> + }
> + mux.setDown("mux writer thread interrupted", e);
> + } catch (IOException e) {
> + try {
> + logger.log(Levels.HANDLED,
> + "mux writer thread dying, I/O error", e);
> + } catch (Throwable t) {
> + }
> + mux.setDown("I/O error writing to mux connection: " +
> + e.toString(), e);
> + } catch (Throwable t) {
> + try {
> + logger.log(Level.WARNING,
> + "mux writer thread dying, unexpected exception", t);
> + } catch (Throwable tt) {
> + }
> + mux.setDown("unexpected exception in mux writer thread: " +
> + t.toString(), t);
> + } finally {
> + synchronized (mux.muxLock) {
> + assert mux.muxDown;
> + if (localQueue != null) {
> + drainQueue(localQueue);
> + }
> + drainQueue(sendQueue);
> + }
> + try {
> + outChannel.close();
> + } catch (IOException e) {
> + }
> + }
> + }
> + }
> +
> + private void drainQueue(Deque queue) {
> + while (!queue.isEmpty()) {
> + Object next = queue.removeFirst();
> + if (next instanceof IOFuture) {
> + IOException ioe = new IOException(mux.muxDownMessage);
> + ioe.initCause(mux.muxDownCause);
> + ((IOFuture) next).done(ioe);
> + }
> + }
> + }
> +
> + private class Reader implements Runnable {
> + /** buffer for reading incoming data from connection */
> + private final ByteBuffer inputBuffer =
> + ByteBuffer.allocate(RECEIVE_BUFFER_SIZE); // ready for reading
> +
> + Reader() { }
> +
> + public void run() {
> + try {
> + while (true) {
> + int n = inChannel.read(inputBuffer);
> + if (n == -1) {
> + throw new EOFException();
> + }
> + assert n > 0; // channel is assumed to be blocking
> + mux.processIncomingData(inputBuffer);
> + assert inputBuffer.hasRemaining();
> + }
> + } catch (ProtocolException e) {
> + IOFuture future = null;
> + synchronized (mux.muxLock) {
> + /*
> + * If mux connection is already down, then we probably got
> + * here because of the receipt of a normal protocol-ending
> + * message, like Shutdown or Error, or else something else
> + * went wrong anyway. Otherwise, a real protocol violation
> + * was detected, so respond with an Error message before
> + * taking down the whole mux connection.
> + */
> + if (!mux.muxDown) {
> + try {
> + logger.log(Levels.HANDLED,
> + "mux reader thread dying, protocol error", e);
> + } catch (Throwable t) {
> + }
> + future = mux.futureSendError(e.getMessage());
> + mux.setDown("protocol violation detected: " +
> + e.getMessage(), null);
> + } else {
> + try {
> + logger.log(Level.FINEST,
> + "mux reader thread dying: " + e.getMessage());
> + } catch (Throwable t) {
> + }
> + }
> + }
> + if (future != null) {
> + try {
> + future.waitUntilDone();
> + } catch (IOException ignore) {
> + } catch (InterruptedException interrupt) {
> + Thread.currentThread().interrupt();
> + }
> + }
> + } catch (IOException e) {
> + try {
> + logger.log(Levels.HANDLED,
> + "mux reader thread dying, I/O error", e);
> + } catch (Throwable t) {
> + }
> + mux.setDown("I/O error reading from mux connection: " +
> + e.toString(), e);
> + } catch (Throwable t) {
> + try {
> + logger.log(Level.WARNING,
> + "mux reader thread dying, unexpected exception", t);
> + } catch (Throwable tt) {
> + }
> + mux.setDown("unexpected exception in mux reader thread: " +
> + t.toString(), t);
> + } finally {
> + try {
> + inChannel.close();
> + } catch (IOException e) {
> + }
> + }
> + }
> + }
> +
> + /**
> + * The following two methods are modifications of their
> + * equivalents in java.nio.channels.Channels with the assumption
> + * that the supplied byte buffers are backed by arrays, so no
> + * additional copying is required.
> + */
> +
> + public static ReadableByteChannel newChannel(final InputStream in) {
> + return new ReadableByteChannel() {
> + private boolean open = true;
> +
> + // must be synchronized as per ReadableByteChannel contract
> + @Override
> + public synchronized int read(ByteBuffer dst) throws IOException {
> + assert dst.hasArray();
> + byte[] array = dst.array();
> + int arrayOffset = dst.arrayOffset();
> +
> + int totalRead = 0;
> + int bytesRead = 0;
> + int bytesToRead;
> + while ((bytesToRead = dst.remaining()) > 0) {
> + if ((totalRead > 0) && !(in.available() > 0)) {
> + break; // block at most once
> + }
> + int pos = dst.position();
> + bytesRead = in.read(array, arrayOffset + pos, bytesToRead);
> + if (bytesRead < 0) {
> + break;
> + } else {
> + dst.position(pos + bytesRead);
> + totalRead += bytesRead;
> + }
> + }
> + if ((bytesRead < 0) && (totalRead == 0)) {
> + return -1;
> + }
> +
> + return totalRead;
> + }
> +
> + @Override
> + public synchronized boolean isOpen() {
> + return open;
> + }
> +
> + // Blocking as per Channel contract
> + @Override
> + public synchronized void close() throws IOException {
> + in.close();
> + open = false;
> + }
> + };
> + }
> +
> + public static WritableByteChannel newChannel(final OutputStream out) {
> + return new WritableByteChannel() {
> + private volatile boolean open = true;
> +
> + // This method must block while writing as per WritableByteChannel contract.
> + @Override
> + public synchronized int write(ByteBuffer src) throws IOException {
> + assert src.hasArray();
> +
> + int len = src.remaining();
> + if (len > 0) {
> + int pos = src.position();
> + out.write(src.array(), src.arrayOffset() + pos, len);
> + src.position(pos + len);
> + }
> + return len;
> + }
> +
> + @Override
> + public boolean isOpen() {
> + return open;
> + }
> +
> + // This method must block as per the Channel contract
> + @Override
> + public synchronized void close() throws IOException {
> + out.close();
> + open = false;
> + }
> + };
> + }
> +
> +}
>
>