You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/05/09 09:03:19 UTC
svn commit: r1593493 [9/24] - in /river/jtsk/skunk/qa_refactor/trunk: qa/
qa/src/com/sun/jini/test/impl/end2end/jssewrapper/
qa/src/com/sun/jini/test/impl/joinmanager/
qa/src/com/sun/jini/test/impl/mahalo/
qa/src/com/sun/jini/test/impl/outrigger/matchi...
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java?rev=1593493&r1=1593492&r2=1593493&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java Fri May 9 07:03:18 2014
@@ -1,1280 +1,1278 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.sun.jini.jeri.internal.mux;
-
-import com.sun.jini.jeri.internal.runtime.HexDumpEncoder;
-import com.sun.jini.thread.Executor;
-import com.sun.jini.thread.GetThreadPoolAction;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
-import java.security.AccessController;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Mux is the abstract superclass of both client-side and server-side
- * multiplexed connections.
- *
- * @author Sun Microsystems, Inc.
- **/
-abstract class Mux {
-
- static final int CLIENT = 0;
- static final int SERVER = 1;
-
- static final int MAX_SESSION_ID = 0x7F;
- public static final int MAX_REQUESTS = MAX_SESSION_ID + 1;
-
- static final int NoOperation = 0x00; // 00000000
- static final int Shutdown = 0x02; // 00000010
- static final int Ping = 0x04; // 00000100
- static final int PingAck = 0x06; // 00000110
- static final int Error = 0x08; // 00001000
- static final int IncrementRation = 0x10; // 0001***0
- static final int Abort = 0x20; // 001000*0
- static final int Close = 0x30; // 00110000
- static final int Acknowledgment = 0x40; // 00100000
- static final int Data = 0x80; // 100****0
-
- static final int IncrementRation_shift = 0x0E;
- static final int Abort_partial = 0x02;
- static final int Data_open = 0x10;
- static final int Data_close = 0x08;
- static final int Data_eof = 0x04;
- static final int Data_ackRequired = 0x02;
-
- static final int ClientConnectionHeader_negotiate = 0x01;
-
- private static final byte[] magic = {
- (byte) 'J', (byte) 'm', (byte) 'u', (byte) 'x' // 0x4A6D7578
- };
-
- private static final int VERSION = 0x01;
-
- /**
- * pool of threads for executing tasks in system thread group:
- * used for shutting down sessions when a connection goes down
- */
- private static final Executor systemThreadPool =
- (Executor) AccessController.doPrivileged(
- new GetThreadPoolAction(false));
-
- /** session shutdown tasks to be executed asynchronously */
- private static final LinkedList sessionShutdownQueue = new LinkedList();
-
- private static class SessionShutdownTask implements Runnable {
- private final Session[] sessions;
- private final String message;
- private final Throwable cause;
-
- SessionShutdownTask(Session[] sessions,
- String message,
- Throwable cause)
- {
- this.sessions = sessions;
- this.message = message;
- this.cause = cause;
- }
-
- public void run() {
- for (int i = 0; i < sessions.length; i++) {
- sessions[i].setDown(message, cause);
- }
- }
- }
-
- /** mux logger */
- private static final Logger logger =
- Logger.getLogger("net.jini.jeri.connection.mux");
-
- final int role;
- final int initialInboundRation;
- final int maxFragmentSize;
-
- private final ConnectionIO connectionIO;
- private final boolean directBuffersUseful;
-
- /** lock guarding all mutable instance state (below) */
- final Object muxLock = new Object();
-
- int initialOutboundRation; // set from remote connection header
- private boolean clientConnectionReady = false; // server header received
- boolean serverConnectionReady = false; // server header sent
-
- boolean muxDown = false;
- String muxDownMessage;
- Throwable muxDownCause;
-
- final BitSet busySessions = new BitSet();
- final Map sessions = new HashMap(5);
-
- private int expectedPingCookie = -1;
-
- /** unguarded instance state */
- private volatile long startTimeout = 30000; // milliseconds
-
- /**
- * Constructs a new Mux instance for a connection accessible through
- * standard (blocking) I/O streams.
- */
- Mux(OutputStream out, InputStream in,
- int role, int initialInboundRation, int maxFragmentSize)
- throws IOException
- {
- this.role = role;
- if ((initialInboundRation & ~0x00FFFF00) != 0) {
- throw new IllegalArgumentException(
- "illegal initial inbound ration: " +
- toHexString(initialInboundRation));
- }
- this.initialInboundRation = initialInboundRation;
- this.maxFragmentSize = maxFragmentSize;
-
- this.connectionIO = new StreamConnectionIO(this, out, in);
- directBuffersUseful = false;
- }
-
- Mux(SocketChannel channel,
- int role, int initialInboundRation, int maxFragmentSize)
- throws IOException
- {
- this.role = role;
- if ((initialInboundRation & ~0x00FFFF00) != 0) {
- throw new IllegalArgumentException(
- "illegal initial inbound ration: " +
- toHexString(initialInboundRation));
- }
- this.initialInboundRation = initialInboundRation;
- this.maxFragmentSize = maxFragmentSize;
-
- this.connectionIO = new SocketChannelConnectionIO(this, channel);
- directBuffersUseful = true;
- }
-
- /**
- * Time in milliseconds for client-side connections to wait for the server
- * to acknowledge an opening handshake. The default value is 15000
- * milliseconds (15 seconds).
- *
- * <p>
- * This method is not thread-safe. It is expected to be called immediately
- * after a constructor.
- *
- * @param timeout
- * positive value in milliseconds
- */
- public void setStartTimeout(long timeout) {
- if (timeout <= 0)
- throw new IllegalArgumentException("start timeout must be a positive number of milliseconds");
- this.startTimeout = timeout;
- }
-
- /**
- * Starts I/O processing.
- *
- * This method should be invoked only after this instance has
- * been completely initialized, so that subclasses will not
- * see uninitialized state.
- */
- public void start() throws IOException {
- if (role == CLIENT) {
- readState = READ_SERVER_CONNECTION_HEADER;
- } else {
- assert role == SERVER;
- readState = READ_CLIENT_CONNECTION_HEADER;
- }
-
- try {
- connectionIO.start();
- } catch (IOException e) {
- setDown("I/O error starting connection", e);
- throw e;
- }
-
- if (role == CLIENT) {
- asyncSendClientConnectionHeader();
- synchronized (muxLock) {
- long now = System.currentTimeMillis();
- long endTime = now + this.startTimeout;
- while (!muxDown && !clientConnectionReady) {
- if (now >= endTime) {
- setDown("timeout waiting for server to respond to handshake", null);
- } else {
- try {
- muxLock.wait(endTime - now);
- now = System.currentTimeMillis();
- } catch (InterruptedException e) {
- setDown("interrupt waiting for connection header", e);
- }
- }
- }
- if (muxDown) {
- IOException ioe = new IOException(muxDownMessage);
- ioe.initCause(muxDownCause);
- throw ioe;
- }
- }
- }
- }
-
- /**
- * Handles indication that this multiplexed connection has
- * gone down, either through normal operation or failure.
- *
- * This method should be overridden by subclasses that want to
- * implement custom behavior when this connection has gone down.
- */
- protected void handleDown() {
- }
-
- /**
- * This method is invoked internally and is intended to be
- * overridden by subclasses.
- */
- void handleOpen(int sessionID) throws ProtocolException {
- throw new ProtocolException(
- "remote endpoint attempted to open session");
- }
-
- /**
- *
- * This method is intended to be invoked by subclasses only.
- *
- * This method must ONLY be invoked while synchronized on muxLock
- * and while muxDown is false.
- */
- final void addSession(int sessionID, Session session) {
- assert Thread.holdsLock(muxLock);
- assert !muxDown;
- assert !busySessions.get(sessionID);
- assert sessions.get(Integer.valueOf(sessionID)) == null;
-
- busySessions.set(sessionID);
- sessions.put(Integer.valueOf(sessionID), session);
- }
-
- /**
- *
- * This method is intended to be invoked by this class and
- * subclasses only.
- *
- * This method MAY be invoked while synchronized on muxLock.
- */
- final void setDown(final String message, final Throwable cause) {
- synchronized (muxLock) {
- if (muxDown) {
- return;
- }
- muxDown = true;
- muxDownMessage = message;
- muxDownCause = cause;
- muxLock.notifyAll();
- }
-
- /*
- * The following should be safe because we just left the
- * synchonized block, and after setting the muxDown latch
- * therein, no other thread should ever touch the "sessions"
- * data structure.
- *
- * Sessions are shut down asynchronously in a separate thread
- * to avoid deadlock, in case our caller holds muxLock,
- * because individual session locks must never be acquired
- * while holding muxLock.
- */
- boolean needWorker = false;
- synchronized (sessionShutdownQueue) {
- if (!sessions.values().isEmpty()) {
- sessionShutdownQueue.add(new SessionShutdownTask(
- (Session[]) sessions.values().toArray(
- new Session[sessions.values().size()]),
- message, cause));
- needWorker = true;
- } else {
- needWorker = !sessionShutdownQueue.isEmpty();
- }
- }
- if (needWorker) {
- try {
- systemThreadPool.execute(new Runnable() {
- public void run() {
- while (true) {
- Runnable task;
- synchronized (sessionShutdownQueue) {
- if (sessionShutdownQueue.isEmpty()) {
- break;
- }
- task = (Runnable)
- sessionShutdownQueue.removeFirst();
- }
- task.run();
- }
- }
- }, "mux session shutdown");
- } catch (OutOfMemoryError e) { // assume out of threads
- try {
- logger.log(Level.WARNING,
- "could not create thread for session shutdown", e);
- } catch (Throwable t) {
- }
- // absorb exception to proceed with connection shutdown;
- // session shutdown task will remain on queue for later
- }
- }
-
- handleDown();
- }
-
- /**
- * Removes the identified session from the session table.
- *
- * This method is intended to be invoked by the associated Session
- * object only.
- */
- final void removeSession(int sessionID) {
- synchronized (muxLock) {
- if (muxDown) {
- return;
- }
- assert busySessions.get(sessionID);
- busySessions.clear(sessionID);
- sessions.remove(Integer.valueOf(sessionID));
- }
- }
-
- /**
- * Returns true if it would be useful to pass direct buffers to
- * this instance's *Send* methods (because the underlying I/O
- * implementation will pass such buffers directly to channel write
- * operations); returns false otherwise.
- */
- final boolean directBuffersUseful() {
- return directBuffersUseful;
- }
-
- /**
- * Sends the ClientConnectionHeader message for this connection.
- */
- final void asyncSendClientConnectionHeader() {
- assert role == CLIENT;
-
- ByteBuffer header = ByteBuffer.allocate(8);
- header.put(magic)
- .put((byte) VERSION)
- .putShort((short) (initialInboundRation >> 8))
- .put((byte) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
-
- /**
- * Sends the ServerConnectionHeader message for this connection.
- */
- final void asyncSendServerConnectionHeader() {
- assert role == SERVER;
-
- ByteBuffer header = ByteBuffer.allocate(8);
- header.put(magic)
- .put((byte) VERSION)
- .putShort((short) (initialInboundRation >> 8))
- .put((byte) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
-
- /**
- * Sends a NoOperation message with the contents of the supplied buffer
- * as the data.
- *
- * The "length" of the NoOperation message will be the number of bytes
- * remaining in the buffer, and the data sent will be the contents
- * of the buffer between its current position and its limit. Or if
- * the buffer argument is null, "length" will simply be zero...
- * REMIND: split into two methods instead?
- *
- * The actual writing to the underlying connection, including access to
- * the buffer's content and other state, is asynchronous with the
- * invocation of this method; therefore, the supplied buffer must not
- * be mutated even after this method has returned.
- */
- final void asyncSendNoOperation(ByteBuffer buffer) {
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) NoOperation)
- .put((byte) 0);
-
- if (buffer != null) {
- assert buffer.remaining() <= 0xFFFF;
- header.putShort((short) buffer.remaining())
- .flip();
- connectionIO.asyncSend(header, buffer);
- } else {
- header.putShort((short) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
- }
-
- /**
- * Sends a Shutdown message with the UTF-8 encoding of the supplied
- * message as the data. If message is null, then zero bytes of data
- * will be sent with the message header.
- */
- final void asyncSendShutdown(String message) {
- ByteBuffer data = (message != null ?
- getUTF8BufferFromString(message) : null);
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) Shutdown)
- .put((byte) 0);
-
- if (data != null) {
- assert data.remaining() <= 0xFFFF;
- header.putShort((short) data.remaining())
- .flip();
- connectionIO.asyncSend(header, data);
- } else {
- header.putShort((short) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
- }
-
- /**
- * Sends a Ping message with the specified "cookie".
- */
- final void asyncSendPing(int cookie) {
- assert cookie >= 0 && cookie <= 0xFFFF;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) Ping)
- .put((byte) 0)
- .putShort((short) cookie)
- .flip();
- connectionIO.asyncSend(header);
- }
-
- /**
- * Sends a PingAck message with the specified "cookie".
- */
- final void asyncSendPingAck(int cookie) {
- assert cookie >= 0 && cookie <= 0xFFFF;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) PingAck)
- .put((byte) 0)
- .putShort((short) cookie)
- .flip();
- connectionIO.asyncSend(header);
- }
-
- /**
- * Sends an Error message with the UTF-8 encoding of the supplied
- * message as the data. If message is null, then zero bytes of data
- * will be sent with the message header.
- */
- final void asyncSendError(String message) {
- ByteBuffer data = (message != null ?
- getUTF8BufferFromString(message) : null);
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) Error)
- .put((byte) 0);
-
- if (data != null) {
- assert data.remaining() <= 0xFFFF;
- header.putShort((short) data.remaining())
- .flip();
- connectionIO.asyncSend(header, data);
- } else {
- header.putShort((short) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
- }
-
- /**
- * Sends an Error message with the UTF-8 encoding of the supplied
- * message as the data. If message is null, then zero bytes of data
- * will be sent with the message header.
- */
- final IOFuture futureSendError(String message) {
- ByteBuffer data = getUTF8BufferFromString(message);
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) Error)
- .put((byte) 0);
-
- assert data.remaining() <= 0xFFFF;
- header.putShort((short) data.remaining())
- .flip();
- return connectionIO.futureSend(header, data);
- }
-
- /**
- * Sends an IncrementRation message for the specified "sessionID" and
- * the specified "increment".
- */
- final void asyncSendIncrementRation(/*****int op, *****/int sessionID,
- int increment)
- {
- final int op = IncrementRation;
-// assert (op & 0xF1) == IncrementRation; // validate operation code
-// assert (op & 0xE0) == 0; // NYI: support use of shift
- assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
- assert increment >= 0 && increment <= 0xFFFF;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) op)
- .put((byte) sessionID)
- .putShort((short) increment)
- .flip();
- connectionIO.asyncSend(header);
- }
-
- /**
- * Sends an Abort message for the specified "sessionID" with the contents
- * of the specified buffer as the data.
- *
- * The "length" of the Abort message will be the number of bytes
- * remaining in the buffer, and the data sent will be the contents
- * of the buffer between its current position and its limit. Or if
- * the buffer argument is null, "length" will simply be zero...
- * REMIND: split into two methods instead?
- *
- * For efficiency, the caller is responsible for pre-computing the first
- * byte of the message, including any control flags if appropriate.
- */
- final void asyncSendAbort(int op, int sessionID, ByteBuffer data) {
- assert (op & 0xFD) == Abort; // validate operation code
- assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) op)
- .put((byte) sessionID);
-
- if (data != null) {
- assert data.remaining() <= 0xFFFF;
- header.putShort((short) data.remaining())
- .flip();
- connectionIO.asyncSend(header, data);
- } else {
- header.putShort((short) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
- }
-
- /**
- * Sends a Close message for the specified "sessionID".
- */
- final void asyncSendClose(int sessionID) {
- assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) Close)
- .put((byte) sessionID)
- .putShort((short) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
-
- /**
- * Sends an Acknowledgment message for the specified "sessionID".
- */
- final void asyncSendAcknowledgment(int sessionID) {
- assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) Acknowledgment)
- .put((byte) sessionID)
- .putShort((short) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
-
- /**
- * Sends a Data message for the specified "sessionID" with the contents
- * of the supplied buffer as the data.
- *
- * The "length" of the Data message will be the number of bytes
- * remaining in the buffer, and the data sent will be the contents
- * of the buffer between its current position and its limit. Or if
- * the buffer argument is null, "length" will simply be zero...
- * REMIND: split into two methods instead?
- *
- * For efficiency, the caller is responsible for pre-computing the first
- * byte of the Data message, including any control flags if appropriate.
- *
- * The actual writing to the underlying connection, including access to
- * the buffer's content and other state, is asynchronous with the
- * invocation of this method; therefore, the supplied buffer must not
- * be mutated even after this method has returned.
- */
- final void asyncSendData(int op, int sessionID, ByteBuffer data) {
- assert (op & 0xE1) == Data; // validate operation code
- assert (op & Data_eof) != 0 || // close and ackRequired require eof
- (op & Data_close & Data_ackRequired) == 0;
- assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) op)
- .put((byte) sessionID);
-
- if (data != null) {
- assert data.remaining() <= 0xFFFF;
- header.putShort((short) data.remaining())
- .flip();
- connectionIO.asyncSend(header, data);
- } else {
- header.putShort((short) 0)
- .flip();
- connectionIO.asyncSend(header);
- }
- }
-
- /**
- * Sends a Data message for the specified sessionID with the contents
- * of the supplied buffer as the data.
- *
- * The "length" of the Data message will be the number of bytes
- * remaining in the buffer, and the data sent will be the contents
- * of the buffer between its current position and its limit.
- *
- * For efficiency, the caller is responsible for pre-computing the first
- * byte of the Data message, including any control flags if appropriate.
- *
- * The actual writing to the underlying connection, including access to
- * the buffer's content and other state, is asynchronous with the
- * invocation of this method; therefore, the supplied buffer must not
- * be mutated even after this method has returned, until it is guaranteed
- * that use of the buffer has completed.
- *
- * The returned IOFuture object can be used to wait until the write has
- * definitely completed (or will definitely not complete due to some
- * failure). After the write has completed, the buffer's position will
- * have been incremented to its limit (which will not have changed).
- */
- final IOFuture futureSendData(int op, int sessionID, ByteBuffer data) {
- assert (op & 0xE1) == Data; // verify operation code
- assert (op & Data_eof) != 0 || // close and ackRequired require eof
- (op & Data_close & Data_ackRequired) == 0;
- assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
- assert data.remaining() <= 0xFFFF;
-
- ByteBuffer header = ByteBuffer.allocate(4);
- header.put((byte) op)
- .put((byte) sessionID)
- .putShort((short) data.remaining())
- .flip();
- return connectionIO.futureSend(header, data);
- }
-
- /*
- * read states
- */
- private static final int READ_CLIENT_CONNECTION_HEADER = 0;
- private static final int READ_SERVER_CONNECTION_HEADER = 1;
- private static final int READ_MESSAGE_HEADER = 2;
- private static final int READ_MESSAGE_BODY = 3;
-
- /*
- * current read state lock and variables
- */
- private final Object readStateLock = new Object();
- private int readState;
- private int currentOp;
- private int currentSessionID;
- private int currentLengthRemaining;
- private ByteBuffer currentDataBuffer = null;
-
- void processIncomingData(ByteBuffer buffer) throws ProtocolException {
- buffer.flip(); // process data that has been read into buffer
- assert buffer.hasRemaining();
-
- synchronized (readStateLock) {
- stateLoop:
- do {
- switch (readState) {
- case READ_CLIENT_CONNECTION_HEADER:
- if (!readClientConnectionHeader(buffer)) {
- break stateLoop;
- }
- break;
-
- case READ_SERVER_CONNECTION_HEADER:
- if (!readServerConnectionHeader(buffer)) {
- break stateLoop;
- }
- break;
-
- case READ_MESSAGE_HEADER:
- if (!readMessageHeader(buffer)) {
- break stateLoop;
- }
- break;
-
- case READ_MESSAGE_BODY:
- if (!readMessageBody(buffer)) {
- break stateLoop;
- }
- break;
-
- default:
- throw new AssertionError();
- }
- } while (buffer.hasRemaining());
- }
-
- buffer.compact();
- }
-
- private boolean readClientConnectionHeader(ByteBuffer buffer)
- throws ProtocolException
- {
- assert role == SERVER;
-
- validatePartialMagicNumber(buffer);
- if (buffer.remaining() < 8) {
- return false; // wait for complete header to arrive
- }
- int headerPosition = buffer.position();
- buffer.position(headerPosition + 4); // skip header already checked
- int version = (buffer.get() & 0xFF);
- int ration = (buffer.getShort() & 0xFFFF) << 8;
- int flags = (buffer.get() & 0xFF);
- boolean negotiate = (flags & ClientConnectionHeader_negotiate) != 0;
-
- synchronized (muxLock) {
- initialOutboundRation = ration;
- asyncSendServerConnectionHeader();
-
- if (version == 0) {
- throw new ProtocolException(
- "bad protocol version: " + version);
- }
- if (version > VERSION) {
- if (!negotiate) {
- setDown("unsupported protocol version: " + version, null);
- throw new ProtocolException(
- "unsupported protocol version: " + version);
- }
- }
-
- serverConnectionReady = true;
- }
-
- readState = READ_MESSAGE_HEADER;
- return true;
- }
-
- private boolean readServerConnectionHeader(ByteBuffer buffer)
- throws ProtocolException
- {
- assert role == CLIENT;
-
- validatePartialMagicNumber(buffer);
-
- if (buffer.remaining() < 8) {
- return false;
- }
- int headerPosition = buffer.position();
- buffer.position(headerPosition + 4); // skip header already checked
- int version = (buffer.get() & 0xFF);
- int ration = (buffer.getShort() & 0xFFFF) << 8;
- int flags = (buffer.get() & 0xFF);
-
- synchronized (muxLock) {
- initialOutboundRation = ration;
-
- if (version == 0) {
- throw new ProtocolException(
- "bad protocol version: " + version);
- }
- if (version > VERSION) {
- throw new ProtocolException(
- "unexpected protocol version: " + version);
- }
-
- clientConnectionReady = true;
- muxLock.notifyAll();
- }
-
- readState = READ_MESSAGE_HEADER;
- return true;
- }
-
- private void validatePartialMagicNumber(ByteBuffer buffer)
- throws ProtocolException
- {
- if (buffer.remaining() > 0) {
- byte[] temp = new byte[Math.min(buffer.remaining(), magic.length)];
- buffer.mark();
- buffer.get(temp);
- buffer.reset();
- for (int i = 0; i < temp.length; i++) {
- if (temp[i] != magic[i]) {
- setDown((role == CLIENT ? "server" : "client") +
- " sent bad magic number: " + toHexString(temp), null);
- throw new ProtocolException("bad magic number: " +
- toHexString(temp));
- }
- }
- }
- }
-
- private boolean readMessageHeader(ByteBuffer buffer)
- throws ProtocolException
- {
- if (buffer.remaining() < 4) {
- return false; // wait for complete header to arrive
- }
- int headerPosition = buffer.position();
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST,
- "message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
-
- int op = (buffer.get() & 0xFF);
- if ((op & 0xE1) == Data) {
- int sessionID = (buffer.get() & 0xFF);
- if (sessionID > MAX_SESSION_ID) {
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- currentOp = op;
- currentSessionID = sessionID;
- currentLengthRemaining = (buffer.getShort() & 0xFFFF);
- if (currentLengthRemaining > 0) {
- currentDataBuffer =
- ByteBuffer.allocate(currentLengthRemaining);
- readState = READ_MESSAGE_BODY;
- } else {
- dispatchCurrentMessage();
- }
- return true;
-
- } else if ((op & 0xF1) == IncrementRation) {
- int sessionID = (buffer.get() & 0xFF);
- if (sessionID > MAX_SESSION_ID) {
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- int increment = (buffer.getShort() & 0xFFFF);
- int shift = op & IncrementRation_shift;
- increment <<= shift;
- handleIncrementRation(sessionID, increment);
- return true;
-
- } else if ((op & 0xFD) == Abort) {
- int sessionID = (buffer.get() & 0xFF);
- if (sessionID > MAX_SESSION_ID) {
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- currentOp = op;
- currentSessionID = sessionID;
- currentLengthRemaining = (buffer.getShort() & 0xFFFF);
- if (currentLengthRemaining > 0) {
- currentDataBuffer =
- ByteBuffer.allocate(currentLengthRemaining);
- readState = READ_MESSAGE_BODY;
- } else {
- dispatchCurrentMessage();
- }
- return true;
-
- }
- switch (op) {
- case NoOperation: {
- if (buffer.get() != 0) { // ignore sign extension
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- currentOp = op;
- currentLengthRemaining = (buffer.getShort() & 0xFFFF);
- currentDataBuffer = null; // ignore data for NoOperation
- if (currentLengthRemaining > 0) {
- readState = READ_MESSAGE_BODY;
- } else {
- dispatchCurrentMessage();
- }
- return true;
- }
-
- case Shutdown: {
- if (buffer.get() != 0) { // ignore sign extension
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- currentOp = op;
- currentLengthRemaining = (buffer.getShort() & 0xFFFF);
- if (currentLengthRemaining > 0) {
- currentDataBuffer =
- ByteBuffer.allocate(currentLengthRemaining);
- readState = READ_MESSAGE_BODY;
- } else {
- dispatchCurrentMessage();
- }
- return true;
- }
-
- case Ping: {
- if (buffer.get() != 0) { // ignore sign extension
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- int cookie = (buffer.getShort() & 0xFFFF);
- handlePing(cookie);
- return true;
- }
-
- case PingAck: {
- if (buffer.get() != 0) { // ignore sign extension
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- int cookie = (buffer.getShort() & 0xFFFF);
- handlePingAck(cookie);
- return true;
- }
-
- case Error: {
- if (buffer.get() != 0) { // ignore sign extension
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- currentOp = op;
- currentLengthRemaining = (buffer.getShort() & 0xFFFF);
- if (currentLengthRemaining > 0) {
- currentDataBuffer =
- ByteBuffer.allocate(currentLengthRemaining);
- readState = READ_MESSAGE_BODY;
- } else {
- dispatchCurrentMessage();
- }
- return true;
- }
-
- case Close: {
- int sessionID = (buffer.get() & 0xFF);
- if (sessionID > MAX_SESSION_ID ||
- buffer.getShort() != 0) // ignore sign extension
- {
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- handleClose(sessionID);
- return true;
- }
-
- case Acknowledgment: {
- int sessionID = (buffer.get() & 0xFF);
- if (sessionID > MAX_SESSION_ID ||
- buffer.getShort() != 0) // ignore sign extension
- {
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- handleAcknowledgment(sessionID);
- return true;
- }
-
- default:
- throw new ProtocolException("bad message header: " +
- toHexString(buffer.getInt(headerPosition)));
- }
- }
-
- private boolean readMessageBody(ByteBuffer buffer)
- throws ProtocolException
- {
- assert currentLengthRemaining > 0;
- assert currentDataBuffer == null ||
- currentDataBuffer.remaining() == currentLengthRemaining;
-
- if (buffer.remaining() > currentLengthRemaining) {
- int origLimit = buffer.limit();
- buffer.limit(buffer.position() + currentLengthRemaining);
- if (currentDataBuffer != null) {
- currentDataBuffer.put(buffer);
- } else {
- buffer.position(buffer.position() + currentLengthRemaining);
- }
- currentLengthRemaining = 0;
- buffer.limit(origLimit);
- } else {
- currentLengthRemaining -= buffer.remaining();
- if (currentDataBuffer != null) {
- currentDataBuffer.put(buffer);
- } else {
- buffer.position(buffer.limit());
- }
- }
-
- if (currentLengthRemaining > 0) {
- return false;
- } else {
- currentDataBuffer.flip();
- dispatchCurrentMessage();
- currentDataBuffer = null; // don't let this linger
- readState = READ_MESSAGE_HEADER;
- return true;
- }
- }
-
- private void dispatchCurrentMessage() throws ProtocolException {
- assert currentDataBuffer == null || currentDataBuffer.hasRemaining();
-
- int op = currentOp;
- if ((op & 0xE1) == Data) {
- boolean open = (op & Data_open) != 0;
- boolean close = (op & Data_close) != 0;
- boolean eof = (op & Data_eof) != 0;
- boolean ackRequired = (op & Data_ackRequired) != 0;
- handleData(currentSessionID, open, close, eof, ackRequired,
- (currentDataBuffer != null ?
- currentDataBuffer : ByteBuffer.allocate(0)));
- return;
-
- } else if ((op & 0xFD) == Abort) {
- boolean partial = (op & Abort_partial) != 0;
- handleAbort(currentSessionID, partial,
- (currentDataBuffer != null ?
- getStringFromUTF8Buffer(currentDataBuffer) : ""));
- return;
-
- }
- switch (op) {
- case NoOperation:
- handleNoOperation();
- return;
-
- case Shutdown:
- handleShutdown(currentDataBuffer != null ?
- getStringFromUTF8Buffer(currentDataBuffer) : "");
- return;
-
- case Error:
- handleError(currentDataBuffer != null ?
- getStringFromUTF8Buffer(currentDataBuffer) : "");
- return;
-
- default:
- throw new AssertionError(Integer.toHexString((byte) op));
- }
- }
-
- private void handleNoOperation() throws ProtocolException {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "NoOperation");
- }
-
- // do nothing
- }
-
- private void handleShutdown(String message) throws ProtocolException {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "Shutdown");
- }
-
- if (role != CLIENT) {
- throw new ProtocolException("Shutdown sent by client");
- }
- setDown("mux connection shut down gracefully", null);
- throw new ProtocolException("received Shutdown message");
- }
-
- private void handlePing(int cookie) throws ProtocolException {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "Ping: cookie=" + cookie);
- }
-
- asyncSendPingAck(cookie);
- }
-
- private void handlePingAck(int cookie) throws ProtocolException {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "PingAck: cookie=" + cookie);
- }
-
- synchronized (muxLock) {
- if (cookie != expectedPingCookie) {
- throw new ProtocolException(
- "unexpected ping cookie: " + cookie);
- } else {
- expectedPingCookie = -1;
- // NYI: rest of ping machinery
- }
- }
- }
-
- private void handleError(String message) throws ProtocolException {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "Error");
- }
-
- setDown((role == CLIENT ? "server" : "client") +
- " reported protocol error: " + message, null);
- throw new ProtocolException("received Error message");
- }
-
- private void handleIncrementRation(int sessionID, int increment)
- throws ProtocolException
- {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST,
- "IncrementRation: sessionID=" + sessionID +
- ",increment=" + increment);
- }
-
- getSession(sessionID).handleIncrementRation(increment);
- }
-
- private void handleAbort(int sessionID, boolean partial, String message)
- throws ProtocolException
- {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST,
- "Abort: sessionID=" + sessionID +
- ",partial=" + partial);
- }
-
- getSession(sessionID).handleAbort(partial);
- }
-
- private void handleClose(int sessionID) throws ProtocolException {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "Close: sessionID=" + sessionID);
- }
-
- getSession(sessionID).handleClose();
- }
-
- private void handleAcknowledgment(int sessionID) throws ProtocolException {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "Acknowledgment: sessionID=" + sessionID);
- }
-
- getSession(sessionID).handleAcknowledgment();
- }
-
- private void handleData(int sessionID, boolean open, boolean close,
- boolean eof, boolean ackRequired, ByteBuffer data)
- throws ProtocolException
- {
- if (logger.isLoggable(Level.FINEST)) {
- int length = data.remaining();
- HexDumpEncoder encoder = new HexDumpEncoder();
- byte[] bytes = new byte[data.remaining()];
- data.mark();
- data.get(bytes);
- data.reset();
- logger.log(Level.FINEST,
- "Data: sessionID=" + sessionID +
- (open ? ",open" : "") +
- (close ? ",close" : "") +
- (eof ? ",eof" : "") +
- (ackRequired ? ",ackRequired" : "") +
- ",length=" + length +
- (length > 0 ? ",data=\n" + encoder.encode(bytes) : ""));
- }
-
- if (!eof && (close || ackRequired)) {
- throw new ProtocolException("Data: eof=" + eof +
- ",close=" + close +
- ",ackRequired=" + ackRequired);
- }
-
- if (open) {
- handleOpen(sessionID);
- }
-
- getSession(sessionID).handleData(data, eof, close, ackRequired);
- }
-
- private Session getSession(int sessionID) throws ProtocolException {
- synchronized (muxLock) {
- if (!busySessions.get(sessionID)) {
- throw new ProtocolException(
- "inactive sessionID: " + sessionID);
- }
- return (Session) sessions.get(Integer.valueOf(sessionID));
- }
- }
-
- private static ByteBuffer getUTF8BufferFromString(String s) {
- CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
- try {
- return encoder.encode(CharBuffer.wrap(s));
- } catch (CharacterCodingException e) {
- return null;
- }
- }
-
- private static String getStringFromUTF8Buffer(ByteBuffer buffer) {
- CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
- try {
- return decoder.decode(buffer).toString();
- } catch (CharacterCodingException e) {
- return "(error decoding UTF-8 message: " + e.toString() + ")";
- }
- }
-
- private static String toHexString(byte x) {
- char[] buf = new char[2];
- buf[0] = toHexChar((x >> 4) & 0xF);
- buf[1] = toHexChar(x & 0xF);
- return new String(buf);
- }
-
- private static String toHexString(int x) {
- char[] buf = new char[8];
- for (int i = 0; i < 8; i++) {
- buf[i] = toHexChar((x >> ((7 - i) * 4)) & 0xF);
- }
- return new String(buf);
- }
-
- private static String toHexString(byte[] b) {
- char[] buf = new char[b.length * 2];
- int j = 0;
- for (int i = 0; i < b.length; i++) {
- buf[j++] = toHexChar((b[i] >> 4) & 0xF);
- buf[j++] = toHexChar(b[i] & 0xF);
- }
- return new String(buf);
- }
-
- private static char toHexChar(int x) {
- return x < 10 ? (char) ('0' + x) : (char) ('A' - 10 + x);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sun.jini.jeri.internal.mux;
+
+import com.sun.jini.jeri.internal.runtime.HexDumpEncoder;
+import com.sun.jini.thread.Executor;
+import com.sun.jini.thread.GetThreadPoolAction;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.security.AccessController;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Mux is the abstract superclass of both client-side and server-side
+ * multiplexed connections.
+ *
+ * @author Sun Microsystems, Inc.
+ **/
+abstract class Mux {
+
+ static final int CLIENT = 0;
+ static final int SERVER = 1;
+
+ static final int MAX_SESSION_ID = 0x7F;
+ public static final int MAX_REQUESTS = MAX_SESSION_ID + 1;
+
+ static final int NoOperation = 0x00; // 00000000
+ static final int Shutdown = 0x02; // 00000010
+ static final int Ping = 0x04; // 00000100
+ static final int PingAck = 0x06; // 00000110
+ static final int Error = 0x08; // 00001000
+ static final int IncrementRation = 0x10; // 0001***0
+ static final int Abort = 0x20; // 001000*0
+ static final int Close = 0x30; // 00110000
+ static final int Acknowledgment = 0x40; // 00100000
+ static final int Data = 0x80; // 100****0
+
+ static final int IncrementRation_shift = 0x0E;
+ static final int Abort_partial = 0x02;
+ static final int Data_open = 0x10;
+ static final int Data_close = 0x08;
+ static final int Data_eof = 0x04;
+ static final int Data_ackRequired = 0x02;
+
+ static final int ClientConnectionHeader_negotiate = 0x01;
+
+ private static final byte[] magic = {
+ (byte) 'J', (byte) 'm', (byte) 'u', (byte) 'x' // 0x4A6D7578
+ };
+
+ private static final int VERSION = 0x01;
+
+ /**
+ * pool of threads for executing tasks in system thread group:
+ * used for shutting down sessions when a connection goes down
+ */
+ private static final Executor systemThreadPool =
+ (Executor) AccessController.doPrivileged(
+ new GetThreadPoolAction(false));
+
+ /** session shutdown tasks to be executed asynchronously */
+ private static final LinkedList sessionShutdownQueue = new LinkedList();
+
+ private static class SessionShutdownTask implements Runnable {
+ private final Session[] sessions;
+ private final String message;
+ private final Throwable cause;
+
+ SessionShutdownTask(Session[] sessions,
+ String message,
+ Throwable cause)
+ {
+ this.sessions = sessions;
+ this.message = message;
+ this.cause = cause;
+ }
+
+ public void run() {
+ for (int i = 0; i < sessions.length; i++) {
+ sessions[i].setDown(message, cause);
+ }
+ }
+ }
+
+ /** mux logger */
+ private static final Logger logger =
+ Logger.getLogger("net.jini.jeri.connection.mux");
+
+ final int role;
+ final int initialInboundRation;
+ final int maxFragmentSize;
+
+ private final ConnectionIO connectionIO;
+ private final boolean directBuffersUseful;
+
+ /** lock guarding all mutable instance state (below) */
+ final Object muxLock = new Object();
+
+ int initialOutboundRation; // set from remote connection header
+ private boolean clientConnectionReady = false; // server header received
+ boolean serverConnectionReady = false; // server header sent
+
+ boolean muxDown = false;
+ String muxDownMessage;
+ Throwable muxDownCause;
+
+ final BitSet busySessions = new BitSet();
+ final Map sessions = new HashMap(5);
+
+ private int expectedPingCookie = -1;
+
+ /** unguarded instance state */
+ private volatile long startTimeout = 30000; // milliseconds
+
+ /**
+ * Constructs a new Mux instance for a connection accessible through
+ * standard (blocking) I/O streams.
+ */
+ Mux(OutputStream out, InputStream in,
+ int role, int initialInboundRation, int maxFragmentSize)
+ throws IOException
+ {
+ this.role = role;
+ if ((initialInboundRation & ~0x00FFFF00) != 0) {
+ throw new IllegalArgumentException(
+ "illegal initial inbound ration: " +
+ toHexString(initialInboundRation));
+ }
+ this.initialInboundRation = initialInboundRation;
+ this.maxFragmentSize = maxFragmentSize;
+
+ this.connectionIO = new StreamConnectionIO(this, out, in);
+ directBuffersUseful = false;
+ }
+
+ Mux(SocketChannel channel,
+ int role, int initialInboundRation, int maxFragmentSize)
+ throws IOException
+ {
+ this.role = role;
+ if ((initialInboundRation & ~0x00FFFF00) != 0) {
+ throw new IllegalArgumentException(
+ "illegal initial inbound ration: " +
+ toHexString(initialInboundRation));
+ }
+ this.initialInboundRation = initialInboundRation;
+ this.maxFragmentSize = maxFragmentSize;
+
+ this.connectionIO = new SocketChannelConnectionIO(this, channel);
+ directBuffersUseful = true;
+ }
+
+ /**
+ * Time in milliseconds for client-side connections to wait for the server
+ * to acknowledge an opening handshake. The default value is 15000
+ * milliseconds (15 seconds).
+ *
+ * <p>
+ * This method is not thread-safe. It is expected to be called immediately
+ * after a constructor.
+ *
+ * @param timeout
+ * positive value in milliseconds
+ */
+ public void setStartTimeout(long timeout) {
+ if (timeout <= 0)
+ throw new IllegalArgumentException("start timeout must be a positive number of milliseconds");
+ this.startTimeout = timeout;
+ }
+
+ /**
+ * Starts I/O processing.
+ *
+ * This method should be invoked only after this instance has
+ * been completely initialized, so that subclasses will not
+ * see uninitialized state.
+ */
+ public void start() throws IOException {
+ if (role == CLIENT) {
+ readState = READ_SERVER_CONNECTION_HEADER;
+ } else {
+ assert role == SERVER;
+ readState = READ_CLIENT_CONNECTION_HEADER;
+ }
+
+ try {
+ connectionIO.start();
+ } catch (IOException e) {
+ setDown("I/O error starting connection", e);
+ throw e;
+ }
+
+ if (role == CLIENT) {
+ asyncSendClientConnectionHeader();
+ synchronized (muxLock) {
+ long now = System.currentTimeMillis();
+ long endTime = now + this.startTimeout;
+ while (!muxDown && !clientConnectionReady) {
+ if (now >= endTime) {
+ setDown("timeout waiting for server to respond to handshake", null);
+ } else {
+ try {
+ muxLock.wait(endTime - now);
+ now = System.currentTimeMillis();
+ } catch (InterruptedException e) {
+ setDown("interrupt waiting for connection header", e);
+ }
+ }
+ }
+ if (muxDown) {
+ throw new IOException(muxDownMessage, muxDownCause);
+ }
+ }
+ }
+ }
+
+ /**
+ * Handles indication that this multiplexed connection has
+ * gone down, either through normal operation or failure.
+ *
+ * This method should be overridden by subclasses that want to
+ * implement custom behavior when this connection has gone down.
+ */
+ protected void handleDown() {
+ }
+
+ /**
+ * This method is invoked internally and is intended to be
+ * overridden by subclasses.
+ */
+ void handleOpen(int sessionID) throws ProtocolException {
+ throw new ProtocolException(
+ "remote endpoint attempted to open session");
+ }
+
+ /**
+ *
+ * This method is intended to be invoked by subclasses only.
+ *
+ * This method must ONLY be invoked while synchronized on muxLock
+ * and while muxDown is false.
+ */
+ final void addSession(int sessionID, Session session) {
+ assert Thread.holdsLock(muxLock);
+ assert !muxDown;
+ assert !busySessions.get(sessionID);
+ assert sessions.get(Integer.valueOf(sessionID)) == null;
+
+ busySessions.set(sessionID);
+ sessions.put(Integer.valueOf(sessionID), session);
+ }
+
+ /**
+ *
+ * This method is intended to be invoked by this class and
+ * subclasses only.
+ *
+ * This method MAY be invoked while synchronized on muxLock.
+ */
+ final void setDown(final String message, final Throwable cause) {
+ synchronized (muxLock) {
+ if (muxDown) {
+ return;
+ }
+ muxDown = true;
+ muxDownMessage = message;
+ muxDownCause = cause;
+ muxLock.notifyAll();
+ }
+
+ /*
+ * The following should be safe because we just left the
+ * synchonized block, and after setting the muxDown latch
+ * therein, no other thread should ever touch the "sessions"
+ * data structure.
+ *
+ * Sessions are shut down asynchronously in a separate thread
+ * to avoid deadlock, in case our caller holds muxLock,
+ * because individual session locks must never be acquired
+ * while holding muxLock.
+ */
+ boolean needWorker = false;
+ synchronized (sessionShutdownQueue) {
+ if (!sessions.values().isEmpty()) {
+ sessionShutdownQueue.add(new SessionShutdownTask(
+ (Session[]) sessions.values().toArray(
+ new Session[sessions.values().size()]),
+ message, cause));
+ needWorker = true;
+ } else {
+ needWorker = !sessionShutdownQueue.isEmpty();
+ }
+ }
+ if (needWorker) {
+ try {
+ systemThreadPool.execute(new Runnable() {
+ public void run() {
+ while (true) {
+ Runnable task;
+ synchronized (sessionShutdownQueue) {
+ if (sessionShutdownQueue.isEmpty()) {
+ break;
+ }
+ task = (Runnable)
+ sessionShutdownQueue.removeFirst();
+ }
+ task.run();
+ }
+ }
+ }, "mux session shutdown");
+ } catch (OutOfMemoryError e) { // assume out of threads
+ try {
+ logger.log(Level.WARNING,
+ "could not create thread for session shutdown", e);
+ } catch (Throwable t) {
+ }
+ // absorb exception to proceed with connection shutdown;
+ // session shutdown task will remain on queue for later
+ }
+ }
+
+ handleDown();
+ }
+
+ /**
+ * Removes the identified session from the session table.
+ *
+ * This method is intended to be invoked by the associated Session
+ * object only.
+ */
+ final void removeSession(int sessionID) {
+ synchronized (muxLock) {
+ if (muxDown) {
+ return;
+ }
+ assert busySessions.get(sessionID);
+ busySessions.clear(sessionID);
+ sessions.remove(Integer.valueOf(sessionID));
+ }
+ }
+
+ /**
+ * Returns true if it would be useful to pass direct buffers to
+ * this instance's *Send* methods (because the underlying I/O
+ * implementation will pass such buffers directly to channel write
+ * operations); returns false otherwise.
+ */
+ final boolean directBuffersUseful() {
+ return directBuffersUseful;
+ }
+
+ /**
+ * Sends the ClientConnectionHeader message for this connection.
+ */
+ final void asyncSendClientConnectionHeader() {
+ assert role == CLIENT;
+
+ ByteBuffer header = ByteBuffer.allocate(8);
+ header.put(magic)
+ .put((byte) VERSION)
+ .putShort((short) (initialInboundRation >> 8))
+ .put((byte) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+
+ /**
+ * Sends the ServerConnectionHeader message for this connection.
+ */
+ final void asyncSendServerConnectionHeader() {
+ assert role == SERVER;
+
+ ByteBuffer header = ByteBuffer.allocate(8);
+ header.put(magic)
+ .put((byte) VERSION)
+ .putShort((short) (initialInboundRation >> 8))
+ .put((byte) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+
+ /**
+ * Sends a NoOperation message with the contents of the supplied buffer
+ * as the data.
+ *
+ * The "length" of the NoOperation message will be the number of bytes
+ * remaining in the buffer, and the data sent will be the contents
+ * of the buffer between its current position and its limit. Or if
+ * the buffer argument is null, "length" will simply be zero...
+ * REMIND: split into two methods instead?
+ *
+ * The actual writing to the underlying connection, including access to
+ * the buffer's content and other state, is asynchronous with the
+ * invocation of this method; therefore, the supplied buffer must not
+ * be mutated even after this method has returned.
+ */
+ final void asyncSendNoOperation(ByteBuffer buffer) {
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) NoOperation)
+ .put((byte) 0);
+
+ if (buffer != null) {
+ assert buffer.remaining() <= 0xFFFF;
+ header.putShort((short) buffer.remaining())
+ .flip();
+ connectionIO.asyncSend(header, buffer);
+ } else {
+ header.putShort((short) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+ }
+
+ /**
+ * Sends a Shutdown message with the UTF-8 encoding of the supplied
+ * message as the data. If message is null, then zero bytes of data
+ * will be sent with the message header.
+ */
+ final void asyncSendShutdown(String message) {
+ ByteBuffer data = (message != null ?
+ getUTF8BufferFromString(message) : null);
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) Shutdown)
+ .put((byte) 0);
+
+ if (data != null) {
+ assert data.remaining() <= 0xFFFF;
+ header.putShort((short) data.remaining())
+ .flip();
+ connectionIO.asyncSend(header, data);
+ } else {
+ header.putShort((short) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+ }
+
+ /**
+ * Sends a Ping message with the specified "cookie".
+ */
+ final void asyncSendPing(int cookie) {
+ assert cookie >= 0 && cookie <= 0xFFFF;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) Ping)
+ .put((byte) 0)
+ .putShort((short) cookie)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+
+ /**
+ * Sends a PingAck message with the specified "cookie".
+ */
+ final void asyncSendPingAck(int cookie) {
+ assert cookie >= 0 && cookie <= 0xFFFF;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) PingAck)
+ .put((byte) 0)
+ .putShort((short) cookie)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+
+ /**
+ * Sends an Error message with the UTF-8 encoding of the supplied
+ * message as the data. If message is null, then zero bytes of data
+ * will be sent with the message header.
+ */
+ final void asyncSendError(String message) {
+ ByteBuffer data = (message != null ?
+ getUTF8BufferFromString(message) : null);
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) Error)
+ .put((byte) 0);
+
+ if (data != null) {
+ assert data.remaining() <= 0xFFFF;
+ header.putShort((short) data.remaining())
+ .flip();
+ connectionIO.asyncSend(header, data);
+ } else {
+ header.putShort((short) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+ }
+
+ /**
+ * Sends an Error message with the UTF-8 encoding of the supplied
+ * message as the data. If message is null, then zero bytes of data
+ * will be sent with the message header.
+ */
+ final IOFuture futureSendError(String message) {
+ ByteBuffer data = getUTF8BufferFromString(message);
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) Error)
+ .put((byte) 0);
+
+ assert data.remaining() <= 0xFFFF;
+ header.putShort((short) data.remaining())
+ .flip();
+ return connectionIO.futureSend(header, data);
+ }
+
+ /**
+ * Sends an IncrementRation message for the specified "sessionID" and
+ * the specified "increment".
+ */
+ final void asyncSendIncrementRation(/*****int op, *****/int sessionID,
+ int increment)
+ {
+ final int op = IncrementRation;
+// assert (op & 0xF1) == IncrementRation; // validate operation code
+// assert (op & 0xE0) == 0; // NYI: support use of shift
+ assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
+ assert increment >= 0 && increment <= 0xFFFF;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) op)
+ .put((byte) sessionID)
+ .putShort((short) increment)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+
+ /**
+ * Sends an Abort message for the specified "sessionID" with the contents
+ * of the specified buffer as the data.
+ *
+ * The "length" of the Abort message will be the number of bytes
+ * remaining in the buffer, and the data sent will be the contents
+ * of the buffer between its current position and its limit. Or if
+ * the buffer argument is null, "length" will simply be zero...
+ * REMIND: split into two methods instead?
+ *
+ * For efficiency, the caller is responsible for pre-computing the first
+ * byte of the message, including any control flags if appropriate.
+ */
+ final void asyncSendAbort(int op, int sessionID, ByteBuffer data) {
+ assert (op & 0xFD) == Abort; // validate operation code
+ assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) op)
+ .put((byte) sessionID);
+
+ if (data != null) {
+ assert data.remaining() <= 0xFFFF;
+ header.putShort((short) data.remaining())
+ .flip();
+ connectionIO.asyncSend(header, data);
+ } else {
+ header.putShort((short) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+ }
+
+ /**
+ * Sends a Close message for the specified "sessionID".
+ */
+ final void asyncSendClose(int sessionID) {
+ assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) Close)
+ .put((byte) sessionID)
+ .putShort((short) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+
+ /**
+ * Sends an Acknowledgment message for the specified "sessionID".
+ */
+ final void asyncSendAcknowledgment(int sessionID) {
+ assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) Acknowledgment)
+ .put((byte) sessionID)
+ .putShort((short) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+
+ /**
+ * Sends a Data message for the specified "sessionID" with the contents
+ * of the supplied buffer as the data.
+ *
+ * The "length" of the Data message will be the number of bytes
+ * remaining in the buffer, and the data sent will be the contents
+ * of the buffer between its current position and its limit. Or if
+ * the buffer argument is null, "length" will simply be zero...
+ * REMIND: split into two methods instead?
+ *
+ * For efficiency, the caller is responsible for pre-computing the first
+ * byte of the Data message, including any control flags if appropriate.
+ *
+ * The actual writing to the underlying connection, including access to
+ * the buffer's content and other state, is asynchronous with the
+ * invocation of this method; therefore, the supplied buffer must not
+ * be mutated even after this method has returned.
+ */
+ final void asyncSendData(int op, int sessionID, ByteBuffer data) {
+ assert (op & 0xE1) == Data; // validate operation code
+ assert (op & Data_eof) != 0 || // close and ackRequired require eof
+ (op & Data_close & Data_ackRequired) == 0;
+ assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) op)
+ .put((byte) sessionID);
+
+ if (data != null) {
+ assert data.remaining() <= 0xFFFF;
+ header.putShort((short) data.remaining())
+ .flip();
+ connectionIO.asyncSend(header, data);
+ } else {
+ header.putShort((short) 0)
+ .flip();
+ connectionIO.asyncSend(header);
+ }
+ }
+
+ /**
+ * Sends a Data message for the specified sessionID with the contents
+ * of the supplied buffer as the data.
+ *
+ * The "length" of the Data message will be the number of bytes
+ * remaining in the buffer, and the data sent will be the contents
+ * of the buffer between its current position and its limit.
+ *
+ * For efficiency, the caller is responsible for pre-computing the first
+ * byte of the Data message, including any control flags if appropriate.
+ *
+ * The actual writing to the underlying connection, including access to
+ * the buffer's content and other state, is asynchronous with the
+ * invocation of this method; therefore, the supplied buffer must not
+ * be mutated even after this method has returned, until it is guaranteed
+ * that use of the buffer has completed.
+ *
+ * The returned IOFuture object can be used to wait until the write has
+ * definitely completed (or will definitely not complete due to some
+ * failure). After the write has completed, the buffer's position will
+ * have been incremented to its limit (which will not have changed).
+ */
+ final IOFuture futureSendData(int op, int sessionID, ByteBuffer data) {
+ assert (op & 0xE1) == Data; // verify operation code
+ assert (op & Data_eof) != 0 || // close and ackRequired require eof
+ (op & Data_close & Data_ackRequired) == 0;
+ assert sessionID >= 0 && sessionID <= MAX_SESSION_ID;
+ assert data.remaining() <= 0xFFFF;
+
+ ByteBuffer header = ByteBuffer.allocate(4);
+ header.put((byte) op)
+ .put((byte) sessionID)
+ .putShort((short) data.remaining())
+ .flip();
+ return connectionIO.futureSend(header, data);
+ }
+
+ /*
+ * read states
+ */
+ private static final int READ_CLIENT_CONNECTION_HEADER = 0;
+ private static final int READ_SERVER_CONNECTION_HEADER = 1;
+ private static final int READ_MESSAGE_HEADER = 2;
+ private static final int READ_MESSAGE_BODY = 3;
+
+ /*
+ * current read state lock and variables
+ */
+ private final Object readStateLock = new Object();
+ private int readState;
+ private int currentOp;
+ private int currentSessionID;
+ private int currentLengthRemaining;
+ private ByteBuffer currentDataBuffer = null;
+
+ void processIncomingData(ByteBuffer buffer) throws ProtocolException {
+ buffer.flip(); // process data that has been read into buffer
+ assert buffer.hasRemaining();
+
+ synchronized (readStateLock) {
+ stateLoop:
+ do {
+ switch (readState) {
+ case READ_CLIENT_CONNECTION_HEADER:
+ if (!readClientConnectionHeader(buffer)) {
+ break stateLoop;
+ }
+ break;
+
+ case READ_SERVER_CONNECTION_HEADER:
+ if (!readServerConnectionHeader(buffer)) {
+ break stateLoop;
+ }
+ break;
+
+ case READ_MESSAGE_HEADER:
+ if (!readMessageHeader(buffer)) {
+ break stateLoop;
+ }
+ break;
+
+ case READ_MESSAGE_BODY:
+ if (!readMessageBody(buffer)) {
+ break stateLoop;
+ }
+ break;
+
+ default:
+ throw new AssertionError();
+ }
+ } while (buffer.hasRemaining());
+ }
+
+ buffer.compact();
+ }
+
+ private boolean readClientConnectionHeader(ByteBuffer buffer)
+ throws ProtocolException
+ {
+ assert role == SERVER;
+
+ validatePartialMagicNumber(buffer);
+ if (buffer.remaining() < 8) {
+ return false; // wait for complete header to arrive
+ }
+ int headerPosition = buffer.position();
+ buffer.position(headerPosition + 4); // skip header already checked
+ int version = (buffer.get() & 0xFF);
+ int ration = (buffer.getShort() & 0xFFFF) << 8;
+ int flags = (buffer.get() & 0xFF);
+ boolean negotiate = (flags & ClientConnectionHeader_negotiate) != 0;
+
+ synchronized (muxLock) {
+ initialOutboundRation = ration;
+ asyncSendServerConnectionHeader();
+
+ if (version == 0) {
+ throw new ProtocolException(
+ "bad protocol version: " + version);
+ }
+ if (version > VERSION) {
+ if (!negotiate) {
+ setDown("unsupported protocol version: " + version, null);
+ throw new ProtocolException(
+ "unsupported protocol version: " + version);
+ }
+ }
+
+ serverConnectionReady = true;
+ }
+
+ readState = READ_MESSAGE_HEADER;
+ return true;
+ }
+
+ private boolean readServerConnectionHeader(ByteBuffer buffer)
+ throws ProtocolException
+ {
+ assert role == CLIENT;
+
+ validatePartialMagicNumber(buffer);
+
+ if (buffer.remaining() < 8) {
+ return false;
+ }
+ int headerPosition = buffer.position();
+ buffer.position(headerPosition + 4); // skip header already checked
+ int version = (buffer.get() & 0xFF);
+ int ration = (buffer.getShort() & 0xFFFF) << 8;
+ int flags = (buffer.get() & 0xFF);
+
+ synchronized (muxLock) {
+ initialOutboundRation = ration;
+
+ if (version == 0) {
+ throw new ProtocolException(
+ "bad protocol version: " + version);
+ }
+ if (version > VERSION) {
+ throw new ProtocolException(
+ "unexpected protocol version: " + version);
+ }
+
+ clientConnectionReady = true;
+ muxLock.notifyAll();
+ }
+
+ readState = READ_MESSAGE_HEADER;
+ return true;
+ }
+
+ private void validatePartialMagicNumber(ByteBuffer buffer)
+ throws ProtocolException
+ {
+ if (buffer.remaining() > 0) {
+ byte[] temp = new byte[Math.min(buffer.remaining(), magic.length)];
+ buffer.mark();
+ buffer.get(temp);
+ buffer.reset();
+ for (int i = 0; i < temp.length; i++) {
+ if (temp[i] != magic[i]) {
+ setDown((role == CLIENT ? "server" : "client") +
+ " sent bad magic number: " + toHexString(temp), null);
+ throw new ProtocolException("bad magic number: " +
+ toHexString(temp));
+ }
+ }
+ }
+ }
+
+ private boolean readMessageHeader(ByteBuffer buffer)
+ throws ProtocolException
+ {
+ if (buffer.remaining() < 4) {
+ return false; // wait for complete header to arrive
+ }
+ int headerPosition = buffer.position();
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST,
+ "message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+
+ int op = (buffer.get() & 0xFF);
+ if ((op & 0xE1) == Data) {
+ int sessionID = (buffer.get() & 0xFF);
+ if (sessionID > MAX_SESSION_ID) {
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ currentOp = op;
+ currentSessionID = sessionID;
+ currentLengthRemaining = (buffer.getShort() & 0xFFFF);
+ if (currentLengthRemaining > 0) {
+ currentDataBuffer =
+ ByteBuffer.allocate(currentLengthRemaining);
+ readState = READ_MESSAGE_BODY;
+ } else {
+ dispatchCurrentMessage();
+ }
+ return true;
+
+ } else if ((op & 0xF1) == IncrementRation) {
+ int sessionID = (buffer.get() & 0xFF);
+ if (sessionID > MAX_SESSION_ID) {
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ int increment = (buffer.getShort() & 0xFFFF);
+ int shift = op & IncrementRation_shift;
+ increment <<= shift;
+ handleIncrementRation(sessionID, increment);
+ return true;
+
+ } else if ((op & 0xFD) == Abort) {
+ int sessionID = (buffer.get() & 0xFF);
+ if (sessionID > MAX_SESSION_ID) {
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ currentOp = op;
+ currentSessionID = sessionID;
+ currentLengthRemaining = (buffer.getShort() & 0xFFFF);
+ if (currentLengthRemaining > 0) {
+ currentDataBuffer =
+ ByteBuffer.allocate(currentLengthRemaining);
+ readState = READ_MESSAGE_BODY;
+ } else {
+ dispatchCurrentMessage();
+ }
+ return true;
+
+ }
+ switch (op) {
+ case NoOperation: {
+ if (buffer.get() != 0) { // ignore sign extension
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ currentOp = op;
+ currentLengthRemaining = (buffer.getShort() & 0xFFFF);
+ currentDataBuffer = null; // ignore data for NoOperation
+ if (currentLengthRemaining > 0) {
+ readState = READ_MESSAGE_BODY;
+ } else {
+ dispatchCurrentMessage();
+ }
+ return true;
+ }
+
+ case Shutdown: {
+ if (buffer.get() != 0) { // ignore sign extension
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ currentOp = op;
+ currentLengthRemaining = (buffer.getShort() & 0xFFFF);
+ if (currentLengthRemaining > 0) {
+ currentDataBuffer =
+ ByteBuffer.allocate(currentLengthRemaining);
+ readState = READ_MESSAGE_BODY;
+ } else {
+ dispatchCurrentMessage();
+ }
+ return true;
+ }
+
+ case Ping: {
+ if (buffer.get() != 0) { // ignore sign extension
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ int cookie = (buffer.getShort() & 0xFFFF);
+ handlePing(cookie);
+ return true;
+ }
+
+ case PingAck: {
+ if (buffer.get() != 0) { // ignore sign extension
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ int cookie = (buffer.getShort() & 0xFFFF);
+ handlePingAck(cookie);
+ return true;
+ }
+
+ case Error: {
+ if (buffer.get() != 0) { // ignore sign extension
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ currentOp = op;
+ currentLengthRemaining = (buffer.getShort() & 0xFFFF);
+ if (currentLengthRemaining > 0) {
+ currentDataBuffer =
+ ByteBuffer.allocate(currentLengthRemaining);
+ readState = READ_MESSAGE_BODY;
+ } else {
+ dispatchCurrentMessage();
+ }
+ return true;
+ }
+
+ case Close: {
+ int sessionID = (buffer.get() & 0xFF);
+ if (sessionID > MAX_SESSION_ID ||
+ buffer.getShort() != 0) // ignore sign extension
+ {
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ handleClose(sessionID);
+ return true;
+ }
+
+ case Acknowledgment: {
+ int sessionID = (buffer.get() & 0xFF);
+ if (sessionID > MAX_SESSION_ID ||
+ buffer.getShort() != 0) // ignore sign extension
+ {
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ handleAcknowledgment(sessionID);
+ return true;
+ }
+
+ default:
+ throw new ProtocolException("bad message header: " +
+ toHexString(buffer.getInt(headerPosition)));
+ }
+ }
+
+ private boolean readMessageBody(ByteBuffer buffer)
+ throws ProtocolException
+ {
+ assert currentLengthRemaining > 0;
+ assert currentDataBuffer == null ||
+ currentDataBuffer.remaining() == currentLengthRemaining;
+
+ if (buffer.remaining() > currentLengthRemaining) {
+ int origLimit = buffer.limit();
+ buffer.limit(buffer.position() + currentLengthRemaining);
+ if (currentDataBuffer != null) {
+ currentDataBuffer.put(buffer);
+ } else {
+ buffer.position(buffer.position() + currentLengthRemaining);
+ }
+ currentLengthRemaining = 0;
+ buffer.limit(origLimit);
+ } else {
+ currentLengthRemaining -= buffer.remaining();
+ if (currentDataBuffer != null) {
+ currentDataBuffer.put(buffer);
+ } else {
+ buffer.position(buffer.limit());
+ }
+ }
+
+ if (currentLengthRemaining > 0) {
+ return false;
+ } else {
+ currentDataBuffer.flip();
+ dispatchCurrentMessage();
+ currentDataBuffer = null; // don't let this linger
+ readState = READ_MESSAGE_HEADER;
+ return true;
+ }
+ }
+
+ private void dispatchCurrentMessage() throws ProtocolException {
+ assert currentDataBuffer == null || currentDataBuffer.hasRemaining();
+
+ int op = currentOp;
+ if ((op & 0xE1) == Data) {
+ boolean open = (op & Data_open) != 0;
+ boolean close = (op & Data_close) != 0;
+ boolean eof = (op & Data_eof) != 0;
+ boolean ackRequired = (op & Data_ackRequired) != 0;
+ handleData(currentSessionID, open, close, eof, ackRequired,
+ (currentDataBuffer != null ?
+ currentDataBuffer : ByteBuffer.allocate(0)));
+ return;
+
+ } else if ((op & 0xFD) == Abort) {
+ boolean partial = (op & Abort_partial) != 0;
+ handleAbort(currentSessionID, partial,
+ (currentDataBuffer != null ?
+ getStringFromUTF8Buffer(currentDataBuffer) : ""));
+ return;
+
+ }
+ switch (op) {
+ case NoOperation:
+ handleNoOperation();
+ return;
+
+ case Shutdown:
+ handleShutdown(currentDataBuffer != null ?
+ getStringFromUTF8Buffer(currentDataBuffer) : "");
+ return;
+
+ case Error:
+ handleError(currentDataBuffer != null ?
+ getStringFromUTF8Buffer(currentDataBuffer) : "");
+ return;
+
+ default:
+ throw new AssertionError(Integer.toHexString((byte) op));
+ }
+ }
+
+ private void handleNoOperation() throws ProtocolException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "NoOperation");
+ }
+
+ // do nothing
+ }
+
+ private void handleShutdown(String message) throws ProtocolException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "Shutdown");
+ }
+
+ if (role != CLIENT) {
+ throw new ProtocolException("Shutdown sent by client");
+ }
+ setDown("mux connection shut down gracefully", null);
+ throw new ProtocolException("received Shutdown message");
+ }
+
+ private void handlePing(int cookie) throws ProtocolException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "Ping: cookie=" + cookie);
+ }
+
+ asyncSendPingAck(cookie);
+ }
+
+ private void handlePingAck(int cookie) throws ProtocolException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "PingAck: cookie=" + cookie);
+ }
+
+ synchronized (muxLock) {
+ if (cookie != expectedPingCookie) {
+ throw new ProtocolException(
+ "unexpected ping cookie: " + cookie);
+ } else {
+ expectedPingCookie = -1;
+ // NYI: rest of ping machinery
+ }
+ }
+ }
+
+ private void handleError(String message) throws ProtocolException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "Error");
+ }
+
+ setDown((role == CLIENT ? "server" : "client") +
+ " reported protocol error: " + message, null);
+ throw new ProtocolException("received Error message");
+ }
+
+ private void handleIncrementRation(int sessionID, int increment)
+ throws ProtocolException
+ {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST,
+ "IncrementRation: sessionID=" + sessionID +
+ ",increment=" + increment);
+ }
+
+ getSession(sessionID).handleIncrementRation(increment);
+ }
+
+ private void handleAbort(int sessionID, boolean partial, String message)
+ throws ProtocolException
+ {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST,
+ "Abort: sessionID=" + sessionID +
+ ",partial=" + partial);
+ }
+
+ getSession(sessionID).handleAbort(partial);
+ }
+
+ private void handleClose(int sessionID) throws ProtocolException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "Close: sessionID=" + sessionID);
+ }
+
+ getSession(sessionID).handleClose();
+ }
+
+ private void handleAcknowledgment(int sessionID) throws ProtocolException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "Acknowledgment: sessionID=" + sessionID);
+ }
+
+ getSession(sessionID).handleAcknowledgment();
+ }
+
+ private void handleData(int sessionID, boolean open, boolean close,
+ boolean eof, boolean ackRequired, ByteBuffer data)
+ throws ProtocolException
+ {
+ if (logger.isLoggable(Level.FINEST)) {
+ int length = data.remaining();
+ HexDumpEncoder encoder = new HexDumpEncoder();
+ byte[] bytes = new byte[data.remaining()];
+ data.mark();
+ data.get(bytes);
+ data.reset();
+ logger.log(Level.FINEST,
+ "Data: sessionID=" + sessionID +
+ (open ? ",open" : "") +
+ (close ? ",close" : "") +
+ (eof ? ",eof" : "") +
+ (ackRequired ? ",ackRequired" : "") +
+ ",length=" + length +
+ (length > 0 ? ",data=\n" + encoder.encode(bytes) : ""));
+ }
+
+ if (!eof && (close || ackRequired)) {
+ throw new ProtocolException("Data: eof=" + eof +
+ ",close=" + close +
+ ",ackRequired=" + ackRequired);
+ }
+
+ if (open) {
+ handleOpen(sessionID);
+ }
+
+ getSession(sessionID).handleData(data, eof, close, ackRequired);
+ }
+
+ private Session getSession(int sessionID) throws ProtocolException {
+ synchronized (muxLock) {
+ if (!busySessions.get(sessionID)) {
+ throw new ProtocolException(
+ "inactive sessionID: " + sessionID);
+ }
+ return (Session) sessions.get(Integer.valueOf(sessionID));
+ }
+ }
+
+ private static ByteBuffer getUTF8BufferFromString(String s) {
+ CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
+ try {
+ return encoder.encode(CharBuffer.wrap(s));
+ } catch (CharacterCodingException e) {
+ return null;
+ }
+ }
+
+ private static String getStringFromUTF8Buffer(ByteBuffer buffer) {
+ CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+ try {
+ return decoder.decode(buffer).toString();
+ } catch (CharacterCodingException e) {
+ return "(error decoding UTF-8 message: " + e.toString() + ")";
+ }
+ }
+
+ private static String toHexString(byte x) {
+ char[] buf = new char[2];
+ buf[0] = toHexChar((x >> 4) & 0xF);
+ buf[1] = toHexChar(x & 0xF);
+ return new String(buf);
+ }
+
+ private static String toHexString(int x) {
+ char[] buf = new char[8];
+ for (int i = 0; i < 8; i++) {
+ buf[i] = toHexChar((x >> ((7 - i) * 4)) & 0xF);
+ }
+ return new String(buf);
+ }
+
+ private static String toHexString(byte[] b) {
+ char[] buf = new char[b.length * 2];
+ int j = 0;
+ for (int i = 0; i < b.length; i++) {
+ buf[j++] = toHexChar((b[i] >> 4) & 0xF);
+ buf[j++] = toHexChar(b[i] & 0xF);
+ }
+ return new String(buf);
+ }
+
+ private static char toHexChar(int x) {
+ return x < 10 ? (char) ('0' + x) : (char) ('A' - 10 + x);
+ }
+}