You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/05/09 09:03:19 UTC
svn commit: r1593493 [20/24] - in /river/jtsk/skunk/qa_refactor/trunk: qa/
qa/src/com/sun/jini/test/impl/end2end/jssewrapper/
qa/src/com/sun/jini/test/impl/joinmanager/
qa/src/com/sun/jini/test/impl/mahalo/
qa/src/com/sun/jini/test/impl/outrigger/match...
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java?rev=1593493&r1=1593492&r2=1593493&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java Fri May 9 07:03:18 2014
@@ -295,10 +295,10 @@ abstract class AbstractLookupDiscovery i
* Only 1 instance of this thread is run.
*/
private class Notifier extends Thread {
- /** Create a daemon thread */
+
public Notifier() {
super("event listener notification");
- setDaemon(true);
+ setDaemon(false);
}//end constructor
public void run() {
@@ -428,10 +428,9 @@ abstract class AbstractLookupDiscovery i
*/
private ArrayList<NetworkInterface> retryNics = null;
- /** Create a daemon thread */
public AnnouncementListener() throws IOException {
super("multicast discovery announcement listener");
- setDaemon(true);
+ setDaemon(false);
sock = new MulticastSocket(Constants.discoveryPort);
switch(nicsToUse) {
case NICS_USE_ALL:
@@ -628,10 +627,9 @@ abstract class AbstractLookupDiscovery i
/** Server socket for accepting connections */
public final ServerSocket serv;
- /** Create a daemon thread */
public ResponseListener() throws IOException {
super("multicast discovery response listener");
- setDaemon(true);
+ setDaemon(false);
serv = new ServerSocket(0);
}//end constructor
@@ -709,12 +707,11 @@ abstract class AbstractLookupDiscovery i
private final String[] groups;
private final boolean delayFlag;
- /** Create a daemon thread */
public Requestor(String[] groups, int port, boolean delayFlag)
throws IOException
{
super("multicast discovery request");
- setDaemon(true);
+ setDaemon(false);
sock = new MulticastSocket(Constants.discoveryPort);
sock.setTimeToLive(
multicastRequestConstraints.getMulticastTimeToLive(
@@ -807,10 +804,10 @@ abstract class AbstractLookupDiscovery i
private class AnnouncementTimerThread extends Thread {
/* Number of interval to exceed for declaring announcements stopped */
private static final long N_INTERVALS = 3;
- /** Create a daemon thread */
+
public AnnouncementTimerThread() {
super("multicast announcement timer");
- setDaemon(true);
+ setDaemon(false);
}
public void run() {
long timeThreshold = N_INTERVALS*multicastAnnouncementInterval;
@@ -2426,6 +2423,7 @@ abstract class AbstractLookupDiscovery i
discoveryWakeupMgr.stop();
}
}//end sync
+ if (notifierThread != null) notifierThread.interrupt();
return null;
}//end run
});//end doPrivileged
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java?rev=1593493&r1=1593492&r2=1593493&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java Fri May 9 07:03:18 2014
@@ -399,10 +399,10 @@ abstract class AbstractLookupLocatorDisc
private class Notifier extends Thread {
// In case client code catches and resets interrupt.
private volatile boolean interrupted = false;
- /** Construct a daemon thread */
+
public Notifier() {
super("event notifier");
- setDaemon(true);
+ setDaemon(false);
}//end constructor
public void interrupt(){
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java?rev=1593493&r1=1593492&r2=1593493&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java Fri May 9 07:03:18 2014
@@ -1,739 +1,764 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package net.jini.jeri.connection;
-
-import com.sun.jini.action.GetLongAction;
-import com.sun.jini.jeri.internal.mux.MuxClient;
-import com.sun.jini.logging.Levels;
-import com.sun.jini.thread.Executor;
-import com.sun.jini.thread.GetThreadPoolAction;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.security.AccessController;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import net.jini.core.constraint.InvocationConstraints;
-import net.jini.jeri.OutboundRequest;
-import net.jini.jeri.OutboundRequestIterator;
-
-/**
- * Provides client-side connection management using the <a
- * href="{@docRoot}/net/jini/jeri/connection/doc-files/mux.html">Jini
- * extensible remote invocation (Jini ERI) multiplexing protocol</a>
- * to frame and multiplex requests and responses over connections.
- *
- * <p>A <code>ConnectionManager</code> is created by a
- * connection-based {@link net.jini.jeri.Endpoint} implemention to manage
- * connections to a particular {@link ConnectionEndpoint}. The {@link
- * #newRequest newRequest} method is used to send a request to the
- * connection endpoint.
- *
- * <p>Each request attempt is mapped to a new <i>session</i> of the
- * Jini ERI multiplexing protocol on an established connection chosen
- * by the <code>ConnectionEndpoint</code>. Request data is written as
- * the data sent for the session, and response data is read as the
- * data recdeived for the session.
- *
- * @author Sun Microsystems, Inc.
- * @since 2.0
- *
- * @com.sun.jini.impl
- *
- * This implementation uses the {@link Logger} named
- * <code>net.jini.jeri.connection.ConnectionManager</code> to log
- * information at the following levels:
- *
- * <p><table summary="Describes what is logged by ConnectionManager to
- * its logger at various logging levels" border=1 cellpadding=5>
- *
- * <tr> <th> Level <th> Description
- *
- * <tr> <td> {@link Level#FINEST FINEST} <td> connection opened or
- * reused
- *
- * </table>
- *
- * <p>This implementation uses the {@link Logger} named
- * <code>net.jini.jeri.connection.mux</code> to log information at the
- * following levels:
- *
- * <p><table summary="Describes what is logged by ConnectionManager to
- * the mux logger at various logging levels" border=1 cellpadding=5>
- *
- * <tr> <th> Level <th> Description
- *
- * <tr> <td> {@link Level#WARNING WARNING} <td> unexpected exception
- * during asynchronous I/O processing, or thread creation failure
- *
- * <tr> <td> {@link Levels#HANDLED HANDLED} <td> I/O exception during
- * asynchronous I/O processing
- *
- * <tr> <td> {@link Level#FINEST FINEST} <td> detailed implementation
- * activity
- *
- * </table>
- *
- * <p>This implementation recognizes the following system properties:
- *
- * <p><ul>
- *
- * <li><code>com.sun.jini.jeri.connectionTimeout</code> - Time in
- * milliseconds to leave idle client-side connections around before
- * closing them. The default value is 15000 milliseconds (15 seconds).
- *
- * <li><code>com.sun.jini.jeri.handshakeTimeout</code> - Time in
- * milliseconds for client-side connections to wait for the server to
- * acknowledge an opening handshake. The default value is 15000 milliseconds (15 seconds).
- *
- * </ul>
- **/
-public final class ConnectionManager {
- /**
- * How long to leave idle muxes around before closing them.
- */
- private static final long TIMEOUT =
- ( (Long) AccessController.doPrivileged(new GetLongAction(
- "com.sun.jini.jeri.connectionTimeout",
- 15000))).longValue();
- /**
- * How long to wait for a server to respond to an initial client message.
- */
- private static final long HANDSHAKE_TIMEOUT =
- ((Long) AccessController.doPrivileged(new GetLongAction(
- "com.sun.jini.jeri.handshakeTimeout",
- 15000))).longValue();
- /**
- * ConnectionManager logger.
- */
- private static final Logger logger =
- Logger.getLogger("net.jini.jeri.connection.ConnectionManager");
- /**
- * Executor that executes tasks in pooled system threads.
- */
- private static final Executor systemThreadPool =
- (Executor) AccessController.doPrivileged(
- new GetThreadPoolAction(false));
- /**
- * Set of connection managers with open or pending muxes
- * (connections), for consideration by the reaper thread.
- */
- private static final Set reaperSet = new HashSet();
-
- /**
- * The endpoint.
- */
- private final ConnectionEndpoint ep;
- /**
- * The OutboundMuxes.
- */
- private final List muxes = new ArrayList(1);
- /**
- * The active muxes (during connect).
- */
- private final List active = new ArrayList(1);
- /**
- * Unmodifiable view of active.
- */
- private final Collection roactive =
- Collections.unmodifiableCollection(active);
- /**
- * The idle muxes (during connect).
- */
- private final List idle = new ArrayList(1);
- /**
- * Unmodifiable view of idle.
- */
- private final Collection roidle =
- Collections.unmodifiableCollection(idle);
- /**
- * Number of pending connect calls.
- */
- private int pendingConnects = 0;
-
- /**
- * Creates a new <code>ConnectionManager</code> that manages
- * client-side connections to the specified connection endpoint.
- *
- * @param ep the connection endpoint
- **/
- public ConnectionManager(ConnectionEndpoint ep) {
- this.ep = ep;
- }
-
- /**
- * Calls connect on the connection endpoint with the active and idle
- * muxes and the specified handle. If no connection is returned, calls
- * connect with the handle. In either case, if a new connection is
- * returned, creates and adds a mux for it. In all cases, bumps the
- * newRequest count for the mux and returns it. Removes any dead muxes
- * along the way.
- */
- OutboundMux connect(OutboundRequestHandle handle) throws IOException {
- synchronized (this) {
- pendingConnects++;
- }
- try {
- synchronized (reaperSet) {
- if (reaperSet.isEmpty()) {
- systemThreadPool.execute(
- new Reaper(), "ConnectionManager.Reaper");
- }
- reaperSet.add(this);
- }
- synchronized (this) {
- active.clear();
- idle.clear();
- for (int i = muxes.size(); --i >= 0; ) {
- OutboundMux mux = (OutboundMux) muxes.get(i);
- try {
- int n = mux.requestsInProgress();
- if (n == 0) {
- idle.add(mux.getConnection());
- } else if (n < OutboundMux.MAX_REQUESTS) {
- active.add(mux.getConnection());
- }
- } catch (IOException e) {
- muxes.remove(i);
- }
- }
- Connection c = ep.connect(handle, roactive, roidle);
- if (c != null) {
- for (int i = muxes.size(); --i >= 0; ) {
- OutboundMux mux = (OutboundMux) muxes.get(i);
- if (c == mux.getConnection()) {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "using {0}", c);
- }
- mux.newRequestPending();
- return mux;
- }
- }
- OutboundMux mux = newOutboundMux(c);
- mux.newRequestPending();
- muxes.add(mux);
- return mux;
- }
- }
- Connection c = ep.connect(handle);
- synchronized (this) {
- OutboundMux mux = newOutboundMux(c);
- mux.newRequestPending();
- muxes.add(mux);
- return mux;
- }
- } finally {
- synchronized (this) {
- assert pendingConnects > 0;
- pendingConnects--;
- }
- }
- }
-
- /**
- * For each mux, calls checkIdle on the mux, and if checkIdle returns
- * true, removes the mux and adds it to the idle list. Returns true
- * if no connects are pending and no muxes remain.
- */
- synchronized boolean checkIdle(long now, List idle) {
- for (int i = muxes.size(); --i >= 0; ) {
- OutboundMux mux = (OutboundMux) muxes.get(i);
- if (mux.checkIdle(now)) {
- muxes.remove(i);
- idle.add(mux);
- }
- }
- return pendingConnects == 0 && muxes.isEmpty();
- }
-
- /**
- * Removes and shuts down a mux.
- */
- void remove(OutboundMux mux) {
- synchronized (this) {
- muxes.remove(mux);
- }
- mux.shutdown("writeRequestData failed");
- }
-
- /**
- * Constructs an OutboundMux instance from the connection.
- */
- private OutboundMux newOutboundMux(Connection c) throws IOException {
- logger.log(Level.FINEST, "opened {0}", c);
- OutboundMux mux = null;
- try {
- mux = (c.getChannel() == null) ?
- new OutboundMux(c) : new OutboundMux(c, true);
- mux.setStartTimeout(HANDSHAKE_TIMEOUT);
- } finally {
- if (mux == null) {
- try {
- c.close();
- } catch (IOException e) {
- }
- }
- }
- return mux;
- }
-
- /**
- * Subclass wrapper around MuxClient for outbound connections.
- */
- private final class OutboundMux extends MuxClient {
- /**
- * The outbound connection.
- */
- private final Connection c;
- /**
- * Lock to enforce single start of mux.
- */
- private final Object startLock = new Object();
- /**
- * True if the mux needs to be started.
- */
- private boolean pendingStart = true;
- /**
- * Number of pending newRequest calls. Guarded by enclosing
- * ConnectionManager's lock.
- */
- private int pendingNewRequests = 0;
- /**
- * The time this mux was found to be idle by the Reaper
- * thread. Set to zero each time a request is initiated.
- * Guarded by enclosing ConnectionManager's lock.
- */
- private long idleTime = 0;
-
- /**
- * Constructs an instance from the connection's streams.
- */
- OutboundMux(Connection c) throws IOException {
- super(c.getOutputStream(), c.getInputStream());
- this.c = c;
- }
-
- /**
- * Constructs an instance from the connection's channel.
- */
- OutboundMux(Connection c, boolean ignore) throws IOException {
- super(c.getChannel());
- this.c = c;
- }
-
- /**
- * Returns the outbound connection.
- */
- Connection getConnection() {
- return c;
- }
-
- /**
- * Registers a pending newRequest call.
- */
- void newRequestPending() {
- assert Thread.holdsLock(ConnectionManager.this);
- pendingNewRequests++;
- }
-
- /**
- * Initiates a new request on the mux and returns it, and sets the
- * idle time to zero. Starts the mux if necessary, and decrements
- * the pending newRequest count.
- */
- public OutboundRequest newRequest() throws IOException {
- assert !Thread.holdsLock(ConnectionManager.this);
- boolean ok = false;
- try {
- synchronized (startLock) {
- if (pendingStart) {
- pendingStart = false;
- start();
- }
- }
- ok = true;
- } finally {
- if (!ok) {
- synchronized (ConnectionManager.this) {
- assert pendingNewRequests > 0;
- pendingNewRequests--;
- }
- }
- }
- synchronized (ConnectionManager.this) {
- assert pendingNewRequests > 0;
- pendingNewRequests--;
- idleTime = 0;
- return super.newRequest();
- }
- }
-
- /**
- * Returns the number of active and pending requests.
- */
- public int requestsInProgress() throws IOException {
- assert Thread.holdsLock(ConnectionManager.this);
- return super.requestsInProgress() + pendingNewRequests;
- }
-
- /**
- * Returns true if the mux is dead, or the mux is idle and the
- * recorded idle time is more than TIMEOUT milliseconds before now,
- * and returns false otherwise. If the mux is idle and the recorded
- * idle time is zero, sets the recorded idle time to now.
- */
- boolean checkIdle(long now) {
- assert Thread.holdsLock(ConnectionManager.this);
- try {
- if (requestsInProgress() == 0) {
- if (idleTime == 0) {
- idleTime = now;
- } else {
- return now - idleTime > TIMEOUT;
- }
- }
- return false;
- } catch (IOException e) {
- return true;
- }
- }
-
- /**
- * Close the connection, so that the provider is notified.
- */
- protected void handleDown() {
- try {
- c.close();
- } catch (IOException e) {
- }
- }
-
- boolean shouldRetry() {
- return false; // XXX
- }
- }
-
- /**
- * Outbound request wrapper around the outbound request created by the mux.
- */
- private static final class Outbound implements OutboundRequest {
- /**
- * The outbound request created by the mux.
- */
- private final OutboundRequest req;
- /**
- * The connection on which the outbound request originates.
- */
- private final Connection c;
- /**
- * The outbound request handle.
- */
- private final OutboundRequestHandle handle;
- /**
- * Wrapper around the mux's response input stream.
- */
- private final InputStream in;
- /**
- * Delivery status override from readResponseData.
- */
- private boolean status = true;
-
- Outbound(OutboundRequest req,
- Connection c,
- OutboundRequestHandle handle)
- {
- this.req = req;
- this.c = c;
- this.handle = handle;
- in = new Input(handle);
- }
-
- /* pass-through to the underlying request */
- public OutputStream getRequestOutputStream() {
- return req.getRequestOutputStream();
- }
-
- /* return the wrapper */
- public InputStream getResponseInputStream() {
- return in;
- }
-
- /* delegate to the connection */
- public void populateContext(Collection context) {
- c.populateContext(handle, context);
- }
-
- /* delegate to the connection */
- public InvocationConstraints getUnfulfilledConstraints() {
- return c.getUnfulfilledConstraints(handle);
- }
-
- /**
- * False if readResponseData returned an exception, otherwise
- * pass-through to the underlying request.
- */
- public boolean getDeliveryStatus() {
- return status && req.getDeliveryStatus();
- }
-
- /* pass-through to the underlying request */
- public void abort() {
- req.abort();
- }
-
- /**
- * Wrapper for the response input stream of an outbound request,
- * used to call readResponseData on the underlying connection
- * before subsequent data is read by higher levels. Note that this
- * class does not support mark/reset.
- */
- private final class Input extends InputStream {
- /**
- * The underlying input stream from the outbound request.
- */
- private final InputStream in;
- /**
- * The handle, or null if readResponseData has been called.
- */
- private OutboundRequestHandle handle;
-
- Input(OutboundRequestHandle handle) {
- in = req.getResponseInputStream();
- this.handle = handle;
- }
-
- /**
- * Calls readResponseData on the connection, exactly once.
- * Sets the handle to null to indicate that it has been called.
- */
- private synchronized void readFirst() throws IOException {
- if (handle != null) {
- try {
- IOException e = c.readResponseData(handle, in);
- if (e != null) {
- status = false;
- throw e;
- }
- } finally {
- handle = null;
- }
- }
- }
-
- /** Call readFirst, then pass through. */
- public int read() throws IOException {
- readFirst();
- return in.read();
- }
-
- /** Call readFirst, then pass through. */
- public int read(byte[] b, int off, int len) throws IOException {
- readFirst();
- return in.read(b, off, len);
- }
-
- /** Call readFirst, then pass through. */
- public long skip(long n) throws IOException {
- readFirst();
- return in.skip(n);
- }
-
- /** Call readFirst, then pass through. */
- public int available() throws IOException {
- readFirst();
- return in.available();
- }
-
- /** pass-through */
- public void close() throws IOException {
- in.close();
- }
- }
- }
-
- /**
- * Records idle times in muxes and shuts down muxes that have been
- * idle for at least TIMEOUT milliseconds.
- */
- private static final class Reaper implements Runnable {
- Reaper() {
- }
-
- /**
- * Sleep for TIMEOUT milliseconds. Then call checkIdle on
- * each manager with open muxes, shutdown all of idle muxes
- * that have been collected, and if no managers with open
- * muxes remain terminate, else repeat (go back to sleep).
- */
- public void run() {
- List idle = new ArrayList(1);
- boolean done;
- do {
- try {
- Thread.sleep(TIMEOUT);
- } catch (InterruptedException e) {
- return;
- }
- long now = System.currentTimeMillis();
- synchronized (reaperSet) {
- for (Iterator iter = reaperSet.iterator();
- iter.hasNext();)
- {
- ConnectionManager mgr =
- (ConnectionManager) iter.next();
- if (mgr.checkIdle(now, idle)) {
- iter.remove();
- }
- }
- done = reaperSet.isEmpty();
- }
- for (int i = idle.size(); --i >= 0; ) {
- ((OutboundMux) idle.get(i)).shutdown("idle");
- }
- idle.clear();
- } while (!done);
- }
- }
-
- /**
- * Outbound request iterator returned by newRequest.
- */
- private final class ReqIterator implements OutboundRequestIterator {
- /**
- * The request handle.
- */
- private final OutboundRequestHandle handle;
- /**
- * True if next has not yet been called.
- */
- private boolean first = true;
- /**
- * The outbound mux from the last call to next, if any.
- */
- private OutboundMux mux;
-
- ReqIterator(OutboundRequestHandle handle) {
- this.handle = handle;
- }
-
- /**
- * Returns true if next has not yet been called or if the last mux
- * returned had an asynchronous close.
- */
- public synchronized boolean hasNext() {
- return first || (mux != null && mux.shouldRetry());
- }
-
- /**
- * If hasNext returns true, finds the entry (if any) for the
- * connection endpoint. If no entry is found, creates one and spawns
- * a Reaper if this is the only entry. Either way, bumps the connect
- * count for the entry. Calls connect on the entry to get a mux, then
- * calls newRequest on the mux, calls writeRequestData on the
- * connection, and returns a new outbound request wrapper.
- */
- public synchronized OutboundRequest next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- first = false;
- mux = connect(handle);
- OutboundRequest req = mux.newRequest();
- OutboundRequest sreq = null;
- try {
- Connection c = mux.getConnection();
- c.writeRequestData(handle, req.getRequestOutputStream());
- sreq = new Outbound(req, c, handle);
- } finally {
- if (sreq == null) {
- remove(mux);
- }
- }
- return sreq;
- }
- }
-
- /**
- * Returns an <code>OutboundRequestIterator</code> to use to send
- * a new request for the specified handle to this connection
- * manager's <code>ConnectionEndpoint</code>.
- *
- * <p>If the <code>hasNext</code> method of the returned iterator
- * returns <code>true</code>, the <code>next</code> method behaves
- * as follows:
- *
- * <blockquote>
- *
- * The connection endpoint's {@link
- * ConnectionEndpoint#connect(OutboundRequestHandle,Collection,Collection)
- * connect} method is invoked with any active connections that
- * have not reached their maximum number of in-progress requests,
- * any idle connections, and <code>handle</code>. If that returns
- * <code>null</code>, the endpoint's {@link
- * ConnectionEndpoint#connect(OutboundRequestHandle) connect}
- * method is invoked with <code>handle</code>. In either case, if
- * a new connection is returned, the Jini ERI multiplexing
- * protocol is started on the connection (as the client).
- * Finally, the {@link Connection#writeRequestData
- * writeRequestData} method of the connection is invoked with
- * <code>handle</code> and the request output stream of the {@link
- * OutboundRequest} that is created for the request. If any
- * exception is thrown while obtaining a connection from the
- * endpoint or writing the request data, that exception is thrown
- * to the caller. The <code>OutboundRequest</code> returned by
- * <code>next</code> will invoke the {@link
- * Connection#readResponseData readResponseData} method of the
- * connection with the specified handle and the response input
- * stream before any other data is read from the response input
- * stream. The {@link OutboundRequest#populateContext
- * populateContext} and {@link
- * OutboundRequest#getUnfulfilledConstraints
- * getUnfulfilledConstraints} methods of the
- * <code>OutboundRequest</code> are implemented by delegating to
- * the corresponding method of the connection passing
- * <code>handle</code> and the other arguments (if any).
- *
- * </blockquote>
- *
- * <p>The returned iterator might allow continued iteration if the
- * connection used for the most recent request attempt was shut
- * down gracefully by the server.
- *
- * @param handle a handle to identify the request in later
- * invocations on the connection endpoint and its connections
- *
- * @return an <code>OutboundRequestIterator</code> to use to send
- * a new request for the specified handle to this connection
- * manager's <code>ConnectionEndpoint</code>
- *
- * @throws NullPointerException if <code>handle</code> is
- * <code>null</code>
- **/
- public OutboundRequestIterator newRequest(OutboundRequestHandle handle) {
- return new ReqIterator(handle);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.jini.jeri.connection;
+
+import com.sun.jini.action.GetLongAction;
+import com.sun.jini.jeri.internal.mux.MuxClient;
+import com.sun.jini.logging.Levels;
+import com.sun.jini.thread.Executor;
+import com.sun.jini.thread.GetThreadPoolAction;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.AccessController;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.core.constraint.InvocationConstraints;
+import net.jini.jeri.OutboundRequest;
+import net.jini.jeri.OutboundRequestIterator;
+
+/**
+ * Provides client-side connection management using the <a
+ * href="{@docRoot}/net/jini/jeri/connection/doc-files/mux.html">Jini
+ * extensible remote invocation (Jini ERI) multiplexing protocol</a>
+ * to frame and multiplex requests and responses over connections.
+ *
+ * <p>A <code>ConnectionManager</code> is created by a
+ * connection-based {@link net.jini.jeri.Endpoint} implementation to manage
+ * connections to a particular {@link ConnectionEndpoint}. The {@link
+ * #newRequest newRequest} method is used to send a request to the
+ * connection endpoint.
+ *
+ * <p>Each request attempt is mapped to a new <i>session</i> of the
+ * Jini ERI multiplexing protocol on an established connection chosen
+ * by the <code>ConnectionEndpoint</code>. Request data is written as
+ * the data sent for the session, and response data is read as the
+ * data received for the session.
+ *
+ * @author Sun Microsystems, Inc.
+ * @since 2.0
+ *
+ * @com.sun.jini.impl
+ *
+ * This implementation uses the {@link Logger} named
+ * <code>net.jini.jeri.connection.ConnectionManager</code> to log
+ * information at the following levels:
+ *
+ * <p><table summary="Describes what is logged by ConnectionManager to
+ * its logger at various logging levels" border=1 cellpadding=5>
+ *
+ * <tr> <th> Level <th> Description
+ *
+ * <tr> <td> {@link Level#FINEST FINEST} <td> connection opened or
+ * reused
+ *
+ * </table>
+ *
+ * <p>This implementation uses the {@link Logger} named
+ * <code>net.jini.jeri.connection.mux</code> to log information at the
+ * following levels:
+ *
+ * <p><table summary="Describes what is logged by ConnectionManager to
+ * the mux logger at various logging levels" border=1 cellpadding=5>
+ *
+ * <tr> <th> Level <th> Description
+ *
+ * <tr> <td> {@link Level#WARNING WARNING} <td> unexpected exception
+ * during asynchronous I/O processing, or thread creation failure
+ *
+ * <tr> <td> {@link Levels#HANDLED HANDLED} <td> I/O exception during
+ * asynchronous I/O processing
+ *
+ * <tr> <td> {@link Level#FINEST FINEST} <td> detailed implementation
+ * activity
+ *
+ * </table>
+ *
+ * <p>This implementation recognizes the following system properties:
+ *
+ * <p><ul>
+ *
+ * <li><code>com.sun.jini.jeri.connectionTimeout</code> - Time in
+ * milliseconds to leave idle client-side connections around before
+ * closing them. The default value is 360000 milliseconds (6 minutes).
+ *
+ * <li><code>com.sun.jini.jeri.handshakeTimeout</code> - Time in
+ * milliseconds for client-side connections to wait for the server to
+ * acknowledge an opening handshake. The default value is 360000 milliseconds (6 minutes).
+ *
+ * </ul>
+ **/
+public final class ConnectionManager {
+ /**
+ * How long to leave idle muxes around before closing them.
+ */
+ private static final long TIMEOUT =
+ ( (Long) AccessController.doPrivileged(new GetLongAction(
+ "com.sun.jini.jeri.connectionTimeout",
+ 360000))).longValue();
+ /**
+ * How long to wait for a server to respond to an initial client message.
+ */
+ private static final long HANDSHAKE_TIMEOUT =
+ ((Long) AccessController.doPrivileged(new GetLongAction(
+ "com.sun.jini.jeri.handshakeTimeout",
+ 360000))).longValue();
+ /**
+ * ConnectionManager logger.
+ */
+ private static final Logger logger =
+ Logger.getLogger("net.jini.jeri.connection.ConnectionManager");
+ /**
+ * Executor that executes tasks in pooled system threads.
+ */
+ private static final Executor systemThreadPool =
+ (Executor) AccessController.doPrivileged(
+ new GetThreadPoolAction(false));
+ /**
+ * Set of connection managers with open or pending muxes
+ * (connections), for consideration by the reaper thread.
+ */
+ private static final Set<ConnectionManager> reaperSet = new HashSet<ConnectionManager>();
+
+ /**
+ * The endpoint.
+ */
+ private final ConnectionEndpoint ep;
+ /**
+ * The OutboundMuxes.
+ */
+ private final List<OutboundMux> muxes = new LinkedList<OutboundMux>();
+
+ /**
+ * Number of pending connect calls.
+ */
+ private int pendingConnects = 0;
+
+ /**
+ * Creates a new <code>ConnectionManager</code> that manages
+ * client-side connections to the specified connection endpoint.
+ *
+ * @param ep the connection endpoint
+ **/
+ public ConnectionManager(ConnectionEndpoint ep) {
+ this.ep = ep;
+ }
+
+ /**
+ * Calls connect on the connection endpoint with the active and idle
+ * muxes and the specified handle. If no connection is returned, calls
+ * connect with the handle. In either case, if a new connection is
+ * returned, creates and adds a mux for it. In all cases, bumps the
+ * newRequest count for the mux and returns it. Removes any dead muxes
+ * along the way.
+ *
+ * The pending connect count is raised on entry and lowered again in the
+ * finally clause, so checkIdle does not report this manager as empty
+ * while a connect is in flight. Both calls on the endpoint are made
+ * without holding this manager's lock; the lock is taken only while the
+ * mux list is read or modified.
+ *
+ * @param handle the handle passed through to the endpoint's connect calls
+ * @return the mux to use, with one pending newRequest already registered
+ * @throws IOException propagated from the endpoint or from newOutboundMux
+ */
+ OutboundMux connect(OutboundRequestHandle handle) throws IOException {
+ synchronized (this) {
+ pendingConnects++;
+ }
+ /**
+ * Connections of muxes that have requests in progress but are still
+ * below OutboundMux.MAX_REQUESTS (collected during connect).
+ */
+ List<Connection> active = new LinkedList<Connection>();
+ /**
+ * Connections of muxes with no requests in progress (collected
+ * during connect).
+ */
+ List<Connection> idle = new LinkedList<Connection>();
+ try {
+ synchronized (reaperSet) {
+ if (reaperSet.isEmpty()) { // no manager registered yet: start a Reaper task
+ systemThreadPool.execute(
+ new Reaper(), "ConnectionManager.Reaper");
+ }
+ reaperSet.add(this);
+ }
+ synchronized (this) {
+ for (int i = muxes.size(); --i >= 0; ) { // back to front, so remove(i) never shifts an unvisited entry
+ OutboundMux mux = (OutboundMux) muxes.get(i);
+ try {
+ int n = mux.requestsInProgress();
+ if (n == 0) {
+ idle.add(mux.getConnection());
+ } else if (n < OutboundMux.MAX_REQUESTS) { // a mux at the limit is offered in neither list
+ active.add(mux.getConnection());
+ }
+ } catch (IOException e) { // requestsInProgress failed: treat the mux as dead and drop it
+ muxes.remove(i);
+ }
+ }
+ }
+ Connection c = ep.connect(handle, active, idle); // called outside this manager's lock
+ synchronized (this) {
+ if (c != null) {
+ for (int i = muxes.size(); --i >= 0; ) {
+ OutboundMux mux = (OutboundMux) muxes.get(i);
+ if (c == mux.getConnection()) { // the endpoint picked a connection we already track
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "using {0}", c);
+ }
+ mux.newRequestPending();
+ return mux;
+ }
+ }
+ OutboundMux mux = newOutboundMux(c); // no tracked mux uses c: wrap it in a new mux
+ mux.newRequestPending();
+ muxes.add(mux);
+ return mux;
+ }
+ }
+ c = ep.connect(handle); // first call returned null: ask for a connection using the handle alone
+ synchronized (this) {
+ OutboundMux mux = newOutboundMux(c);
+ mux.newRequestPending();
+ muxes.add(mux);
+ return mux;
+ }
+ } finally {
+ synchronized (this) {
+ assert pendingConnects > 0;
+ pendingConnects--;
+ }
+ }
+ }
+
+    /**
+     * For each mux, calls checkIdle on the mux, and if checkIdle returns
+     * true, removes the mux from this manager and appends it to the
+     * supplied list; shutting the collected muxes down is left to the
+     * caller. The muxes are visited from the end of the list to the
+     * front.
+     *
+     * @param now the current time in milliseconds, as sampled by the caller
+     * @param idle list that receives every mux removed by this call
+     * @return true if no connects are pending and no muxes remain
+     */
+    synchronized boolean checkIdle(long now, List<OutboundMux> idle) {
+        // muxes is a linked list: walk it with a list iterator so that each
+        // step and each removal is constant time, instead of get(i)/remove(i).
+        java.util.ListIterator<OutboundMux> it =
+            muxes.listIterator(muxes.size());
+        while (it.hasPrevious()) {
+            OutboundMux mux = it.previous();
+            if (mux.checkIdle(now)) {
+                it.remove();
+                idle.add(mux);
+            }
+        }
+        return pendingConnects == 0 && muxes.isEmpty();
+    }
+
+ /**
+ * Removes and shuts down a mux.
+ */
+ void remove(OutboundMux mux) {
+ synchronized (this) {
+ muxes.remove(mux);
+ }
+ mux.shutdown("writeRequestData failed");
+ }
+
+    /**
+     * Builds the OutboundMux for a connection: from the connection's
+     * streams when it has no channel, otherwise from its channel. The
+     * handshake timeout is applied to the new mux before it is returned.
+     * If no mux could be constructed, the connection is closed (any
+     * IOException from that close is ignored) and the original failure
+     * propagates.
+     */
+    private OutboundMux newOutboundMux(Connection c) throws IOException {
+        logger.log(Level.FINEST, "opened {0}", c);
+        OutboundMux created = null;
+        try {
+            if (c.getChannel() == null) {
+                created = new OutboundMux(c);
+            } else {
+                created = new OutboundMux(c, true);
+            }
+            created.setStartTimeout(HANDSHAKE_TIMEOUT);
+            return created;
+        } finally {
+            // Only a failure before the mux exists triggers the close.
+            if (created == null) {
+                try {
+                    c.close();
+                } catch (IOException ignored) {
+                }
+            }
+        }
+    }
+
+ /**
+ * Subclass wrapper around MuxClient for outbound connections.
+ * Besides the connection itself it tracks the number of newRequest calls
+ * that have been announced (newRequestPending) but not yet completed, and
+ * the time at which the mux was first seen idle, and it starts the
+ * underlying mux on the first newRequest call.
+ */
+ private final class OutboundMux extends MuxClient {
+ /**
+ * The outbound connection.
+ */
+ private final Connection c;
+ /**
+ * Lock to enforce single start of mux. Also the monitor on which
+ * threads wait while another thread is running start().
+ */
+ private final Object startLock = new Object();
+ /**
+ * True if the mux needs to be started. Cleared (under startLock) once
+ * the start attempt has finished, whether or not start() threw.
+ */
+ private volatile boolean notStarted = true;
+ private boolean starting = false; // true while one thread is inside start(); read and written under startLock
+ /**
+ * Number of pending newRequest calls. Accessed only while
+ * synchronized on this mux (newRequestPending, newRequest and
+ * requestsInProgress).
+ */
+ private int pendingNewRequests = 0;
+ /**
+ * The time this mux was found to be idle by the Reaper
+ * thread. Set to zero each time a request is initiated.
+ * Declared volatile; written while synchronized on this mux
+ * (newRequest and checkIdle).
+ */
+ private volatile long idleTime = 0;
+
+ /**
+ * Constructs an instance from the connection's streams.
+ */
+ OutboundMux(Connection c) throws IOException {
+ super(c.getOutputStream(), c.getInputStream());
+ this.c = c;
+ }
+
+ /**
+ * Constructs an instance from the connection's channel.
+ * The boolean parameter is not used; it only distinguishes this
+ * constructor from the stream-based one.
+ */
+ OutboundMux(Connection c, boolean ignore) throws IOException {
+ super(c.getChannel());
+ this.c = c;
+ }
+
+ /**
+ * Returns the outbound connection.
+ */
+ Connection getConnection() {
+ return c;
+ }
+
+ /**
+ * Registers a pending newRequest call. The count is taken down again
+ * in the finally clause of newRequest.
+ */
+ synchronized void newRequestPending() {
+ pendingNewRequests++;
+ }
+
+ /**
+ * Initiates a new request on the mux and returns it, and sets the
+ * idle time to zero. Starts the mux if necessary, and decrements
+ * the pending newRequest count.
+ *
+ * The first caller to find the mux not started marks it as starting
+ * and calls start() without holding startLock; callers arriving in
+ * the meantime wait on startLock until that attempt has finished.
+ * An interrupt received while waiting is remembered and the thread's
+ * interrupt status is set again before this method returns. The
+ * pending count is decremented even when this method throws.
+ */
+ @Override
+ public OutboundRequest newRequest() throws IOException {
+ assert !Thread.holdsLock(ConnectionManager.this);
+ boolean interrupted = false;
+ try {
+ boolean start = false;
+ if (notStarted){ // unsynchronized check of the volatile flag; re-checked under startLock below
+ synchronized (startLock){
+ while (starting){ // another thread is inside start(): wait for it to finish
+ try {
+ startLock.wait();
+ } catch (InterruptedException ex) {
+ interrupted = true; // keep waiting; the interrupt is re-asserted in the outer finally
+ }
+ }
+ if (notStarted){
+ starting = true;
+ start = true; // this thread performs the start
+ }
+ }
+ if (start){
+ try {
+ start(); // called without holding startLock
+ } finally {
+ synchronized (startLock){
+ notStarted = false; // cleared even if start() threw, so start is attempted only once
+ starting = false;
+ startLock.notifyAll();
+ }
+ }
+ }
+ }
+ synchronized (this) {
+ idleTime = 0;
+ return super.newRequest();
+ }
+ } finally {
+ synchronized (this) {
+ assert pendingNewRequests > 0;
+ pendingNewRequests--;
+ }
+ if (interrupted) Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Returns the number of active and pending requests: the superclass
+ * count plus the pending newRequest calls registered on this mux.
+ */
+ @Override
+ public synchronized int requestsInProgress() throws IOException {
+ return super.requestsInProgress() + pendingNewRequests;
+ }
+
+ /**
+ * Returns true if the mux is dead, or the mux is idle and the
+ * recorded idle time is more than TIMEOUT milliseconds before now,
+ * and returns false otherwise. If the mux is idle and the recorded
+ * idle time is zero, sets the recorded idle time to now.
+ * An IOException from requestsInProgress is what marks the mux as
+ * dead here.
+ */
+ boolean checkIdle(long now) {
+ try {
+ synchronized (this){
+ if (requestsInProgress() == 0) {
+ if (idleTime == 0) {
+ idleTime = now; // first time seen idle: start the clock, do not report yet
+ } else {
+ return now - idleTime > TIMEOUT;
+ }
+ }
+ return false;
+ }
+ } catch (IOException e) {
+ return true;
+ }
+ }
+
+ /**
+ * Close the connection, so that the provider is notified.
+ * An IOException thrown by the close is ignored.
+ */
+ @Override
+ protected void handleDown() {
+ try {
+ c.close();
+ } catch (IOException e) {
+ }
+ }
+
+ boolean shouldRetry() {
+ return false; // XXX always false in this version, so ReqIterator never offers a retry
+ }
+ }
+
+    /**
+     * Decorates the outbound request obtained from the mux. Stream and
+     * abort calls go to that request, context and constraint queries go
+     * to the connection, and the response stream is wrapped so that the
+     * connection reads its response data first.
+     */
+    private static final class Outbound implements OutboundRequest {
+        /** Request produced by the mux; target of the pass-through calls. */
+        private final OutboundRequest req;
+        /** Connection on which the request was made. */
+        private final Connection c;
+        /** Handle that identifies the request to the connection. */
+        private final OutboundRequestHandle handle;
+        /** Stream handed to callers; wraps the mux's response stream. */
+        private final InputStream responseStream;
+        /** Cleared once readResponseData has reported an exception. */
+        private boolean responseDataOk = true;
+
+        Outbound(OutboundRequest req,
+                 Connection c,
+                 OutboundRequestHandle handle)
+        {
+            this.req = req;
+            this.c = c;
+            this.handle = handle;
+            // Input reads req, so it is created only after req is assigned.
+            this.responseStream = new Input(handle);
+        }
+
+        /** Forwards to the wrapped request. */
+        @Override
+        public OutputStream getRequestOutputStream() {
+            return req.getRequestOutputStream();
+        }
+
+        /** Returns the wrapping stream, not the mux's own stream. */
+        @Override
+        public InputStream getResponseInputStream() {
+            return responseStream;
+        }
+
+        /** Answered by the connection, using the request handle. */
+        @Override
+        public void populateContext(Collection context) {
+            c.populateContext(handle, context);
+        }
+
+        /** Answered by the connection, using the request handle. */
+        @Override
+        public InvocationConstraints getUnfulfilledConstraints() {
+            return c.getUnfulfilledConstraints(handle);
+        }
+
+        /**
+         * Returns false once readResponseData has reported an exception;
+         * otherwise returns whatever the wrapped request reports.
+         */
+        @Override
+        public boolean getDeliveryStatus() {
+            if (!responseDataOk) {
+                return false;
+            }
+            return req.getDeliveryStatus();
+        }
+
+        /** Forwards to the wrapped request. */
+        @Override
+        public void abort() {
+            req.abort();
+        }
+
+        /**
+         * Response stream wrapper that makes sure readResponseData is
+         * invoked on the connection before higher levels read, skip or
+         * query the stream. Mark/reset is not supported by this class.
+         */
+        private final class Input extends InputStream {
+            /** Response stream of the wrapped request. */
+            private final InputStream source;
+            /** The handle until readResponseData has been called, then null. */
+            private OutboundRequestHandle pendingHandle;
+
+            Input(OutboundRequestHandle handle) {
+                source = req.getResponseInputStream();
+                pendingHandle = handle;
+            }
+
+            /**
+             * Invokes readResponseData on the connection the first time
+             * it is called and does nothing afterwards. The handle is
+             * cleared whether or not that invocation succeeds.
+             */
+            private synchronized void readFirst() throws IOException {
+                if (pendingHandle == null) {
+                    return;
+                }
+                try {
+                    IOException failure =
+                        c.readResponseData(pendingHandle, source);
+                    if (failure != null) {
+                        responseDataOk = false;
+                        throw failure;
+                    }
+                } finally {
+                    pendingHandle = null;
+                }
+            }
+
+            /** Runs readFirst, then reads from the wrapped stream. */
+            @Override
+            public int read() throws IOException {
+                readFirst();
+                return source.read();
+            }
+
+            /** Runs readFirst, then reads from the wrapped stream. */
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                readFirst();
+                return source.read(b, off, len);
+            }
+
+            /** Runs readFirst, then skips on the wrapped stream. */
+            @Override
+            public long skip(long n) throws IOException {
+                readFirst();
+                return source.skip(n);
+            }
+
+            /** Runs readFirst, then queries the wrapped stream. */
+            @Override
+            public int available() throws IOException {
+                readFirst();
+                return source.available();
+            }
+
+            /** Closes the wrapped stream; readFirst is not involved. */
+            @Override
+            public void close() throws IOException {
+                source.close();
+            }
+        }
+    }
+
+    /**
+     * Records idle times in muxes and shuts down muxes that have been
+     * idle for at least TIMEOUT milliseconds.
+     */
+    private static final class Reaper implements Runnable {
+        Reaper() {
+        }
+
+        /**
+         * Sleep for TIMEOUT milliseconds. Then call checkIdle on each
+         * manager in the reaper set, removing every manager for which it
+         * returns true; shut down all of the idle muxes that were
+         * collected, after the lock on the reaper set has been released;
+         * and terminate if the reaper set is now empty, else repeat (go
+         * back to sleep).
+         *
+         * If the sleep is interrupted, this method returns at once.
+         * NOTE(review): in that case managers may remain in the reaper
+         * set, and connect only starts a Reaper when the set is empty --
+         * confirm that the pool never interrupts this task.
+         */
+        @Override
+        public void run() {
+            // Parameterized collections: no raw types, no casts on access.
+            List<OutboundMux> idle = new ArrayList<OutboundMux>(1);
+            boolean done;
+            do {
+                try {
+                    Thread.sleep(TIMEOUT);
+                } catch (InterruptedException e) {
+                    return;
+                }
+                long now = System.currentTimeMillis();
+                synchronized (reaperSet) {
+                    for (Iterator<ConnectionManager> iter =
+                             reaperSet.iterator();
+                         iter.hasNext();)
+                    {
+                        ConnectionManager mgr = iter.next();
+                        if (mgr.checkIdle(now, idle)) {
+                            iter.remove();
+                        }
+                    }
+                    done = reaperSet.isEmpty();
+                }
+                // Shut the collected muxes down outside the reaperSet lock,
+                // last collected first.
+                for (int i = idle.size(); --i >= 0; ) {
+                    idle.get(i).shutdown("idle");
+                }
+                idle.clear();
+            } while (!done);
+        }
+    }
+
+ /**
+ * Outbound request iterator returned by newRequest.
+ */
+ private final class ReqIterator implements OutboundRequestIterator {
+ /**
+ * The request handle.
+ */
+ private final OutboundRequestHandle handle;
+ /**
+ * True if next has not yet been called.
+ */
+ private boolean first = true;
+ /**
+ * The outbound mux from the last call to next, if any.
+ */
+ private OutboundMux mux;
+
+ ReqIterator(OutboundRequestHandle handle) {
+ this.handle = handle;
+ }
+
+ /**
+ * Returns true if next has not yet been called or if the last mux
+ * returned had an asynchronous close.
+ */
+ @Override
+ public boolean hasNext() {
+ return first || (mux != null && mux.shouldRetry());
+ }
+
+ /**
+ * If hasNext returns true, finds the entry (if any) for the
+ * connection endpoint. If no entry is found, creates one and spawns
+ * a Reaper if this is the only entry. Either way, bumps the connect
+ * count for the entry. Calls connect on the entry to get a mux, then
+ * calls newRequest on the mux, calls writeRequestData on the
+ * connection, and returns a new outbound request wrapper.
+ */
+ @Override
+ public OutboundRequest next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ first = false;
+ mux = connect(handle);
+ OutboundRequest req = mux.newRequest();
+ OutboundRequest sreq = null;
+ try {
+ Connection c = mux.getConnection();
+ c.writeRequestData(handle, req.getRequestOutputStream());
+ sreq = new Outbound(req, c, handle);
+ } finally {
+ if (sreq == null) {
+ remove(mux);
+ }
+ }
+ return sreq;
+ }
+ }
+
+    /**
+     * Returns an <code>OutboundRequestIterator</code> to use to send
+     * a new request for the specified handle to this connection
+     * manager's <code>ConnectionEndpoint</code>.
+     *
+     * <p>If the <code>hasNext</code> method of the returned iterator
+     * returns <code>true</code>, the <code>next</code> method behaves
+     * as follows:
+     *
+     * <blockquote>
+     *
+     * The connection endpoint's {@link
+     * ConnectionEndpoint#connect(OutboundRequestHandle,Collection,Collection)
+     * connect} method is invoked with any active connections that
+     * have not reached their maximum number of in-progress requests,
+     * any idle connections, and <code>handle</code>.  If that returns
+     * <code>null</code>, the endpoint's {@link
+     * ConnectionEndpoint#connect(OutboundRequestHandle) connect}
+     * method is invoked with <code>handle</code>.  In either case, if
+     * a new connection is returned, the Jini ERI multiplexing
+     * protocol is started on the connection (as the client).
+     * Finally, the {@link Connection#writeRequestData
+     * writeRequestData} method of the connection is invoked with
+     * <code>handle</code> and the request output stream of the {@link
+     * OutboundRequest} that is created for the request.  If any
+     * exception is thrown while obtaining a connection from the
+     * endpoint or writing the request data, that exception is thrown
+     * to the caller.  The <code>OutboundRequest</code> returned by
+     * <code>next</code> will invoke the {@link
+     * Connection#readResponseData readResponseData} method of the
+     * connection with the specified handle and the response input
+     * stream before any other data is read from the response input
+     * stream.  The {@link OutboundRequest#populateContext
+     * populateContext} and {@link
+     * OutboundRequest#getUnfulfilledConstraints
+     * getUnfulfilledConstraints} methods of the
+     * <code>OutboundRequest</code> are implemented by delegating to
+     * the corresponding method of the connection passing
+     * <code>handle</code> and the other arguments (if any).
+     *
+     * </blockquote>
+     *
+     * <p>The returned iterator might allow continued iteration if the
+     * connection used for the most recent request attempt was shut
+     * down gracefully by the server.
+     *
+     * @param handle a handle to identify the request in later
+     * invocations on the connection endpoint and its connections
+     *
+     * @return an <code>OutboundRequestIterator</code> to use to send
+     * a new request for the specified handle to this connection
+     * manager's <code>ConnectionEndpoint</code>
+     *
+     * @throws NullPointerException if <code>handle</code> is
+     * <code>null</code>
+     **/
+    public OutboundRequestIterator newRequest(OutboundRequestHandle handle) {
+        // Enforce the documented contract here, at the call site, rather
+        // than leaving a null handle to fail (or not) somewhere later.
+        if (handle == null) {
+            throw new NullPointerException("handle");
+        }
+        return new ReqIterator(handle);
+    }
+}