You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/05/18 10:02:56 UTC
svn commit: r1595569 [2/2] - in /river/jtsk/skunk/qa_refactor/trunk/src:
com/sun/jini/jeri/internal/mux/ com/sun/jini/thread/ net/jini/discovery/
net/jini/jeri/ net/jini/jeri/connection/ net/jini/jeri/tcp/
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java Sun May 18 08:02:56 2014
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package net.jini.jeri.connection;
import com.sun.jini.action.GetLongAction;
@@ -43,21 +42,22 @@ import net.jini.jeri.OutboundRequestIter
/**
* Provides client-side connection management using the <a
- * href="{@docRoot}/net/jini/jeri/connection/doc-files/mux.html">Jini
- * extensible remote invocation (Jini ERI) multiplexing protocol</a>
+ * href="{@docRoot}/net/jini/jeri/connection/doc-files/mux.html">Jini extensible
+ * remote invocation (Jini ERI) multiplexing protocol</a>
* to frame and multiplex requests and responses over connections.
*
- * <p>A <code>ConnectionManager</code> is created by a
- * connection-based {@link net.jini.jeri.Endpoint} implemention to manage
- * connections to a particular {@link ConnectionEndpoint}. The {@link
- * #newRequest newRequest} method is used to send a request to the
- * connection endpoint.
- *
- * <p>Each request attempt is mapped to a new <i>session</i> of the
- * Jini ERI multiplexing protocol on an established connection chosen
- * by the <code>ConnectionEndpoint</code>. Request data is written as
- * the data sent for the session, and response data is read as the
- * data recdeived for the session.
+ * <p>
+ * A <code>ConnectionManager</code> is created by a connection-based
+ * {@link net.jini.jeri.Endpoint} implemention to manage connections to a
+ * particular {@link ConnectionEndpoint}. The {@link
+ * #newRequest newRequest} method is used to send a request to the connection
+ * endpoint.
+ *
+ * <p>
+ * Each request attempt is mapped to a new <i>session</i> of the Jini ERI
+ * multiplexing protocol on an established connection chosen by the
+ * <code>ConnectionEndpoint</code>. Request data is written as the data sent for
+ * the session, and response data is read as the data recdeived for the session.
*
* @author Sun Microsystems, Inc.
* @since 2.0
@@ -65,82 +65,87 @@ import net.jini.jeri.OutboundRequestIter
* @com.sun.jini.impl
*
* This implementation uses the {@link Logger} named
- * <code>net.jini.jeri.connection.ConnectionManager</code> to log
- * information at the following levels:
+ * <code>net.jini.jeri.connection.ConnectionManager</code> to log information at
+ * the following levels:
*
- * <p><table summary="Describes what is logged by ConnectionManager to
- * its logger at various logging levels" border=1 cellpadding=5>
+ * <p>
+ * <table summary="Describes what is logged by ConnectionManager to its logger
+ * at various logging levels" border=1 cellpadding=5>
*
* <tr> <th> Level <th> Description
*
- * <tr> <td> {@link Level#FINEST FINEST} <td> connection opened or
- * reused
+ * <tr> <td> {@link Level#FINEST FINEST} <td> connection opened or reused
*
* </table>
*
- * <p>This implementation uses the {@link Logger} named
- * <code>net.jini.jeri.connection.mux</code> to log information at the
- * following levels:
+ * <p>
+ * This implementation uses the {@link Logger} named
+ * <code>net.jini.jeri.connection.mux</code> to log information at the following
+ * levels:
*
- * <p><table summary="Describes what is logged by ConnectionManager to
- * the mux logger at various logging levels" border=1 cellpadding=5>
+ * <p>
+ * <table summary="Describes what is logged by ConnectionManager to the mux
+ * logger at various logging levels" border=1 cellpadding=5>
*
* <tr> <th> Level <th> Description
*
- * <tr> <td> {@link Level#WARNING WARNING} <td> unexpected exception
- * during asynchronous I/O processing, or thread creation failure
+ * <tr> <td> {@link Level#WARNING WARNING} <td> unexpected exception during
+ * asynchronous I/O processing, or thread creation failure
*
* <tr> <td> {@link Levels#HANDLED HANDLED} <td> I/O exception during
* asynchronous I/O processing
*
- * <tr> <td> {@link Level#FINEST FINEST} <td> detailed implementation
- * activity
+ * <tr> <td> {@link Level#FINEST FINEST} <td> detailed implementation activity
*
* </table>
*
- * <p>This implementation recognizes the following system properties:
- *
- * <p><ul>
+ * <p>
+ * This implementation recognizes the following system properties:
*
- * <li><code>com.sun.jini.jeri.connectionTimeout</code> - Time in
- * milliseconds to leave idle client-side connections around before
- * closing them. The default value is 15000 milliseconds (15 seconds).
+ * <p>
+ * <ul>
*
- * <li><code>com.sun.jini.jeri.handshakeTimeout</code> - Time in
- * milliseconds for client-side connections to wait for the server to
- * acknowledge an opening handshake. The default value is 15000 milliseconds (15 seconds).
+ * <li><code>com.sun.jini.jeri.connectionTimeout</code> - Time in milliseconds
+ * to leave idle client-side connections around before closing them. The default
+ * value is 15000 milliseconds (15 seconds).
+ *
+ * <li><code>com.sun.jini.jeri.handshakeTimeout</code> - Time in milliseconds
+ * for client-side connections to wait for the server to acknowledge an opening
+ * handshake. The default value is 120000 milliseconds (2 minutes).
*
* </ul>
- **/
+ *
+ */
public final class ConnectionManager {
+
/**
* How long to leave idle muxes around before closing them.
*/
- private static final long TIMEOUT =
- ( (Long) AccessController.doPrivileged(new GetLongAction(
- "com.sun.jini.jeri.connectionTimeout",
- 360000))).longValue();
+ private static final long TIMEOUT
+ = ((Long) AccessController.doPrivileged(new GetLongAction(
+ "com.sun.jini.jeri.connectionTimeout",
+ 120000))).longValue();
/**
* How long to wait for a server to respond to an initial client message.
*/
- private static final long HANDSHAKE_TIMEOUT =
- ((Long) AccessController.doPrivileged(new GetLongAction(
- "com.sun.jini.jeri.handshakeTimeout",
- 360000))).longValue();
+ private static final long HANDSHAKE_TIMEOUT
+ = ((Long) AccessController.doPrivileged(new GetLongAction(
+ "com.sun.jini.jeri.handshakeTimeout",
+ 360000))).longValue();
/**
* ConnectionManager logger.
*/
- private static final Logger logger =
- Logger.getLogger("net.jini.jeri.connection.ConnectionManager");
+ private static final Logger logger
+ = Logger.getLogger("net.jini.jeri.connection.ConnectionManager");
/**
* Executor that executes tasks in pooled system threads.
*/
- private static final Executor systemThreadPool =
- (Executor) AccessController.doPrivileged(
- new GetThreadPoolAction(false));
+ private static final Executor systemThreadPool
+ = (Executor) AccessController.doPrivileged(
+ new GetThreadPoolAction(false));
/**
- * Set of connection managers with open or pending muxes
- * (connections), for consideration by the reaper thread.
+ * Set of connection managers with open or pending muxes (connections), for
+ * consideration by the reaper thread.
*/
private static final Set<ConnectionManager> reaperSet = new HashSet<ConnectionManager>();
@@ -151,35 +156,35 @@ public final class ConnectionManager {
/**
* The OutboundMuxes.
*/
- private final List<OutboundMux> muxes = new LinkedList<OutboundMux>();
-
+ private final List<OutboundMux> muxes = new ArrayList<OutboundMux>(128);
+
/**
* Number of pending connect calls.
*/
private int pendingConnects = 0;
/**
- * Creates a new <code>ConnectionManager</code> that manages
- * client-side connections to the specified connection endpoint.
+ * Creates a new <code>ConnectionManager</code> that manages client-side
+ * connections to the specified connection endpoint.
*
* @param ep the connection endpoint
- **/
+ *
+ */
public ConnectionManager(ConnectionEndpoint ep) {
- this.ep = ep;
+ this.ep = ep;
}
/**
- * Calls connect on the connection endpoint with the active and idle
- * muxes and the specified handle. If no connection is returned, calls
- * connect with the handle. In either case, if a new connection is
- * returned, creates and adds a mux for it. In all cases, bumps the
- * newRequest count for the mux and returns it. Removes any dead muxes
- * along the way.
+ * Calls connect on the connection endpoint with the active and idle muxes
+ * and the specified handle. If no connection is returned, calls connect
+ * with the handle. In either case, if a new connection is returned, creates
+ * and adds a mux for it. In all cases, bumps the newRequest count for the
+ * mux and returns it. Removes any dead muxes along the way.
*/
OutboundMux connect(OutboundRequestHandle handle) throws IOException {
- synchronized (this) {
- pendingConnects++;
- }
+ synchronized (this) {
+ pendingConnects++;
+ }
/**
* The active muxes (during connect).
*/
@@ -188,201 +193,205 @@ public final class ConnectionManager {
* The idle muxes (during connect).
*/
List<Connection> idle = new LinkedList<Connection>();
- try {
- synchronized (reaperSet) {
- if (reaperSet.isEmpty()) {
- systemThreadPool.execute(
- new Reaper(), "ConnectionManager.Reaper");
- }
- reaperSet.add(this);
- }
- synchronized (this) {
- for (int i = muxes.size(); --i >= 0; ) {
- OutboundMux mux = (OutboundMux) muxes.get(i);
- try {
- int n = mux.requestsInProgress();
- if (n == 0) {
- idle.add(mux.getConnection());
- } else if (n < OutboundMux.MAX_REQUESTS) {
- active.add(mux.getConnection());
- }
- } catch (IOException e) {
- muxes.remove(i);
- }
- }
+ try {
+ synchronized (reaperSet) {
+ if (reaperSet.isEmpty()) {
+ systemThreadPool.execute(
+ new Reaper(), "ConnectionManager.Reaper");
+ }
+ reaperSet.add(this);
+ }
+ Connection c;
+ synchronized (this) {
+ for (int i = muxes.size(); --i >= 0;) {
+ OutboundMux mux = (OutboundMux) muxes.get(i);
+ try {
+ int n = mux.requestsInProgress();
+ if (n == 0) {
+ idle.add(mux.getConnection());
+ } else if (n < OutboundMux.MAX_REQUESTS) {
+ active.add(mux.getConnection());
+ }
+ } catch (IOException e) {
+ muxes.remove(i);
+ }
+ }
+ c = ep.connect(handle, active, idle);
+ if (c != null) {
+ for (int i = muxes.size(); --i >= 0;) {
+ OutboundMux mux = (OutboundMux) muxes.get(i);
+ if (c == mux.getConnection()) {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "using {0}", c);
+ }
+ mux.newRequestPending();
+ return mux;
+ }
+ }
+ OutboundMux mux = newOutboundMux(c);
+ mux.newRequestPending();
+ muxes.add(mux);
+ return mux;
+ }
+ c = ep.connect(handle);
+ OutboundMux mux = newOutboundMux(c);
+ mux.newRequestPending();
+ muxes.add(mux);
+ return mux;
}
- Connection c = ep.connect(handle, active, idle);
+ } finally {
synchronized (this) {
- if (c != null) {
- for (int i = muxes.size(); --i >= 0; ) {
- OutboundMux mux = (OutboundMux) muxes.get(i);
- if (c == mux.getConnection()) {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "using {0}", c);
- }
- mux.newRequestPending();
- return mux;
- }
- }
- OutboundMux mux = newOutboundMux(c);
- mux.newRequestPending();
- muxes.add(mux);
- return mux;
- }
- }
- c = ep.connect(handle);
- synchronized (this) {
- OutboundMux mux = newOutboundMux(c);
- mux.newRequestPending();
- muxes.add(mux);
- return mux;
- }
- } finally {
- synchronized (this) {
- assert pendingConnects > 0;
- pendingConnects--;
- }
- }
+ assert pendingConnects > 0;
+ pendingConnects--;
+ }
+ }
}
/**
- * For each mux, calls checkIdle on the mux, and if checkIdle returns
- * true, removes the mux and adds it to the idle list. Returns true
- * if no connects are pending and no muxes remain.
+ * For each mux, calls checkIdle on the mux, and if checkIdle returns true,
+ * removes the mux and adds it to the idle list. Returns true if no connects
+ * are pending and no muxes remain.
*/
synchronized boolean checkIdle(long now, List idle) {
- for (int i = muxes.size(); --i >= 0; ) {
- OutboundMux mux = (OutboundMux) muxes.get(i);
- if (mux.checkIdle(now)) {
- muxes.remove(i);
- idle.add(mux);
- }
- }
- return pendingConnects == 0 && muxes.isEmpty();
+ for (int i = muxes.size(); --i >= 0;) {
+ OutboundMux mux = (OutboundMux) muxes.get(i);
+ if (mux.checkIdle(now)) {
+ muxes.remove(i);
+ idle.add(mux);
+ }
+ }
+ return pendingConnects == 0 && muxes.isEmpty();
}
/**
* Removes and shuts down a mux.
*/
void remove(OutboundMux mux) {
- synchronized (this) {
- muxes.remove(mux);
- }
- mux.shutdown("writeRequestData failed");
+ synchronized (this) {
+ muxes.remove(mux);
+ }
+ mux.shutdown("writeRequestData failed");
}
/**
* Constructs an OutboundMux instance from the connection.
*/
private OutboundMux newOutboundMux(Connection c) throws IOException {
- logger.log(Level.FINEST, "opened {0}", c);
- OutboundMux mux = null;
- try {
- mux = (c.getChannel() == null) ?
- new OutboundMux(c) : new OutboundMux(c, true);
- mux.setStartTimeout(HANDSHAKE_TIMEOUT);
- } finally {
- if (mux == null) {
- try {
- c.close();
- } catch (IOException e) {
- }
- }
- }
- return mux;
+ logger.log(Level.FINEST, "opened {0}", c);
+ OutboundMux mux = null;
+ try {
+ mux = (c.getChannel() == null)
+ ? new OutboundMux(this, c) : new OutboundMux(this, c, true);
+ mux.setStartTimeout(HANDSHAKE_TIMEOUT);
+ } finally {
+ if (mux == null) {
+ try {
+ c.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ return mux;
}
/**
* Subclass wrapper around MuxClient for outbound connections.
*/
private static final class OutboundMux extends MuxClient {
- /**
- * The outbound connection.
- */
- private final Connection c;
- /**
- * Lock to enforce single start of mux.
- */
- private final Object startLock = new Object();
- /**
- * True if the mux needs to be started.
- */
- private volatile boolean notStarted = true;
+
+ /**
+ * ConnectionManager
+ */
+ private final ConnectionManager manager;
+ /**
+ * The outbound connection.
+ */
+ private final Connection c;
+ /**
+ * Lock to enforce single start of mux.
+ */
+ private final Object startLock = new Object();
+ /**
+ * True if the mux needs to be started.
+ */
+ private volatile boolean notStarted = true;
private boolean starting = false;
- /**
- * Number of pending newRequest calls. Guarded by enclosing
- * ConnectionManager's lock.
- */
- private int pendingNewRequests = 0;
- /**
- * The time this mux was found to be idle by the Reaper
- * thread. Set to zero each time a request is initiated.
- * Guarded by enclosing ConnectionManager's lock.
- */
- private volatile long idleTime = 0;
-
- /**
- * Constructs an instance from the connection's streams.
- */
- OutboundMux(Connection c) throws IOException {
- super(c.getOutputStream(), c.getInputStream());
- this.c = c;
- }
-
- /**
- * Constructs an instance from the connection's channel.
- */
- OutboundMux(Connection c, boolean ignore) throws IOException {
- super(c.getChannel());
- this.c = c;
- }
-
- /**
- * Returns the outbound connection.
- */
- Connection getConnection() {
- return c;
- }
-
- /**
- * Registers a pending newRequest call.
- */
- synchronized void newRequestPending() {
- pendingNewRequests++;
- }
-
- /**
- * Initiates a new request on the mux and returns it, and sets the
- * idle time to zero. Starts the mux if necessary, and decrements
- * the pending newRequest count.
- */
+ /**
+ * Number of pending newRequest calls. Guarded by enclosing
+ * ConnectionManager's lock.
+ */
+ private int pendingNewRequests = 0;
+ /**
+ * The time this mux was found to be idle by the Reaper thread. Set to
+ * zero each time a request is initiated. Guarded by enclosing
+ * ConnectionManager's lock.
+ */
+ private long idleTime = 0;
+
+ /**
+ * Constructs an instance from the connection's streams.
+ */
+ OutboundMux(ConnectionManager manager, Connection c) throws IOException {
+ super(c.getOutputStream(), c.getInputStream());
+ this.c = c;
+ this.manager = manager;
+ }
+
+ /**
+ * Constructs an instance from the connection's channel.
+ */
+ OutboundMux(ConnectionManager manager, Connection c, boolean ignore) throws IOException {
+ super(c.getChannel());
+ this.c = c;
+ this.manager = manager;
+ }
+
+ /**
+ * Returns the outbound connection.
+ */
+ Connection getConnection() {
+ return c;
+ }
+
+ /**
+ * Registers a pending newRequest call.
+ */
+ void newRequestPending() {
+ synchronized (manager) {
+ pendingNewRequests++;
+ }
+ }
+
+ /**
+ * Initiates a new request on the mux and returns it, and sets the idle
+ * time to zero. Starts the mux if necessary, and decrements the pending
+ * newRequest count.
+ */
@Override
- public OutboundRequest newRequest() throws IOException {
+ public OutboundRequest newRequest() throws IOException {
// assert !Thread.holdsLock(ConnectionManager.this);
boolean interrupted = false;
- try {
+ try {
boolean start = false;
- if (notStarted){
- synchronized (startLock){
- while (starting){
+ if (notStarted) {
+ synchronized (startLock) {
+ while (starting) {
try {
startLock.wait();
} catch (InterruptedException ex) {
interrupted = true;
}
}
- if (notStarted){
+ if (notStarted) {
starting = true;
start = true;
}
}
- if (start){
+ if (start) {
try {
- synchronized (this){
- start();
- }
+ start();
} finally {
- synchronized (startLock){
+ synchronized (startLock) {
notStarted = false;
starting = false;
startLock.notifyAll();
@@ -390,382 +399,396 @@ public final class ConnectionManager {
}
}
}
- synchronized (this) {
+ synchronized (manager) {
idleTime = 0;
+ return super.newRequest();
}
- return super.newRequest();
- } finally {
- synchronized (this) {
+ } finally {
+ synchronized (manager) {
assert pendingNewRequests > 0;
pendingNewRequests--;
}
- if (interrupted) Thread.currentThread().interrupt();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Returns the number of active and pending requests.
+ */
+ @Override
+ public int requestsInProgress() throws IOException {
+ synchronized (manager) {
+ return super.requestsInProgress() + pendingNewRequests;
+ }
+ }
+
+ /**
+ * Returns true if the mux is dead, or the mux is idle and the recorded
+ * idle time is more than TIMEOUT milliseconds before now, and returns
+ * false otherwise. If the mux is idle and the recorded idle time is
+ * zero, sets the recorded idle time to now.
+ */
+ boolean checkIdle(long now) {
+ try {
+ synchronized (manager) {
+ if (requestsInProgress() == 0) {
+ if (idleTime == 0) {
+ idleTime = now;
+ } else {
+ return now - idleTime > TIMEOUT;
+ }
+ }
+ return false;
+ }
+ } catch (IOException e) {
+ return true;
+ }
+ }
+
+ /**
+ * Close the connection, so that the provider is notified.
+ */
+ @Override
+ protected void handleDown() {
+ try {
+ c.close();
+ } catch (IOException e) {
}
- }
+ }
- /**
- * Returns the number of active and pending requests.
- */
- @Override
- public synchronized int requestsInProgress() throws IOException {
- return super.requestsInProgress() + pendingNewRequests;
- }
-
- /**
- * Returns true if the mux is dead, or the mux is idle and the
- * recorded idle time is more than TIMEOUT milliseconds before now,
- * and returns false otherwise. If the mux is idle and the recorded
- * idle time is zero, sets the recorded idle time to now.
- */
- boolean checkIdle(long now) {
- try {
- synchronized (this){
- if (requestsInProgress() == 0) {
- if (idleTime == 0) {
- idleTime = now;
- } else {
- return now - idleTime > TIMEOUT;
- }
- }
- return false;
- }
- } catch (IOException e) {
- return true;
- }
- }
-
- /**
- * Close the connection, so that the provider is notified.
- */
- @Override
- protected void handleDown() {
- try {
- c.close();
- } catch (IOException e) {
- }
- }
-
- boolean shouldRetry() {
- return false; // XXX
- }
+ boolean shouldRetry() {
+ return false; // XXX
+ }
}
/**
* Outbound request wrapper around the outbound request created by the mux.
*/
private static final class Outbound implements OutboundRequest {
- /**
- * The outbound request created by the mux.
- */
- private final OutboundRequest req;
- /**
- * The connection on which the outbound request originates.
- */
- private final Connection c;
- /**
- * The outbound request handle.
- */
- private final OutboundRequestHandle handle;
- /**
- * Wrapper around the mux's response input stream.
- */
- private final InputStream in;
- /**
- * Delivery status override from readResponseData.
- */
- private boolean status = true;
-
- Outbound(OutboundRequest req,
- Connection c,
- OutboundRequestHandle handle)
- {
- this.req = req;
- this.c = c;
- this.handle = handle;
- in = new Input(handle);
- }
-
- /* pass-through to the underlying request */
- @Override
- public OutputStream getRequestOutputStream() {
- return req.getRequestOutputStream();
- }
-
- /* return the wrapper */
- @Override
- public InputStream getResponseInputStream() {
- return in;
- }
-
- /* delegate to the connection */
- @Override
- public void populateContext(Collection context) {
- c.populateContext(handle, context);
- }
-
- /* delegate to the connection */
- @Override
- public InvocationConstraints getUnfulfilledConstraints() {
- return c.getUnfulfilledConstraints(handle);
- }
-
- /**
- * False if readResponseData returned an exception, otherwise
- * pass-through to the underlying request.
- */
- @Override
- public boolean getDeliveryStatus() {
- return status && req.getDeliveryStatus();
- }
-
- /* pass-through to the underlying request */
- @Override
- public void abort() {
- req.abort();
- }
-
- /**
- * Wrapper for the response input stream of an outbound request,
- * used to call readResponseData on the underlying connection
- * before subsequent data is read by higher levels. Note that this
- * class does not support mark/reset.
- */
- private final class Input extends InputStream {
- /**
- * The underlying input stream from the outbound request.
- */
- private final InputStream in;
- /**
- * The handle, or null if readResponseData has been called.
- */
- private OutboundRequestHandle handle;
-
- Input(OutboundRequestHandle handle) {
- in = req.getResponseInputStream();
- this.handle = handle;
- }
-
- /**
- * Calls readResponseData on the connection, exactly once.
- * Sets the handle to null to indicate that it has been called.
- */
- private void readFirst() throws IOException {
- if (handle != null) {
- try {
- IOException e = c.readResponseData(handle, in);
- if (e != null) {
- status = false;
- throw e;
- }
- } finally {
- handle = null;
- }
- }
- }
- /** Call readFirst, then pass through. */
+ /**
+ * The outbound request created by the mux.
+ */
+ private final OutboundRequest req;
+ /**
+ * The connection on which the outbound request originates.
+ */
+ private final Connection c;
+ /**
+ * The outbound request handle.
+ */
+ private final OutboundRequestHandle handle;
+ /**
+ * Wrapper around the mux's response input stream.
+ */
+ private final InputStream in;
+ /**
+ * Delivery status override from readResponseData.
+ */
+ private volatile boolean status = true;
+
+ Outbound(OutboundRequest req,
+ Connection c,
+ OutboundRequestHandle handle) {
+ this.req = req;
+ this.c = c;
+ this.handle = handle;
+ in = new Input(handle);
+ }
+
+ /* pass-through to the underlying request */
+ @Override
+ public OutputStream getRequestOutputStream() {
+ return req.getRequestOutputStream();
+ }
+
+ /* return the wrapper */
+ @Override
+ public InputStream getResponseInputStream() {
+ return in;
+ }
+
+ /* delegate to the connection */
+ @Override
+ public void populateContext(Collection context) {
+ c.populateContext(handle, context);
+ }
+
+ /* delegate to the connection */
+ @Override
+ public InvocationConstraints getUnfulfilledConstraints() {
+ return c.getUnfulfilledConstraints(handle);
+ }
+
+ /**
+ * False if readResponseData returned an exception, otherwise
+ * pass-through to the underlying request.
+ */
+ @Override
+ public boolean getDeliveryStatus() {
+ return status && req.getDeliveryStatus();
+ }
+
+ /* pass-through to the underlying request */
+ @Override
+ public void abort() {
+ req.abort();
+ }
+
+ /**
+ * Wrapper for the response input stream of an outbound request, used to
+ * call readResponseData on the underlying connection before subsequent
+ * data is read by higher levels. Note that this class does not support
+ * mark/reset.
+ */
+ private final class Input extends InputStream {
+
+ /**
+ * The underlying input stream from the outbound request.
+ */
+ private final InputStream in;
+ /**
+ * The handle, or null if readResponseData has been called.
+ */
+ private OutboundRequestHandle handle;
+
+ Input(OutboundRequestHandle handle) {
+ in = req.getResponseInputStream();
+ this.handle = handle;
+ }
+
+ /**
+ * Calls readResponseData on the connection, exactly once. Sets the
+ * handle to null to indicate that it has been called.
+ */
+ private void readFirst() throws IOException {
+ if (handle != null) {
+ try {
+ IOException e = c.readResponseData(handle, in);
+ if (e != null) {
+ status = false;
+ throw e;
+ }
+ } finally {
+ handle = null;
+ }
+ }
+ }
+
+ /**
+ * Call readFirst, then pass through.
+ */
@Override
- public int read() throws IOException {
- readFirst();
- return in.read();
- }
+ public int read() throws IOException {
+ readFirst();
+ return in.read();
+ }
- /** Call readFirst, then pass through. */
+ /**
+ * Call readFirst, then pass through.
+ */
@Override
- public int read(byte[] b, int off, int len) throws IOException {
- readFirst();
- return in.read(b, off, len);
- }
+ public int read(byte[] b, int off, int len) throws IOException {
+ readFirst();
+ return in.read(b, off, len);
+ }
- /** Call readFirst, then pass through. */
+ /**
+ * Call readFirst, then pass through.
+ */
@Override
- public long skip(long n) throws IOException {
- readFirst();
- return in.skip(n);
- }
+ public long skip(long n) throws IOException {
+ readFirst();
+ return in.skip(n);
+ }
- /** Call readFirst, then pass through. */
+ /**
+ * Call readFirst, then pass through.
+ */
@Override
- public int available() throws IOException {
- readFirst();
- return in.available();
- }
+ public int available() throws IOException {
+ readFirst();
+ return in.available();
+ }
- /** pass-through */
+ /**
+ * pass-through
+ */
@Override
- public void close() throws IOException {
- in.close();
- }
- }
+ public void close() throws IOException {
+ in.close();
+ }
+ }
}
/**
- * Records idle times in muxes and shuts down muxes that have been
- * idle for at least TIMEOUT milliseconds.
+ * Records idle times in muxes and shuts down muxes that have been idle for
+ * at least TIMEOUT milliseconds.
*/
private static final class Reaper implements Runnable {
- Reaper() {
- }
- /**
- * Sleep for TIMEOUT milliseconds. Then call checkIdle on
- * each manager with open muxes, shutdown all of idle muxes
- * that have been collected, and if no managers with open
- * muxes remain terminate, else repeat (go back to sleep).
- */
- @Override
- public void run() {
- List idle = new ArrayList(1);
- boolean done;
- do {
- try {
- Thread.sleep(TIMEOUT);
- } catch (InterruptedException e) {
- return;
- }
- long now = System.currentTimeMillis();
- synchronized (reaperSet) {
- for (Iterator iter = reaperSet.iterator();
- iter.hasNext();)
- {
- ConnectionManager mgr =
- (ConnectionManager) iter.next();
- if (mgr.checkIdle(now, idle)) {
- iter.remove();
- }
- }
- done = reaperSet.isEmpty();
- }
- for (int i = idle.size(); --i >= 0; ) {
- ((OutboundMux) idle.get(i)).shutdown("idle");
- }
- idle.clear();
- } while (!done);
- }
+ Reaper() {
+ }
+
+ /**
+ * Sleep for TIMEOUT milliseconds. Then call checkIdle on each manager
+ * with open muxes, shutdown all of idle muxes that have been collected,
+ * and if no managers with open muxes remain terminate, else repeat (go
+ * back to sleep).
+ */
+ @Override
+ public void run() {
+ List idle = new ArrayList(1);
+ boolean done;
+ do {
+ try {
+ Thread.sleep(TIMEOUT);
+ } catch (InterruptedException e) {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ synchronized (reaperSet) {
+ for (Iterator iter = reaperSet.iterator();
+ iter.hasNext();) {
+ ConnectionManager mgr
+ = (ConnectionManager) iter.next();
+ if (mgr.checkIdle(now, idle)) {
+ iter.remove();
+ }
+ }
+ done = reaperSet.isEmpty();
+ }
+ for (int i = idle.size(); --i >= 0;) {
+ ((OutboundMux) idle.get(i)).shutdown("idle");
+ }
+ idle.clear();
+ } while (!done);
+ }
}
/**
* Outbound request iterator returned by newRequest.
*/
private static final class ReqIterator implements OutboundRequestIterator {
+
/**
* ConnectionManager
*/
private final ConnectionManager manager;
- /**
- * The request handle.
- */
- private final OutboundRequestHandle handle;
- /**
- * True if next has not yet been called.
- */
- private boolean first = true;
- /**
- * The outbound mux from the last call to next, if any.
- */
- private OutboundMux mux;
+ /**
+ * The request handle.
+ */
+ private final OutboundRequestHandle handle;
+ /**
+ * True if next has not yet been called.
+ */
+ private boolean first = true;
+ /**
+ * The outbound mux from the last call to next, if any.
+ */
+ private OutboundMux mux;
- ReqIterator(OutboundRequestHandle handle, ConnectionManager cm) {
- this.handle = handle;
+ ReqIterator(OutboundRequestHandle handle, ConnectionManager cm) {
+ this.handle = handle;
manager = cm;
- }
+ }
- /**
- * Returns true if next has not yet been called or if the last mux
- * returned had an asynchronous close.
- */
- @Override
- public boolean hasNext() {
- return first || (mux != null && mux.shouldRetry());
- }
-
- /**
- * If hasNext returns true, finds the entry (if any) for the
- * connection endpoint. If no entry is found, creates one and spawns
- * a Reaper if this is the only entry. Either way, bumps the connect
- * count for the entry. Calls connect on the entry to get a mux, then
- * calls newRequest on the mux, calls writeRequestData on the
- * connection, and returns a new outbound request wrapper.
- */
- @Override
- public OutboundRequest next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- first = false;
- mux = manager.connect(handle);
- OutboundRequest req = mux.newRequest();
- OutboundRequest sreq = null;
- try {
- Connection c = mux.getConnection();
- c.writeRequestData(handle, req.getRequestOutputStream());
- sreq = new Outbound(req, c, handle);
- } finally {
- if (sreq == null) {
- manager.remove(mux);
- }
- }
- return sreq;
- }
+ /**
+ * Returns true if next has not yet been called or if the last mux
+ * returned had an asynchronous close.
+ */
+ @Override
+ public boolean hasNext() {
+ return first || (mux != null && mux.shouldRetry());
+ }
+
+ /**
+ * If hasNext returns true, finds the entry (if any) for the connection
+ * endpoint. If no entry is found, creates one and spawns a Reaper if
+ * this is the only entry. Either way, bumps the connect count for the
+ * entry. Calls connect on the entry to get a mux, then calls newRequest
+ * on the mux, calls writeRequestData on the connection, and returns a
+ * new outbound request wrapper.
+ */
+ @Override
+ public OutboundRequest next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ first = false;
+ mux = manager.connect(handle);
+ OutboundRequest req = mux.newRequest();
+ OutboundRequest sreq = null;
+ try {
+ Connection c = mux.getConnection();
+ c.writeRequestData(handle, req.getRequestOutputStream());
+ sreq = new Outbound(req, c, handle);
+ } finally {
+ if (sreq == null) {
+ manager.remove(mux);
+ }
+ }
+ return sreq;
+ }
}
/**
- * Returns an <code>OutboundRequestIterator</code> to use to send
- * a new request for the specified handle to this connection
- * manager's <code>ConnectionEndpoint</code>.
- *
- * <p>If the <code>hasNext</code> method of the returned iterator
- * returns <code>true</code>, the <code>next</code> method behaves
- * as follows:
+ * Returns an <code>OutboundRequestIterator</code> to use to send a new
+ * request for the specified handle to this connection manager's
+ * <code>ConnectionEndpoint</code>.
+ *
+ * <p>
+ * If the <code>hasNext</code> method of the returned iterator returns
+ * <code>true</code>, the <code>next</code> method behaves as follows:
*
* <blockquote>
*
* The connection endpoint's {@link
* ConnectionEndpoint#connect(OutboundRequestHandle,Collection,Collection)
- * connect} method is invoked with any active connections that
- * have not reached their maximum number of in-progress requests,
- * any idle connections, and <code>handle</code>. If that returns
- * <code>null</code>, the endpoint's {@link
- * ConnectionEndpoint#connect(OutboundRequestHandle) connect}
- * method is invoked with <code>handle</code>. In either case, if
- * a new connection is returned, the Jini ERI multiplexing
- * protocol is started on the connection (as the client).
- * Finally, the {@link Connection#writeRequestData
+ * connect} method is invoked with any active connections that have not
+ * reached their maximum number of in-progress requests, any idle
+ * connections, and <code>handle</code>. If that returns <code>null</code>,
+ * the endpoint's {@link
+ * ConnectionEndpoint#connect(OutboundRequestHandle) connect} method is
+ * invoked with <code>handle</code>. In either case, if a new connection is
+ * returned, the Jini ERI multiplexing protocol is started on the connection
+ * (as the client). Finally, the {@link Connection#writeRequestData
* writeRequestData} method of the connection is invoked with
* <code>handle</code> and the request output stream of the {@link
- * OutboundRequest} that is created for the request. If any
- * exception is thrown while obtaining a connection from the
- * endpoint or writing the request data, that exception is thrown
- * to the caller. The <code>OutboundRequest</code> returned by
- * <code>next</code> will invoke the {@link
- * Connection#readResponseData readResponseData} method of the
- * connection with the specified handle and the response input
- * stream before any other data is read from the response input
- * stream. The {@link OutboundRequest#populateContext
+ * OutboundRequest} that is created for the request. If any exception is
+ * thrown while obtaining a connection from the endpoint or writing the
+ * request data, that exception is thrown to the caller. The
+ * <code>OutboundRequest</code> returned by <code>next</code> will invoke
+ * the {@link
+ * Connection#readResponseData readResponseData} method of the connection
+ * with the specified handle and the response input stream before any other
+ * data is read from the response input stream. The {@link OutboundRequest#populateContext
* populateContext} and {@link
* OutboundRequest#getUnfulfilledConstraints
- * getUnfulfilledConstraints} methods of the
- * <code>OutboundRequest</code> are implemented by delegating to
- * the corresponding method of the connection passing
- * <code>handle</code> and the other arguments (if any).
+ * getUnfulfilledConstraints} methods of the <code>OutboundRequest</code>
+ * are implemented by delegating to the corresponding method of the
+ * connection passing <code>handle</code> and the other arguments (if any).
*
* </blockquote>
*
- * <p>The returned iterator might allow continued iteration if the
- * connection used for the most recent request attempt was shut
- * down gracefully by the server.
- *
- * @param handle a handle to identify the request in later
- * invocations on the connection endpoint and its connections
- *
- * @return an <code>OutboundRequestIterator</code> to use to send
- * a new request for the specified handle to this connection
- * manager's <code>ConnectionEndpoint</code>
- *
- * @throws NullPointerException if <code>handle</code> is
- * <code>null</code>
- **/
+ * <p>
+ * The returned iterator might allow continued iteration if the connection
+ * used for the most recent request attempt was shut down gracefully by the
+ * server.
+ *
+ * @param handle a handle to identify the request in later invocations on
+ * the connection endpoint and its connections
+ *
+ * @return an <code>OutboundRequestIterator</code> to use to send a new
+ * request for the specified handle to this connection manager's
+ * <code>ConnectionEndpoint</code>
+ *
+ * @throws NullPointerException if <code>handle</code> is <code>null</code>
+ *
+ */
public OutboundRequestIterator newRequest(OutboundRequestHandle handle) {
- return new ReqIterator(handle, this);
+ return new ReqIterator(handle, this);
}
}
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java Sun May 18 08:02:56 2014
@@ -93,7 +93,8 @@ public final class TcpEndpoint
* weak set of canonical instances; in order to use WeakHashMap,
* maps canonical instances to weak references to themselves
**/
- private static final Map internTable = new WeakHashMap();
+ private static final Map<TcpEndpoint,WeakReference<TcpEndpoint>> internTable
+ = new WeakHashMap<TcpEndpoint,WeakReference<TcpEndpoint>>();
/** client transport logger */
private static final Logger logger =
@@ -126,7 +127,7 @@ public final class TcpEndpoint
**/
private final SocketFactory sf;
- private transient ConnectionManager connectionManager;
+ private transient volatile ConnectionManager connectionManager;
/**
* Returns a <code>TcpEndpoint</code> instance for the given
@@ -189,16 +190,22 @@ public final class TcpEndpoint
**/
private static TcpEndpoint intern(TcpEndpoint endpoint) {
synchronized (internTable) {
- Reference ref = (WeakReference) internTable.get(endpoint);
+ Reference<TcpEndpoint> ref = (WeakReference) internTable.get(endpoint);
if (ref != null) {
- TcpEndpoint canonical = (TcpEndpoint) ref.get();
+ TcpEndpoint canonical = ref.get();
if (canonical != null) {
return canonical;
}
}
endpoint.connectionManager =
- new ConnectionManager(endpoint.new ConnectionEndpointImpl());
- internTable.put(endpoint, new WeakReference(endpoint));
+ new ConnectionManager(
+ new ConnectionEndpointImpl(
+ endpoint.getHost(),
+ endpoint.getPort(),
+ endpoint.getSocketFactory()
+ )
+ );
+ internTable.put(endpoint, new WeakReference<TcpEndpoint>(endpoint));
return endpoint;
}
}
@@ -527,9 +534,16 @@ public final class TcpEndpoint
* correctly, so we do not bother to validate request handles and
* connections passed in.
**/
- private class ConnectionEndpointImpl implements ConnectionEndpoint {
-
- ConnectionEndpointImpl() { }
+ private static class ConnectionEndpointImpl implements ConnectionEndpoint {
+ private final String host;
+ private final int port;
+ private final SocketFactory sf;
+
+ ConnectionEndpointImpl(String host, int port, SocketFactory sf) {
+ this.host = host;
+ this.port = port;
+ this.sf = sf;
+ }
/**
* Invoked by ConnectionManager to create a new connection.