You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/05/18 10:02:56 UTC

svn commit: r1595569 [2/2] - in /river/jtsk/skunk/qa_refactor/trunk/src: com/sun/jini/jeri/internal/mux/ com/sun/jini/thread/ net/jini/discovery/ net/jini/jeri/ net/jini/jeri/connection/ net/jini/jeri/tcp/

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java Sun May 18 08:02:56 2014
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package net.jini.jeri.connection;
 
 import com.sun.jini.action.GetLongAction;
@@ -43,21 +42,22 @@ import net.jini.jeri.OutboundRequestIter
 
 /**
  * Provides client-side connection management using the <a
- * href="{@docRoot}/net/jini/jeri/connection/doc-files/mux.html">Jini
- * extensible remote invocation (Jini ERI) multiplexing protocol</a>
+ * href="{@docRoot}/net/jini/jeri/connection/doc-files/mux.html">Jini extensible
+ * remote invocation (Jini ERI) multiplexing protocol</a>
  * to frame and multiplex requests and responses over connections.
  *
- * <p>A <code>ConnectionManager</code> is created by a
- * connection-based {@link net.jini.jeri.Endpoint} implemention to manage
- * connections to a particular {@link ConnectionEndpoint}.  The {@link
- * #newRequest newRequest} method is used to send a request to the
- * connection endpoint.
- *
- * <p>Each request attempt is mapped to a new <i>session</i> of the
- * Jini ERI multiplexing protocol on an established connection chosen
- * by the <code>ConnectionEndpoint</code>.  Request data is written as
- * the data sent for the session, and response data is read as the
- * data recdeived for the session.
+ * <p>
+ * A <code>ConnectionManager</code> is created by a connection-based
+ * {@link net.jini.jeri.Endpoint} implementation to manage connections to a
+ * particular {@link ConnectionEndpoint}. The {@link
+ * #newRequest newRequest} method is used to send a request to the connection
+ * endpoint.
+ *
+ * <p>
+ * Each request attempt is mapped to a new <i>session</i> of the Jini ERI
+ * multiplexing protocol on an established connection chosen by the
+ * <code>ConnectionEndpoint</code>. Request data is written as the data sent for
+ * the session, and response data is read as the data received for the session.
  *
  * @author Sun Microsystems, Inc.
  * @since 2.0
@@ -65,82 +65,87 @@ import net.jini.jeri.OutboundRequestIter
  * @com.sun.jini.impl
  *
  * This implementation uses the {@link Logger} named
- * <code>net.jini.jeri.connection.ConnectionManager</code> to log
- * information at the following levels:
+ * <code>net.jini.jeri.connection.ConnectionManager</code> to log information at
+ * the following levels:
  *
- * <p><table summary="Describes what is logged by ConnectionManager to
- * its logger at various logging levels" border=1 cellpadding=5>
+ * <p>
+ * <table summary="Describes what is logged by ConnectionManager to its logger
+ * at various logging levels" border=1 cellpadding=5>
  *
  * <tr> <th> Level <th> Description
  *
- * <tr> <td> {@link Level#FINEST FINEST} <td> connection opened or
- * reused
+ * <tr> <td> {@link Level#FINEST FINEST} <td> connection opened or reused
  *
  * </table>
  *
- * <p>This implementation uses the {@link Logger} named
- * <code>net.jini.jeri.connection.mux</code> to log information at the
- * following levels:
+ * <p>
+ * This implementation uses the {@link Logger} named
+ * <code>net.jini.jeri.connection.mux</code> to log information at the following
+ * levels:
  *
- * <p><table summary="Describes what is logged by ConnectionManager to
- * the mux logger at various logging levels" border=1 cellpadding=5>
+ * <p>
+ * <table summary="Describes what is logged by ConnectionManager to the mux
+ * logger at various logging levels" border=1 cellpadding=5>
  *
  * <tr> <th> Level <th> Description
  *
- * <tr> <td> {@link Level#WARNING WARNING} <td> unexpected exception
- * during asynchronous I/O processing, or thread creation failure
+ * <tr> <td> {@link Level#WARNING WARNING} <td> unexpected exception during
+ * asynchronous I/O processing, or thread creation failure
  *
  * <tr> <td> {@link Levels#HANDLED HANDLED} <td> I/O exception during
  * asynchronous I/O processing
  *
- * <tr> <td> {@link Level#FINEST FINEST} <td> detailed implementation
- * activity
+ * <tr> <td> {@link Level#FINEST FINEST} <td> detailed implementation activity
  *
  * </table>
  *
- * <p>This implementation recognizes the following system properties:
- *
- * <p><ul>
+ * <p>
+ * This implementation recognizes the following system properties:
  *
- * <li><code>com.sun.jini.jeri.connectionTimeout</code> - Time in
- * milliseconds to leave idle client-side connections around before
- * closing them. The default value is 15000 milliseconds (15 seconds).
+ * <p>
+ * <ul>
  *
- * <li><code>com.sun.jini.jeri.handshakeTimeout</code> - Time in
- * milliseconds for client-side connections to wait for the server to
- * acknowledge an opening handshake. The default value is 15000 milliseconds (15 seconds).
+ * <li><code>com.sun.jini.jeri.connectionTimeout</code> - Time in milliseconds
+ * to leave idle client-side connections around before closing them. The default
+ * value is 120000 milliseconds (2 minutes).
+ *
+ * <li><code>com.sun.jini.jeri.handshakeTimeout</code> - Time in milliseconds
+ * for client-side connections to wait for the server to acknowledge an opening
+ * handshake. The default value is 360000 milliseconds (6 minutes).
  *
  * </ul>
- **/
+ *
+ */
 public final class ConnectionManager {
+
     /**
      * How long to leave idle muxes around before closing them.
      */
-    private static final long TIMEOUT =
-	( (Long) AccessController.doPrivileged(new GetLongAction(
-		"com.sun.jini.jeri.connectionTimeout", 
-		360000))).longValue();
+    private static final long TIMEOUT
+            = ((Long) AccessController.doPrivileged(new GetLongAction(
+                            "com.sun.jini.jeri.connectionTimeout",
+                            120000))).longValue();
     /**
      * How long to wait for a server to respond to an initial client message.
      */
-    private static final long HANDSHAKE_TIMEOUT =
-	((Long) AccessController.doPrivileged(new GetLongAction(
-		"com.sun.jini.jeri.handshakeTimeout", 
-		360000))).longValue();
+    private static final long HANDSHAKE_TIMEOUT
+            = ((Long) AccessController.doPrivileged(new GetLongAction(
+                            "com.sun.jini.jeri.handshakeTimeout",
+                            360000))).longValue();
     /**
      * ConnectionManager logger.
      */
-    private static final Logger logger =
-	Logger.getLogger("net.jini.jeri.connection.ConnectionManager");
+    private static final Logger logger
+            = Logger.getLogger("net.jini.jeri.connection.ConnectionManager");
     /**
      * Executor that executes tasks in pooled system threads.
      */
-    private static final Executor systemThreadPool =
-	(Executor) AccessController.doPrivileged(
-	    new GetThreadPoolAction(false));
+    private static final Executor systemThreadPool
+            = (Executor) AccessController.doPrivileged(
+                    new GetThreadPoolAction(false));
     /**
-     * Set of connection managers with open or pending muxes
-     * (connections), for consideration by the reaper thread.
+     * Set of connection managers with open or pending muxes (connections), for
+     * consideration by the reaper thread.
      */
     private static final Set<ConnectionManager> reaperSet = new HashSet<ConnectionManager>();
 
@@ -151,35 +156,35 @@ public final class ConnectionManager {
     /**
      * The OutboundMuxes.
      */
-    private final List<OutboundMux> muxes = new LinkedList<OutboundMux>();
-    
+    private final List<OutboundMux> muxes = new ArrayList<OutboundMux>(128);
+
     /**
      * Number of pending connect calls.
      */
     private int pendingConnects = 0;
 
     /**
-     * Creates a new <code>ConnectionManager</code> that manages
-     * client-side connections to the specified connection endpoint.
+     * Creates a new <code>ConnectionManager</code> that manages client-side
+     * connections to the specified connection endpoint.
      *
      * @param ep the connection endpoint
-     **/
+     *
+     */
     public ConnectionManager(ConnectionEndpoint ep) {
-	this.ep = ep;
+        this.ep = ep;
     }
 
     /**
-     * Calls connect on the connection endpoint with the active and idle
-     * muxes and the specified handle. If no connection is returned, calls
-     * connect with the handle. In either case, if a new connection is
-     * returned, creates and adds a mux for it. In all cases, bumps the
-     * newRequest count for the mux and returns it. Removes any dead muxes
-     * along the way.
+     * Calls connect on the connection endpoint with the active and idle muxes
+     * and the specified handle. If no connection is returned, calls connect
+     * with the handle. In either case, if a new connection is returned, creates
+     * and adds a mux for it. In all cases, bumps the newRequest count for the
+     * mux and returns it. Removes any dead muxes along the way.
      */
     OutboundMux connect(OutboundRequestHandle handle) throws IOException {
-	synchronized (this) {
-	    pendingConnects++;
-	}
+        synchronized (this) {
+            pendingConnects++;
+        }
         /**
          * The active muxes (during connect).
          */
@@ -188,201 +193,205 @@ public final class ConnectionManager {
          * The idle muxes (during connect).
          */
         List<Connection> idle = new LinkedList<Connection>();
-	try {
-	    synchronized (reaperSet) {
-		if (reaperSet.isEmpty()) {
-		    systemThreadPool.execute(
-			new Reaper(), "ConnectionManager.Reaper");
-		}
-		reaperSet.add(this);
-	    }
-	    synchronized (this) {
-		for (int i = muxes.size(); --i >= 0; ) {
-		    OutboundMux mux = (OutboundMux) muxes.get(i);
-		    try {
-			int n = mux.requestsInProgress();
-			if (n == 0) {
-			    idle.add(mux.getConnection());
-			} else if (n < OutboundMux.MAX_REQUESTS) {
-			    active.add(mux.getConnection());
-			}
-		    } catch (IOException e) {
-			muxes.remove(i);
-		    }
-		}
+        try {
+            synchronized (reaperSet) {
+                if (reaperSet.isEmpty()) {
+                    systemThreadPool.execute(
+                            new Reaper(), "ConnectionManager.Reaper");
+                }
+                reaperSet.add(this);
+            }
+            Connection c;
+            synchronized (this) {
+                for (int i = muxes.size(); --i >= 0;) {
+                    OutboundMux mux = (OutboundMux) muxes.get(i);
+                    try {
+                        int n = mux.requestsInProgress();
+                        if (n == 0) {
+                            idle.add(mux.getConnection());
+                        } else if (n < OutboundMux.MAX_REQUESTS) {
+                            active.add(mux.getConnection());
+                        }
+                    } catch (IOException e) {
+                        muxes.remove(i);
+                    }
+                }
+                c = ep.connect(handle, active, idle);
+                if (c != null) {
+                    for (int i = muxes.size(); --i >= 0;) {
+                        OutboundMux mux = (OutboundMux) muxes.get(i);
+                        if (c == mux.getConnection()) {
+                            if (logger.isLoggable(Level.FINEST)) {
+                                logger.log(Level.FINEST, "using {0}", c);
+                            }
+                            mux.newRequestPending();
+                            return mux;
+                        }
+                    }
+                    OutboundMux mux = newOutboundMux(c);
+                    mux.newRequestPending();
+                    muxes.add(mux);
+                    return mux;
+                }
+                c = ep.connect(handle);
+                OutboundMux mux = newOutboundMux(c);
+                mux.newRequestPending();
+                muxes.add(mux);
+                return mux;
             }
-            Connection c = ep.connect(handle, active, idle);
+        } finally {
             synchronized (this) {
-		if (c != null) {
-		    for (int i = muxes.size(); --i >= 0; ) {
-			OutboundMux mux = (OutboundMux) muxes.get(i);
-			if (c == mux.getConnection()) {
-			    if (logger.isLoggable(Level.FINEST)) {
-				logger.log(Level.FINEST, "using {0}", c);
-			    }
-			    mux.newRequestPending();
-			    return mux;
-			}
-		    }
-		    OutboundMux mux = newOutboundMux(c);
-		    mux.newRequestPending();
-		    muxes.add(mux);
-		    return mux;
-		}
-	    }
-	    c = ep.connect(handle);
-	    synchronized (this) {
-		OutboundMux mux = newOutboundMux(c);
-		mux.newRequestPending();
-		muxes.add(mux);
-		return mux;
-	    }
-	} finally {
-	    synchronized (this) {
-		assert pendingConnects > 0;
-		pendingConnects--;
-	    }
-	}
+                assert pendingConnects > 0;
+                pendingConnects--;
+            }
+        }
     }
 
     /**
-     * For each mux, calls checkIdle on the mux, and if checkIdle returns
-     * true, removes the mux and adds it to the idle list. Returns true
-     * if no connects are pending and no muxes remain.
+     * For each mux, calls checkIdle on the mux, and if checkIdle returns true,
+     * removes the mux and adds it to the idle list. Returns true if no connects
+     * are pending and no muxes remain.
      */
     synchronized boolean checkIdle(long now, List idle) {
-	for (int i = muxes.size(); --i >= 0; ) {
-	    OutboundMux mux = (OutboundMux) muxes.get(i);
-	    if (mux.checkIdle(now)) {
-		muxes.remove(i);
-		idle.add(mux);
-	    }
-	}
-	return pendingConnects == 0 && muxes.isEmpty();
+        for (int i = muxes.size(); --i >= 0;) {
+            OutboundMux mux = (OutboundMux) muxes.get(i);
+            if (mux.checkIdle(now)) {
+                muxes.remove(i);
+                idle.add(mux);
+            }
+        }
+        return pendingConnects == 0 && muxes.isEmpty();
     }
 
     /**
      * Removes and shuts down a mux.
      */
     void remove(OutboundMux mux) {
-	synchronized (this) {
-	    muxes.remove(mux);
-	}
-	mux.shutdown("writeRequestData failed");
+        synchronized (this) {
+            muxes.remove(mux);
+        }
+        mux.shutdown("writeRequestData failed");
     }
 
     /**
      * Constructs an OutboundMux instance from the connection.
      */
     private OutboundMux newOutboundMux(Connection c) throws IOException {
-	logger.log(Level.FINEST, "opened {0}", c);
-	OutboundMux mux = null;
-	try {
-	    mux = (c.getChannel() == null) ?
-		new OutboundMux(c) : new OutboundMux(c, true);
-                mux.setStartTimeout(HANDSHAKE_TIMEOUT);
-	} finally {
-	    if (mux == null) {
-		try {
-		    c.close();
-		} catch (IOException e) {
-		}
-	    }
-	}
-	return mux;
+        logger.log(Level.FINEST, "opened {0}", c);
+        OutboundMux mux = null;
+        try {
+            mux = (c.getChannel() == null)
+                    ? new OutboundMux(this, c) : new OutboundMux(this, c, true);
+            mux.setStartTimeout(HANDSHAKE_TIMEOUT);
+        } finally {
+            if (mux == null) {
+                try {
+                    c.close();
+                } catch (IOException e) {
+                }
+            }
+        }
+        return mux;
     }
 
     /**
      * Subclass wrapper around MuxClient for outbound connections.
      */
     private static final class OutboundMux extends MuxClient {
-	/**
-	 * The outbound connection.
-	 */
-	private final Connection c;
-	/**
-	 * Lock to enforce single start of mux.
-	 */
-	private final Object startLock = new Object();
-	 /**
-	 * True if the mux needs to be started.
-	 */
-	private volatile boolean notStarted = true;
+
+        /**
+         * The owning ConnectionManager, whose lock guards this mux's state.
+         */
+        private final ConnectionManager manager;
+        /**
+         * The outbound connection.
+         */
+        private final Connection c;
+        /**
+         * Lock to enforce single start of mux.
+         */
+        private final Object startLock = new Object();
+        /**
+         * True if the mux needs to be started.
+         */
+        private volatile boolean notStarted = true;
         private boolean starting = false;
-	/**
-	 * Number of pending newRequest calls.  Guarded by enclosing
-	 * ConnectionManager's lock.
-	 */
-	private int pendingNewRequests = 0;
-	/**
-	 * The time this mux was found to be idle by the Reaper
-	 * thread.  Set to zero each time a request is initiated.
-	 * Guarded by enclosing ConnectionManager's lock.
-	 */
-	private volatile long idleTime = 0;
-
-	/**
-	 * Constructs an instance from the connection's streams.
-	 */
-	OutboundMux(Connection c) throws IOException {
-	    super(c.getOutputStream(), c.getInputStream());
-	    this.c = c;
-	}
-
-	/**
-	 * Constructs an instance from the connection's channel.
-	 */
-	OutboundMux(Connection c, boolean ignore) throws IOException {
-	    super(c.getChannel());
-	    this.c = c;
-	}
-
-	/**
-	 * Returns the outbound connection.
-	 */
-	Connection getConnection() {
-	    return c;
-	}
-
-	/**
-	 * Registers a pending newRequest call.
-	 */
-	synchronized void newRequestPending() {
-	    pendingNewRequests++;
-	}
-
-	/**
-	 * Initiates a new request on the mux and returns it, and sets the
-	 * idle time to zero. Starts the mux if necessary, and decrements
-	 * the pending newRequest count.
-	 */
+        /**
+         * Number of pending newRequest calls. Guarded by enclosing
+         * ConnectionManager's lock.
+         */
+        private int pendingNewRequests = 0;
+        /**
+         * The time this mux was found to be idle by the Reaper thread. Set to
+         * zero each time a request is initiated. Guarded by enclosing
+         * ConnectionManager's lock.
+         */
+        private long idleTime = 0;
+
+        /**
+         * Constructs an instance from the connection's streams.
+         */
+        OutboundMux(ConnectionManager manager, Connection c) throws IOException {
+            super(c.getOutputStream(), c.getInputStream());
+            this.c = c;
+            this.manager = manager;
+        }
+
+        /**
+         * Constructs an instance from the connection's channel.
+         */
+        OutboundMux(ConnectionManager manager, Connection c, boolean ignore) throws IOException {
+            super(c.getChannel());
+            this.c = c;
+            this.manager = manager;
+        }
+
+        /**
+         * Returns the outbound connection.
+         */
+        Connection getConnection() {
+            return c;
+        }
+
+        /**
+         * Registers a pending newRequest call.
+         */
+        void newRequestPending() {
+            synchronized (manager) {
+                pendingNewRequests++;
+            }
+        }
+
+        /**
+         * Initiates a new request on the mux and returns it, and sets the idle
+         * time to zero. Starts the mux if necessary, and decrements the pending
+         * newRequest count.
+         */
         @Override
-	public OutboundRequest newRequest() throws IOException {
+        public OutboundRequest newRequest() throws IOException {
 //	    assert !Thread.holdsLock(ConnectionManager.this);
             boolean interrupted = false;
-	    try {
+            try {
                 boolean start = false;
-                if (notStarted){
-                    synchronized (startLock){
-                        while (starting){
+                if (notStarted) {
+                    synchronized (startLock) {
+                        while (starting) {
                             try {
                                 startLock.wait();
                             } catch (InterruptedException ex) {
                                 interrupted = true;
                             }
                         }
-                        if (notStarted){
+                        if (notStarted) {
                             starting = true;
                             start = true;
                         }
                     }
-                    if (start){
+                    if (start) {
                         try {
-                            synchronized (this){
-                                start();
-                            }
+                            start();
                         } finally {
-                            synchronized (startLock){
+                            synchronized (startLock) {
                                 notStarted = false;
                                 starting = false;
                                 startLock.notifyAll();
@@ -390,382 +399,396 @@ public final class ConnectionManager {
                         }
                     }
                 }
-                synchronized (this) {
+                synchronized (manager) {
                     idleTime = 0;
+                    return super.newRequest();
                 }
-                return super.newRequest();
-	    } finally {
-                synchronized (this) {
+            } finally {
+                synchronized (manager) {
                     assert pendingNewRequests > 0;
                     pendingNewRequests--;
                 }
-                if (interrupted) Thread.currentThread().interrupt();
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        /**
+         * Returns the number of active and pending requests.
+         */
+        @Override
+        public int requestsInProgress() throws IOException {
+            synchronized (manager) {
+                return super.requestsInProgress() + pendingNewRequests;
+            }
+        }
+
+        /**
+         * Returns true if the mux is dead, or the mux is idle and the recorded
+         * idle time is more than TIMEOUT milliseconds before now, and returns
+         * false otherwise. If the mux is idle and the recorded idle time is
+         * zero, sets the recorded idle time to now.
+         */
+        boolean checkIdle(long now) {
+            try {
+                synchronized (manager) {
+                    if (requestsInProgress() == 0) {
+                        if (idleTime == 0) {
+                            idleTime = now;
+                        } else {
+                            return now - idleTime > TIMEOUT;
+                        }
+                    }
+                    return false;
+                }
+            } catch (IOException e) {
+                return true;
+            }
+        }
+
+        /**
+         * Close the connection, so that the provider is notified.
+         */
+        @Override
+        protected void handleDown() {
+            try {
+                c.close();
+            } catch (IOException e) {
             }
-	}
+        }
 
-	/**
-	 * Returns the number of active and pending requests.
-	 */
-        @Override
-	public synchronized int requestsInProgress() throws IOException {
-	    return super.requestsInProgress() + pendingNewRequests;
-	}
-
-	/**
-	 * Returns true if the mux is dead, or the mux is idle and the
-	 * recorded idle time is more than TIMEOUT milliseconds before now,
-	 * and returns false otherwise. If the mux is idle and the recorded
-	 * idle time is zero, sets the recorded idle time to now.
-	 */
-	boolean checkIdle(long now) {
-	    try {
-                synchronized (this){
-		if (requestsInProgress() == 0) {
-		    if (idleTime == 0) {
-			idleTime = now;
-		    } else {
-			return now - idleTime > TIMEOUT;
-		    }
-		}
-		return false;
-                }
-	    } catch (IOException e) {
-		return true;
-	    }
-	}
-
-	/**
-	 * Close the connection, so that the provider is notified.
-	 */
-        @Override
-	protected void handleDown() {
-	    try {
-		c.close();
-	    } catch (IOException e) {
-	    }
-	}
-
-	boolean shouldRetry() {
-	    return false; // XXX
-	}
+        boolean shouldRetry() {
+            return false; // XXX
+        }
     }
 
     /**
      * Outbound request wrapper around the outbound request created by the mux.
      */
     private static final class Outbound implements OutboundRequest {
-	/**
-	 * The outbound request created by the mux.
-	 */
-	private final OutboundRequest req;
-	/**
-	 * The connection on which the outbound request originates.
-	 */
-	private final Connection c;
-	/**
-	 * The outbound request handle.
-	 */
-	private final OutboundRequestHandle handle;
-	/**
-	 * Wrapper around the mux's response input stream.
-	 */
-	private final InputStream in;
-	/**
-	 * Delivery status override from readResponseData.
-	 */
-	private boolean status = true;
-
-	Outbound(OutboundRequest req,
-		 Connection c,
-		 OutboundRequestHandle handle)
-	{
-	    this.req = req;
-	    this.c = c;
-	    this.handle = handle;
-	    in = new Input(handle);
-	}
-
-	/* pass-through to the underlying request */
-        @Override
-	public OutputStream getRequestOutputStream() {
-	    return req.getRequestOutputStream();
-	}
-
-	/* return the wrapper */
-        @Override
-	public InputStream getResponseInputStream() {
-	    return in;
-	}
-
-	/* delegate to the connection */
-        @Override
-	public void populateContext(Collection context) {
-	    c.populateContext(handle, context);
-	}
-
-	/* delegate to the connection */
-        @Override
-	public InvocationConstraints getUnfulfilledConstraints() {
-	    return c.getUnfulfilledConstraints(handle);
-	}
-
-	/**
-	 * False if readResponseData returned an exception, otherwise
-	 * pass-through to the underlying request.
-	 */
-        @Override
-	public boolean getDeliveryStatus() {
-	    return status && req.getDeliveryStatus();
-	}
-
-	/* pass-through to the underlying request */
-        @Override
-	public void abort() {
-	    req.abort();
-	}
-
-	/**
-	 * Wrapper for the response input stream of an outbound request,
-	 * used to call readResponseData on the underlying connection
-	 * before subsequent data is read by higher levels. Note that this
-	 * class does not support mark/reset.
-	 */
-	private final class Input extends InputStream {
-	    /**
-	     * The underlying input stream from the outbound request.
-	     */
-	    private final InputStream in;
-	    /**
-	     * The handle, or null if readResponseData has been called.
-	     */
-	    private OutboundRequestHandle handle;
-
-	    Input(OutboundRequestHandle handle) {
-		in = req.getResponseInputStream();
-		this.handle = handle;
-	    }
-
-	    /**
-	     * Calls readResponseData on the connection, exactly once.
-	     * Sets the handle to null to indicate that it has been called.
-	     */
-	    private void readFirst() throws IOException {
-		if (handle != null) {
-		    try {
-			IOException e = c.readResponseData(handle, in);
-			if (e != null) {
-			    status = false;
-			    throw e;
-			}
-		    } finally {
-			handle = null;
-		    }
-		}
-	    }
 
-	    /** Call readFirst, then pass through. */
+        /**
+         * The outbound request created by the mux.
+         */
+        private final OutboundRequest req;
+        /**
+         * The connection on which the outbound request originates.
+         */
+        private final Connection c;
+        /**
+         * The outbound request handle.
+         */
+        private final OutboundRequestHandle handle;
+        /**
+         * Wrapper around the mux's response input stream.
+         */
+        private final InputStream in;
+        /**
+         * Delivery status override from readResponseData.
+         */
+        private volatile boolean status = true;
+
+        Outbound(OutboundRequest req,
+                Connection c,
+                OutboundRequestHandle handle) {
+            this.req = req;
+            this.c = c;
+            this.handle = handle;
+            in = new Input(handle);
+        }
+
+        /* pass-through to the underlying request */
+        @Override
+        public OutputStream getRequestOutputStream() {
+            return req.getRequestOutputStream();
+        }
+
+        /* return the wrapper */
+        @Override
+        public InputStream getResponseInputStream() {
+            return in;
+        }
+
+        /* delegate to the connection */
+        @Override
+        public void populateContext(Collection context) {
+            c.populateContext(handle, context);
+        }
+
+        /* delegate to the connection */
+        @Override
+        public InvocationConstraints getUnfulfilledConstraints() {
+            return c.getUnfulfilledConstraints(handle);
+        }
+
+        /**
+         * False if readResponseData returned an exception, otherwise
+         * pass-through to the underlying request.
+         */
+        @Override
+        public boolean getDeliveryStatus() {
+            return status && req.getDeliveryStatus();
+        }
+
+        /* pass-through to the underlying request */
+        @Override
+        public void abort() {
+            req.abort();
+        }
+
+        /**
+         * Wrapper for the response input stream of an outbound request, used to
+         * call readResponseData on the underlying connection before subsequent
+         * data is read by higher levels. Note that this class does not support
+         * mark/reset.
+         */
+        private final class Input extends InputStream {
+
+            /**
+             * The underlying input stream from the outbound request.
+             */
+            private final InputStream in;
+            /**
+             * The handle, or null if readResponseData has been called.
+             */
+            private OutboundRequestHandle handle;
+
+            Input(OutboundRequestHandle handle) {
+                in = req.getResponseInputStream();
+                this.handle = handle;
+            }
+
+            /**
+             * Calls readResponseData on the connection, exactly once. Sets the
+             * handle to null to indicate that it has been called.
+             */
+            private void readFirst() throws IOException {
+                if (handle != null) {
+                    try {
+                        IOException e = c.readResponseData(handle, in);
+                        if (e != null) {
+                            status = false;
+                            throw e;
+                        }
+                    } finally {
+                        handle = null;
+                    }
+                }
+            }
+
+            /**
+             * Call readFirst, then pass through.
+             */
             @Override
-	    public int read() throws IOException {
-		readFirst();
-		return in.read();
-	    }
+            public int read() throws IOException {
+                readFirst();
+                return in.read();
+            }
 
-	    /** Call readFirst, then pass through. */
+            /**
+             * Call readFirst, then pass through.
+             */
             @Override
-	    public int read(byte[] b, int off, int len) throws IOException {
-		readFirst();
-		return in.read(b, off, len);
-	    }
+            public int read(byte[] b, int off, int len) throws IOException {
+                readFirst();
+                return in.read(b, off, len);
+            }
 
-	    /** Call readFirst, then pass through. */
+            /**
+             * Call readFirst, then pass through.
+             */
             @Override
-	    public long skip(long n) throws IOException {
-		readFirst();
-		return in.skip(n);
-	    }
+            public long skip(long n) throws IOException {
+                readFirst();
+                return in.skip(n);
+            }
 
-	    /** Call readFirst, then pass through. */
+            /**
+             * Call readFirst, then pass through.
+             */
             @Override
-	    public int available() throws IOException {
-		readFirst();
-		return in.available();
-	    }
+            public int available() throws IOException {
+                readFirst();
+                return in.available();
+            }
 
-	    /** pass-through */
+            /**
+             * Pass through to the underlying stream without calling readFirst.
+             */
             @Override
-	    public void close() throws IOException {
-		in.close();
-	    }
-	}
+            public void close() throws IOException {
+                in.close();
+            }
+        }
     }
 
     /**
-     * Records idle times in muxes and shuts down muxes that have been
-     * idle for at least TIMEOUT milliseconds.
+     * Records idle times in muxes and shuts down muxes that have been idle for
+     * at least TIMEOUT milliseconds.
      */
     private static final class Reaper implements Runnable {
-	Reaper() {
-	}
 
-	/**
-	 * Sleep for TIMEOUT milliseconds.  Then call checkIdle on
-	 * each manager with open muxes, shutdown all of idle muxes
-	 * that have been collected, and if no managers with open
-	 * muxes remain terminate, else repeat (go back to sleep).
-	 */
-        @Override
-	public void run() {
-	    List idle = new ArrayList(1);
-	    boolean done;
-	    do {
-		try {
-		    Thread.sleep(TIMEOUT);
-		} catch (InterruptedException e) {
-		    return;
-		}
-		long now = System.currentTimeMillis();
-		synchronized (reaperSet) {
-		    for (Iterator iter = reaperSet.iterator();
-			 iter.hasNext();)
-		    {
-			ConnectionManager mgr =
-			    (ConnectionManager) iter.next();
-			if (mgr.checkIdle(now, idle)) {
-			    iter.remove();
-			}
-		    }
-		    done = reaperSet.isEmpty();
-		}
-		for (int i = idle.size(); --i >= 0; ) {
-		    ((OutboundMux) idle.get(i)).shutdown("idle");
-		}
-		idle.clear();
-	    } while (!done);
-	}
+        Reaper() {
+        }
+
+        /**
+         * Sleep for TIMEOUT milliseconds. Then call checkIdle on each manager
+         * with open muxes, shut down all of the idle muxes that were collected,
+         * and terminate if no managers with open muxes remain; otherwise repeat
+         * (go back to sleep).
+         */
+        @Override
+        public void run() {
+            List idle = new ArrayList(1);
+            boolean done;
+            do {
+                try {
+                    Thread.sleep(TIMEOUT);
+                } catch (InterruptedException e) {
+                    return;
+                }
+                long now = System.currentTimeMillis();
+                synchronized (reaperSet) {
+                    for (Iterator iter = reaperSet.iterator();
+                            iter.hasNext();) {
+                        ConnectionManager mgr
+                                = (ConnectionManager) iter.next();
+                        if (mgr.checkIdle(now, idle)) {
+                            iter.remove();
+                        }
+                    }
+                    done = reaperSet.isEmpty();
+                }
+                for (int i = idle.size(); --i >= 0;) {
+                    ((OutboundMux) idle.get(i)).shutdown("idle");
+                }
+                idle.clear();
+            } while (!done);
+        }
     }
 
     /**
      * Outbound request iterator returned by newRequest.
      */
     private static final class ReqIterator implements OutboundRequestIterator {
+
         /**
          * ConnectionManager
          */
         private final ConnectionManager manager;
-	/**
-	 * The request handle.
-	 */
-	private final OutboundRequestHandle handle;
-	/**
-	 * True if next has not yet been called.
-	 */
-	private boolean first = true;
-	/**
-	 * The outbound mux from the last call to next, if any.
-	 */
-	private OutboundMux mux;
+        /**
+         * The request handle.
+         */
+        private final OutboundRequestHandle handle;
+        /**
+         * True if next has not yet been called.
+         */
+        private boolean first = true;
+        /**
+         * The outbound mux from the last call to next, if any.
+         */
+        private OutboundMux mux;
 
-	ReqIterator(OutboundRequestHandle handle, ConnectionManager cm) {
-	    this.handle = handle;
+        ReqIterator(OutboundRequestHandle handle, ConnectionManager cm) {
+            this.handle = handle;
             manager = cm;
-	}
+        }
 
-	/**
-	 * Returns true if next has not yet been called or if the last mux
-	 * returned had an asynchronous close.
-	 */
-        @Override
-	public boolean hasNext() {
-	    return first || (mux != null && mux.shouldRetry());
-	}
-
-	/**
-	 * If hasNext returns true, finds the entry (if any) for the
-	 * connection endpoint. If no entry is found, creates one and spawns
-	 * a Reaper if this is the only entry. Either way, bumps the connect
-	 * count for the entry. Calls connect on the entry to get a mux, then
-	 * calls newRequest on the mux, calls writeRequestData on the
-	 * connection, and returns a new outbound request wrapper.
-	 */
-        @Override
-	public OutboundRequest next() throws IOException {
-	    if (!hasNext()) {
-		throw new NoSuchElementException();
-	    }
-	    first = false;
-	    mux = manager.connect(handle);
-	    OutboundRequest req = mux.newRequest();
-	    OutboundRequest sreq = null;
-	    try {
-		Connection c = mux.getConnection();
-		c.writeRequestData(handle, req.getRequestOutputStream());
-		sreq = new Outbound(req, c, handle);
-	    } finally {
-		if (sreq == null) {
-		    manager.remove(mux);
-		}
-	    }
-	    return sreq;
-	}
+        /**
+         * Returns true if next has not yet been called or if the last mux
+         * returned had an asynchronous close.
+         */
+        @Override
+        public boolean hasNext() {
+            return first || (mux != null && mux.shouldRetry());
+        }
+
+        /**
+         * If hasNext returns true, finds the entry (if any) for the connection
+         * endpoint. If no entry is found, creates one and spawns a Reaper if
+         * this is the only entry. Either way, bumps the connect count for the
+         * entry. Calls connect on the entry to get a mux, then calls newRequest
+         * on the mux, calls writeRequestData on the connection, and returns a
+         * new outbound request wrapper.
+         */
+        @Override
+        public OutboundRequest next() throws IOException {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            first = false;
+            mux = manager.connect(handle);
+            OutboundRequest req = mux.newRequest();
+            OutboundRequest sreq = null;
+            try {
+                Connection c = mux.getConnection();
+                c.writeRequestData(handle, req.getRequestOutputStream());
+                sreq = new Outbound(req, c, handle);
+            } finally {
+                if (sreq == null) {
+                    manager.remove(mux);
+                }
+            }
+            return sreq;
+        }
     }
 
     /**
-     * Returns an <code>OutboundRequestIterator</code> to use to send
-     * a new request for the specified handle to this connection
-     * manager's <code>ConnectionEndpoint</code>.
-     *
-     * <p>If the <code>hasNext</code> method of the returned iterator
-     * returns <code>true</code>, the <code>next</code> method behaves
-     * as follows:
+     * Returns an <code>OutboundRequestIterator</code> to use to send a new
+     * request for the specified handle to this connection manager's
+     * <code>ConnectionEndpoint</code>.
+     *
+     * <p>
+     * If the <code>hasNext</code> method of the returned iterator returns
+     * <code>true</code>, the <code>next</code> method behaves as follows:
      *
      * <blockquote>
      *
      * The connection endpoint's {@link
      * ConnectionEndpoint#connect(OutboundRequestHandle,Collection,Collection)
-     * connect} method is invoked with any active connections that
-     * have not reached their maximum number of in-progress requests,
-     * any idle connections, and <code>handle</code>.  If that returns
-     * <code>null</code>, the endpoint's {@link
-     * ConnectionEndpoint#connect(OutboundRequestHandle) connect}
-     * method is invoked with <code>handle</code>.  In either case, if
-     * a new connection is returned, the Jini ERI multiplexing
-     * protocol is started on the connection (as the client).
-     * Finally, the {@link Connection#writeRequestData
+     * connect} method is invoked with any active connections that have not
+     * reached their maximum number of in-progress requests, any idle
+     * connections, and <code>handle</code>. If that returns <code>null</code>,
+     * the endpoint's {@link
+     * ConnectionEndpoint#connect(OutboundRequestHandle) connect} method is
+     * invoked with <code>handle</code>. In either case, if a new connection is
+     * returned, the Jini ERI multiplexing protocol is started on the connection
+     * (as the client). Finally, the {@link Connection#writeRequestData
      * writeRequestData} method of the connection is invoked with
      * <code>handle</code> and the request output stream of the {@link
-     * OutboundRequest} that is created for the request.  If any
-     * exception is thrown while obtaining a connection from the
-     * endpoint or writing the request data, that exception is thrown
-     * to the caller.  The <code>OutboundRequest</code> returned by
-     * <code>next</code> will invoke the {@link
-     * Connection#readResponseData readResponseData} method of the
-     * connection with the specified handle and the response input
-     * stream before any other data is read from the response input
-     * stream.  The {@link OutboundRequest#populateContext
+     * OutboundRequest} that is created for the request. If any exception is
+     * thrown while obtaining a connection from the endpoint or writing the
+     * request data, that exception is thrown to the caller. The
+     * <code>OutboundRequest</code> returned by <code>next</code> will invoke
+     * the {@link
+     * Connection#readResponseData readResponseData} method of the connection
+     * with the specified handle and the response input stream before any other
+     * data is read from the response input stream. The {@link OutboundRequest#populateContext
      * populateContext} and {@link
      * OutboundRequest#getUnfulfilledConstraints
-     * getUnfulfilledConstraints} methods of the
-     * <code>OutboundRequest</code> are implemented by delegating to
-     * the corresponding method of the connection passing
-     * <code>handle</code> and the other arguments (if any).
+     * getUnfulfilledConstraints} methods of the <code>OutboundRequest</code>
+     * are implemented by delegating to the corresponding method of the
+     * connection passing <code>handle</code> and the other arguments (if any).
      *
      * </blockquote>
      *
-     * <p>The returned iterator might allow continued iteration if the
-     * connection used for the most recent request attempt was shut
-     * down gracefully by the server.
-     *
-     * @param handle a handle to identify the request in later
-     * invocations on the connection endpoint and its connections
-     *
-     * @return an <code>OutboundRequestIterator</code> to use to send
-     * a new request for the specified handle to this connection
-     * manager's <code>ConnectionEndpoint</code>
-     *
-     * @throws NullPointerException if <code>handle</code> is
-     * <code>null</code>
-     **/
+     * <p>
+     * The returned iterator might allow continued iteration if the connection
+     * used for the most recent request attempt was shut down gracefully by the
+     * server.
+     *
+     * @param handle a handle to identify the request in later invocations on
+     * the connection endpoint and its connections
+     *
+     * @return an <code>OutboundRequestIterator</code> to use to send a new
+     * request for the specified handle to this connection manager's
+     * <code>ConnectionEndpoint</code>
+     *
+     * @throws NullPointerException if <code>handle</code> is <code>null</code>
+     *
+     */
     public OutboundRequestIterator newRequest(OutboundRequestHandle handle) {
-	return new ReqIterator(handle, this);
+        return new ReqIterator(handle, this);
     }
 }

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java?rev=1595569&r1=1595568&r2=1595569&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/tcp/TcpEndpoint.java Sun May 18 08:02:56 2014
@@ -93,7 +93,8 @@ public final class TcpEndpoint
      * weak set of canonical instances; in order to use WeakHashMap,
      * maps canonical instances to weak references to themselves
      **/
-    private static final Map internTable = new WeakHashMap();
+    private static final Map<TcpEndpoint,WeakReference<TcpEndpoint>> internTable 
+            = new WeakHashMap<TcpEndpoint,WeakReference<TcpEndpoint>>();
 
     /** client transport logger */
     private static final Logger logger =
@@ -126,7 +127,7 @@ public final class TcpEndpoint
      **/
     private final SocketFactory sf;
 
-    private transient ConnectionManager connectionManager;
+    private transient volatile ConnectionManager connectionManager;
 
     /**
      * Returns a <code>TcpEndpoint</code> instance for the given
@@ -189,16 +190,22 @@ public final class TcpEndpoint
      **/
     private static TcpEndpoint intern(TcpEndpoint endpoint) {
 	synchronized (internTable) {
-	    Reference ref = (WeakReference) internTable.get(endpoint);
+	    Reference<TcpEndpoint> ref = internTable.get(endpoint); // weak ref to the canonical instance, if any
 	    if (ref != null) {
-		TcpEndpoint canonical = (TcpEndpoint) ref.get();
+		TcpEndpoint canonical = ref.get();
 		if (canonical != null) {
 		    return canonical;
 		}
 	    }
 	    endpoint.connectionManager =
-		new ConnectionManager(endpoint.new ConnectionEndpointImpl());
-	    internTable.put(endpoint, new WeakReference(endpoint));
+		new ConnectionManager(
+                        new ConnectionEndpointImpl(
+                                endpoint.getHost(),
+                                endpoint.getPort(),
+                                endpoint.getSocketFactory()
+                        )
+                );
+	    internTable.put(endpoint, new WeakReference<TcpEndpoint>(endpoint));
 	    return endpoint;
 	}
     }
@@ -527,9 +534,16 @@ public final class TcpEndpoint
      * correctly, so we do not bother to validate request handles and
      * connections passed in.
      **/
-    private class ConnectionEndpointImpl implements ConnectionEndpoint {
-
-	ConnectionEndpointImpl() { }
+    private static class ConnectionEndpointImpl implements ConnectionEndpoint {
+        private final String host;
+        private final int port;
+        private final SocketFactory sf;
+
+	ConnectionEndpointImpl(String host, int port, SocketFactory sf) {
+            this.host = host;
+            this.port = port;
+            this.sf = sf;
+        }
 
 	/**
 	 * Invoked by ConnectionManager to create a new connection.