You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/07/10 00:12:05 UTC
[25/50] qpid-proton git commit: PROTON-881: Tidy up and doc reactor
interfaces
PROTON-881: Tidy up and doc reactor interfaces
Tidy up the Java interfaces for the proton-j reactor by removing a few
unnecessary setters. Document the interfaces using Javadoc.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2f8728a8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2f8728a8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2f8728a8
Branch: refs/heads/cjansen-cpp-client
Commit: 2f8728a85fbea23db73fe60e42038782f89a5517
Parents: 513f152
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Wed Jul 1 00:30:53 2015 +0100
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sun Jul 5 19:57:39 2015 -0400
----------------------------------------------------------------------
.../qpid/proton/example/reactor/HelloWorld.java | 2 +-
.../apache/qpid/proton/reactor/Acceptor.java | 9 +
.../qpid/proton/reactor/FlowController.java | 4 +
.../apache/qpid/proton/reactor/Handshaker.java | 7 +
.../org/apache/qpid/proton/reactor/Reactor.java | 278 +++++++++++++++----
.../qpid/proton/reactor/ReactorChild.java | 5 +-
.../apache/qpid/proton/reactor/Selectable.java | 227 +++++++++++----
.../apache/qpid/proton/reactor/Selector.java | 80 +++++-
.../org/apache/qpid/proton/reactor/Task.java | 20 +-
.../qpid/proton/reactor/impl/AcceptorImpl.java | 2 +-
.../qpid/proton/reactor/impl/IOHandler.java | 21 +-
.../qpid/proton/reactor/impl/ReactorImpl.java | 18 +-
.../proton/reactor/impl/SelectableImpl.java | 9 +-
.../qpid/proton/reactor/impl/SelectorImpl.java | 2 +-
.../qpid/proton/reactor/impl/TaskImpl.java | 3 +-
15 files changed, 552 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
index 745004e..055d6df 100644
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
@@ -33,7 +33,7 @@ import org.apache.qpid.proton.reactor.Reactor;
* The proton reactor provides a general purpose event processing
* library for writing reactive programs. A reactive program is defined
* by a set of event handlers. An event handler is just any class or
- * object that extends the Handler interface. For convinience, a class
+ * object that extends the Handler interface. For convenience, a class
* can extend BaseHandler and only handle the events that it cares to
* implement methods for.
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
index 744f4cb..222ce40 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
@@ -23,8 +23,17 @@ package org.apache.qpid.proton.reactor;
import org.apache.qpid.proton.engine.Extendable;
+/**
+ * Acceptors are children of a {@link Reactor} that accept in-bound network
+ * connections.
+ */
public interface Acceptor extends ReactorChild, Extendable {
+ /**
+ * Closes the acceptor, stopping it accepting any further in-bound
+ * connections. Already accepted connections continue to be processed by
+ * the associated reactor.
+ */
void close();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
index c8b999b..716b2a7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
@@ -26,6 +26,10 @@ import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
+/**
+ * A handler that applies flow control to a connection. This handler tops-up
+ * link credit each time credit is expended by the receipt of messages.
+ */
public class FlowController extends BaseHandler {
private int drained;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
index f9b670a..cbd496e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
@@ -26,6 +26,13 @@ import org.apache.qpid.proton.engine.Endpoint;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
+/**
+ * A handler that mirrors the actions of the remote end of a connection. This
+ * handler responds in kind when the remote end of the connection is opened and
+ * closed. Likewise if the remote end of the connection opens or closes
+ * sessions and links, this handler responds by opening or closing the local end
+ * of the session or link.
+ */
public class Handshaker extends BaseHandler {
private void open(Endpoint endpoint) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 3201c5a..f2a38a5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -24,15 +24,26 @@ package org.apache.qpid.proton.reactor;
import java.io.IOException;
import java.util.Set;
+import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.reactor.impl.ReactorImpl;
-
-
+/**
+ * The proton reactor provides a general purpose event processing
+ * library for writing reactive programs. A reactive program is defined
+ * by a set of event handlers. An event handler is just any class or
+ * object that extends the Handler interface. For convenience, a class
+ * can extend {@link BaseHandler} and only handle the events that it cares to
+ * implement methods for.
+ * <p>
+ * This class is not thread safe (with the exception of the {@link #wakeup()}
+ * method) and should only be used by a single thread at any given time.
+ */
public interface Reactor {
public static final class Factory
@@ -42,55 +53,228 @@ public interface Reactor {
}
}
- public long mark();
- public long now();
- public Record attachments();
- public long getTimeout();
-
- public void setTimeout(long timeout);
-
- public Handler getGlobalHandler();
-
- public void setGlobalHandler(Handler handler);
-
- public Handler getHandler();
-
- public void setHandler(Handler handler);
-
- public Set<ReactorChild> children();
-
- public Collector collector();
-
-
- public Selectable selectable();
-
-
-
- public void update(Selectable selectable);
-
-
+ /**
+ * Updates the last time that the reactor's state has changed, potentially
+ * resulting in events being generated.
+ * @return the current time in milliseconds
+ * {@link System#currentTimeMillis()}.
+ */
+ long mark();
+
+ /** @return the last time that {@link #mark()} was called. */
+ long now();
+
+ /** @return an instance of {@link Record} that can be used to associate
+ * other objects (attachments) with this instance of the
+ * Reactor class.
+ */
+ Record attachments();
+
+ /**
+ * @param timeout a timeout value, to associate with this instance of
+ * the reactor. This can be retrieved using the
+ * {@link #getTimeout()} method.
+ */
+ void setTimeout(long timeout);
+
+ /**
+ * @return the value previously set using {@link #setTimeout(long)} or
+ * 0 if no previous value has been set.
+ */
+ long getTimeout();
+
+ /**
+ * @return the global handler for this reactor. Every event the reactor
+ * sees is dispatched to the global handler. To receive every
+ * event generated by the reactor, associate a child handler
+ * with the global handler. For example:
+ * <pre>
+ * getGlobalHandler().add(yourHandler);
+ * </pre>
+ */
+ Handler getGlobalHandler();
+
+ /**
+ * Sets a new global handler. You probably don't want to do this and
+ * would be better adding a handler to the value returned by the
+ * {{@link #getGlobalHandler()} method.
+ * @param handler the new global handler.
+ */
+ void setGlobalHandler(Handler handler);
+
+ /**
+ * @return the handler for this reactor. Every event the reactor sees,
+ * which is not handled by a child of the reactor (such as a
+ * timer, connection, acceptor, or selector) is passed to this
+ * handler. To receive these events, it is recommend that you
+ * associate a child handler with the handler returned by this
+ * method. For example:
+ * <pre>
+ * getHandler().add(yourHandler);
+ * </pre>
+ */
+ Handler getHandler();
+
+ /**
+ * Sets a new handler, that will receive any events not handled by a child
+ * of the reactor. Note that setting a handler via this method replaces
+ * the previous handler, and will result in no further events being
+ * dispatched to the child handlers associated with the previous handler.
+ * For this reason it is recommended that you do not use this method and
+ * instead add child handlers to the value returned by the
+ * {@link #getHandler()} method.
+ * @param handler the new handler for this reactor.
+ */
+ void setHandler(Handler handler);
+
+ /**
+ * @return a set containing the child objects associated wit this reactor.
+ * This will contain any active instances of: {@link Task} - created
+ * using the {@link #schedule(int, Handler)} method,
+ * {@link Connection} - created using the
+ * {@link #connection(Handler)} method, {@link Acceptor} -
+ * created using the {@link #acceptor(String, int)} method.
+ * {@link #acceptor(String, int, Handler)} method, or
+ * {@link Selectable} - created using the {@link #selectable()}
+ * method.
+ */
+ Set<ReactorChild> children();
+
+ /**
+ * @return the Collector used to gather events generated by this reactor.
+ */
+ Collector collector();
+
+ /**
+ * Creates a new <code>Selectable</code> as a child of this reactor.
+ * @return the newly created <code>Selectable</code>.
+ */
+ Selectable selectable();
+
+ /**
+ * Updates the specified <code>Selectable</code> either emitting a
+ * {@link Type#SELECTABLE_UPDATED} event if the selectable is not terminal,
+ * or {@link Type#SELECTABLE_FINAL} if the selectable is terminal and has
+ * not already emitted a {@link Type#SELECTABLE_FINAL} event.
+ * @param selectable
+ */
+ void update(Selectable selectable);
+
+ /**
+ * Yields, causing the next call to {@link #process()} to return
+ * successfully - without processing any events. If multiple calls
+ * can be made to <code>yield</code> and only the next invocation of
+ * {@link #process()} will be affected.
+ */
void yield() ;
- public boolean quiesced();
-
- public boolean process() throws HandlerException;
-
- public void wakeup() throws IOException;
-
- public void start() ;
-
- public void stop() throws HandlerException;
-
- public void run() throws HandlerException;
-
- // pn_reactor_schedule from reactor.c
- public Task schedule(int delay, Handler handler);
-
+ /**
+ * @return <code>true</code> if the reactor is in quiesced state (e.g. has
+ * no events to process). <code>false</code> is returned otherwise.
+ */
+ boolean quiesced();
+
+ /**
+ * Process any events pending for this reactor. Events are dispatched to
+ * the handlers registered with the reactor, or child objects associated
+ * with the reactor. This method blocks until the reactor has no more work
+ * to do (and no more work pending, in terms of scheduled tasks or open
+ * selectors to process).
+ * @return <code>true</code> if the reactor may have more events in the
+ * future. For example: if there are scheduled tasks, or open
+ * selectors. <code>false</code> is returned if the reactor has
+ * (and will have) no more events to process.
+ * @throws HandlerException if an unchecked exception is thrown by one of
+ * the handlers - it will be re-thrown attached to an instance of
+ * <code>HandlerException</code>.
+ */
+ boolean process() throws HandlerException;
+
+ /**
+ * Wakes up the thread (if any) blocked in the {@link #process()} method.
+ * This is the only method of this class that is thread safe, in that it
+ * can be used at the same time as another thread is using the reactor.
+ */
+ void wakeup();
+
+ /**
+ * Starts the reactor. This method should be invoked before the first call
+ * to {@link #process()}.
+ */
+ void start();
+
+ /**
+ * Stops the reactor. This method should be invoked after the last call to
+ * {@link #process()}.
+ * @throws HandlerException
+ */
+ void stop() throws HandlerException;
+
+ /**
+ * Simplifies the use of the reactor by wrapping the use of
+ * <code>start</code>, <code>run</code>, and <code>stop</code> method
+ * calls.
+ * <p>
+ * Logically the implementation of this method is:
+ * <pre>
+ * start();
+ * while(process()) {}
+ * stop();
+ * </pre>
+ * @throws HandlerException if an unchecked exception is thrown by one of
+ * the handlers - it will be re-thrown attached to an instance of
+ * <code>HandlerException</code>.
+ */
+ void run() throws HandlerException;
+
+ /**
+ * Schedules execution of a task to take place at some point in the future.
+ * @param delay the number of milliseconds, in the future, to schedule the
+ * task for.
+ * @param handler a handler to associate with the task. This is notified
+ * when the deadline for the task is reached.
+ * @return an object representing the task that has been scheduled.
+ */
+ Task schedule(int delay, Handler handler);
+
+ /**
+ * Creates a new out-bound connection.
+ * @param handler a handler that is notified when events occur for the
+ * connection. Typically the host and port to connect to
+ * would be supplied to the connection object inside the
+ * logic which handles the {@link Type#CONNECTION_INIT}
+ * event.
+ * @return the newly created connection object.
+ */
Connection connection(Handler handler);
+ /**
+ * Creates a new acceptor. This is equivalent to calling:
+ * <pre>
+ * acceptor(host, port, null);
+ * </pre>
+ * @param host
+ * @param port
+ * @return the newly created acceptor object.
+ * @throws IOException
+ */
Acceptor acceptor(String host, int port) throws IOException;
- Acceptor acceptor(String host, int port, Handler handler) throws IOException;
- // This also frees any children that the reactor has!
- public void free();
+ /**
+ * Creates a new acceptor. This acceptor listens for in-bound connections.
+ * @param host the host name or address of the NIC to listen on.
+ * @param port the port number to listen on.
+ * @param handler if non-<code>null</code> this handler is registered with
+ * each new connection accepted by the acceptor.
+ * @return the newly created acceptor object.
+ * @throws IOException
+ */
+ Acceptor acceptor(String host, int port, Handler handler)
+ throws IOException;
+
+ /**
+ * Frees any resources (such as sockets and selectors) held by the reactor
+ * or its children.
+ */
+ void free();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
index c39bdd9..146ee09 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
@@ -21,8 +21,11 @@
package org.apache.qpid.proton.reactor;
-// Interface used to identify classes that can be a child of a reactor.
+/**
+ * Interface used to identify classes that can be a child of a reactor.
+ */
public interface ReactorChild {
+ /** Frees any resources associated with this child. */
void free();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
index fa459d1..e91a0ee 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
@@ -25,72 +25,197 @@ import java.nio.channels.SelectableChannel;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Extendable;
-import org.apache.qpid.proton.engine.Transport;
+/**
+ * An entity that can be multiplexed using a {@link Selector}.
+ * <p>
+ * Every selectable is associated with exactly one {@link SelectableChannel}.
+ * Selectables may be interested in three kinds of events: read events, write
+ * events, and timer events. A selectable will express its interest in these
+ * events through the {@link #isReading()}, {@link #isWriting()}, and
+ * {@link #getDeadline()} methods.
+ * <p>
+ * When a read, write, or timer event occurs, the selectable must be notified by
+ * calling {@link #readable()}, {@link #writeable()}, or {@link #expired()} as
+ * appropriate.
+ *
+ * Once a selectable reaches a terminal state (see {@link #isTerminal()}, it
+ * will never be interested in events of any kind. When this occurs it should be
+ * removed from the Selector and discarded using {@link #free()}.
+ */
public interface Selectable extends ReactorChild, Extendable {
- public interface Callback {
+ /**
+ * A callback that can be passed to the various "on" methods of the
+ * selectable - to allow code to be run when the selectable becomes ready
+ * for the associated operation.
+ */
+ interface Callback {
void run(Selectable selectable);
}
- public boolean isReading();
-
+ /**
+ * @return <code>true</code> if the selectable is interested in receiving
+ * notification (via the {@link #readable()} method that indicate
+ * that the associated {@link SelectableChannel} has data ready
+ * to be read from it.
+ */
+ boolean isReading();
+
+ /**
+ * @return <code>true</code> if the selectable is interested in receiving
+ * notifications (via the {@link #writeable()} method that indicate
+ * that the associated {@link SelectableChannel} is ready to be
+ * written to.
+ */
boolean isWriting();
- long getDeadline() ;
-
- void setReading(boolean reading) ;
-
+ /**
+ * @return a deadline after which this selectable can expect to receive
+ * a notification (via the {@link #expired()} method that indicates
+ * that the deadline has past. The deadline is expressed in the
+ * same format as {@link System#currentTimeMillis()}. Returning
+ * a deadline of zero (or a negative number) indicates that the
+ * selectable does not wish to be notified of expiry.
+ */
+ long getDeadline();
+
+ /**
+ * Sets the value that will be returned by {@link #isReading()}.
+ * @param reading
+ */
+ void setReading(boolean reading);
+
+ /**
+ * Sets the value that will be returned by {@link #isWriting()}.
+ * @param writing
+ */
void setWriting(boolean writing);
- void setDeadline(long deadline) ;
-
- public void onReadable(Callback runnable) ;
-
- public void onWritable(Callback runnable);
-
- public void onExpired(Callback runnable);
-
- public void onError(Callback runnable);
-
- public void onRelease(Callback runnable);
-
- public void onFree(Callback runnable);
-
- void readable() ;
-
- void writeable() ;
-
- void expired() ;
-
+ /**
+ * Sets the value that will be returned by {@link #getDeadline()}.
+ * @param deadline
+ */
+ void setDeadline(long deadline);
+
+ /**
+ * Registers a callback that will be run when the selectable becomes ready
+ * for reading.
+ * @param runnable the callback to register. Any previously registered
+ * callback will be replaced.
+ */
+ void onReadable(Callback runnable);
+
+ /**
+ * Registers a callback that will be run when the selectable becomes ready
+ * for writing.
+ * @param runnable the callback to register. Any previously registered
+ * callback will be replaced.
+ */
+ void onWritable(Callback runnable);
+
+ /**
+ * Registers a callback that will be run when the selectable expires.
+ * @param runnable the callback to register. Any previously registered
+ * callback will be replaced.
+ */
+ void onExpired(Callback runnable);
+
+ /**
+ * Registers a callback that will be run when the selectable is notified of
+ * an error.
+ * @param runnable the callback to register. Any previously registered
+ * callback will be replaced.
+ */
+ void onError(Callback runnable);
+
+ /**
+ * Registers a callback that will be run when the selectable is notified
+ * that it has been released.
+ * @param runnable the callback to register. Any previously registered
+ * callback will be replaced.
+ */
+ void onRelease(Callback runnable);
+
+ /**
+ * Registers a callback that will be run when the selectable is notified
+ * that it has been free'd.
+ * @param runnable the callback to register. Any previously registered
+ * callback will be replaced.
+ */
+ void onFree(Callback runnable);
+
+ /**
+ * Notify the selectable that the underlying {@link SelectableChannel} is
+ * ready for a read operation.
+ */
+ void readable();
+
+ /**
+ * Notify the selectable that the underlying {@link SelectableChannel} is
+ * ready for a write operation.
+ */
+ void writeable();
+
+ /** Notify the selectable that it has expired. */
+ void expired();
+
+ /** Notify the selectable that an error has occurred. */
void error();
- void release() ;
+ /** Notify the selectable that it has been released. */
+ void release();
+ /** Notify the selectable that it has been free'd. */
@Override
- void free() ;
-
- // These are equivalent to the C code's set/get file descriptor functions.
- void setChannel(SelectableChannel channel) ;
-
- public SelectableChannel getChannel() ;
-
- boolean isRegistered() ;
-
- void setRegistered(boolean registered) ;
-
- void setCollector(final Collector collector) ;
-
- public Reactor getReactor() ;
-
+ void free();
+
+ /**
+ * Associates a {@link SelectableChannel} with this selector.
+ * @param channel
+ */
+ void setChannel(SelectableChannel channel); // This is the equivalent to pn_selectable_set_fd(...)
+
+ /** @return the {@link SelectableChannel} associated with this selector. */
+ SelectableChannel getChannel(); // This is the equivalent to pn_selectable_get_fd(...)
+
+ /**
+ * Check if a selectable is registered. This can be used for tracking
+ * whether a given selectable has been registerd with an external event
+ * loop.
+ * <p>
+ * <em>Note:</em> the reactor code, currently, does not use this flag.
+ * @return <code>true</code>if the selectable is registered.
+ */
+ boolean isRegistered(); // XXX: unused in C reactor code
+
+ /**
+ * Set the registered flag for a selectable.
+ * <p>
+ * <em>Note:</em> the reactor code, currently, does not use this flag.
+ * @param registered the value returned by {@link #isRegistered()}
+ */
+ void setRegistered(boolean registered); // XXX: unused in C reactor code
+
+ /**
+ * Configure a selectable with a set of callbacks that emit readable,
+ * writable, and expired events into the supplied collector.
+ * @param collector
+ */
+ void setCollector(final Collector collector);
+
+ /** @return the reactor to which this selectable is a child. */
+ Reactor getReactor() ;
+
+ /**
+ * Terminates the selectable. Once a selectable reaches a terminal state
+ * it will never be interested in events of any kind.
+ */
public void terminate() ;
- public boolean isTerminal();
-
- public Transport getTransport() ;
-
- public void setTransport(Transport transport) ;
-
- public void setReactor(Reactor reactor) ;
+ /**
+ * @return <code>true</code> if the selectable has reached a terminal state.
+ */
+ boolean isTerminal();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
index 592e32a..4228a8d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
@@ -24,20 +24,88 @@ package org.apache.qpid.proton.reactor;
import java.io.IOException;
import java.util.Iterator;
+/**
+ * A multiplexor of instances of {@link Selectable}.
+ * <p>
+ * Many instances of <code>Selectable</code> can be added to a selector, and
+ * the {@link #select(long)} method used to block the calling thread until
+ * one of the <code>Selectables</code> becomes read to perform an operation.
+ * <p>
+ * This class is not thread safe, so only one thread should be manipulating the
+ * contents of the selector, or running the {@link #select(long)} method at
+ * any given time.
+ */
public interface Selector {
- void add(Selectable selectable) throws IOException ;
+ /**
+ * Adds a selectable to the selector.
+ * @param selectable
+ * @throws IOException
+ */
+ void add(Selectable selectable) throws IOException;
+ /**
+ * Updates the selector to reflect any changes interest by the specified
+ * selectable. This is achieved by calling the
+ * {@link Selectable#isReading()} and {@link Selectable#isWriting()}
+ * methods.
+ * @param selectable
+ */
void update(Selectable selectable);
- void remove(Selectable selectable) ;
+ /**
+ * Removes a selectable from the selector.
+ * @param selectable
+ */
+ void remove(Selectable selectable);
+
+ /**
+ * Waits for the specified timeout period for one or more selectables to
+ * become ready for an operation. Selectables that become ready are
+ * returned by the {@link #readable()}, {@link #writeable()},
+ * {@link #expired()}, or {@link #error()} methods.
+ *
+ * @param timeout the maximum number of milliseconds to block the calling
+ * thread waiting for a selectable to become ready for an
+ * operation. The value zero is interpreted as check but
+ * don't block.
+ * @throws IOException
+ */
+ void select(long timeout) throws IOException;
+
+ /**
+ * @return the selectables that have become readable since the last call
+ * to {@link #select(long)}. Calling <code>select</code> clears
+ * any previous values in this set before adding new values
+ * corresponding to those selectables that have become readable.
+ */
+ Iterator<Selectable> readable();
+
+ /**
+ * @return the selectables that have become writable since the last call
+ * to {@link #select(long)}. Calling <code>select</code> clears
+ * any previous values in this set before adding new values
+ * corresponding to those selectables that have become writable.
+ */
+ Iterator<Selectable> writeable();
- void select(long timeout) throws IOException ;
+ /**
+ * @return the selectables that have expired since the last call
+ * to {@link #select(long)}. Calling <code>select</code> clears
+ * any previous values in this set before adding new values
+ * corresponding to those selectables that have now expired.
+ */
+ Iterator<Selectable> expired();
- Iterator<Selectable> readable() ;
- Iterator<Selectable> writeable() ;
- Iterator<Selectable> expired() ;
+ /**
+ * @return the selectables that have encountered an error since the last
+ * call to {@link #select(long)}. Calling <code>select</code>
+ * clears any previous values in this set before adding new values
+ * corresponding to those selectables that have encountered an
+ * error.
+ */
Iterator<Selectable> error() ;
+ /** Frees the resources used by this selector. */
void free();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
index 9d2557c..69701ab 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
@@ -21,12 +21,26 @@
package org.apache.qpid.proton.reactor;
+import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Extendable;
+import org.apache.qpid.proton.engine.Handler;
+/**
+ * Represents work scheduled with a {@link Reactor} for execution at
+ * some point in the future.
+ * <p>
+ * Tasks are created using the {@link Reactor#schedule(int, Handler)}
+ * method.
+ */
public interface Task extends Extendable {
- public long deadline();
- public void setReactor(Reactor reactor);
- public Reactor getReactor();
+ /**
+ * @return the deadline at which the handler associated with the scheduled
+ * task should be delivered a {@link Type#TIMER_TASK} event.
+ */
+ long deadline();
+
+ /** @return the reactor that created this task. */
+ Reactor getReactor();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index 7fe97af..431b745 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -43,7 +43,7 @@ import org.apache.qpid.proton.reactor.Selectable.Callback;
public class AcceptorImpl implements Acceptor {
private Record attachments = new RecordImpl();
- private final Selectable sel;
+ private final SelectableImpl sel;
private class AcceptorReadable implements Callback {
@Override
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index fa807e4..40eddac 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -124,7 +124,7 @@ public class IOHandler extends BaseHandler {
// pni_connection_capacity from connection.c
private static int capacity(Selectable selectable) {
- Transport transport = selectable.getTransport();
+ Transport transport = ((SelectableImpl)selectable).getTransport();
int capacity = transport.capacity();
if (capacity < 0) {
if (transport.isClosed()) {
@@ -136,7 +136,7 @@ public class IOHandler extends BaseHandler {
// pni_connection_pending from connection.c
private static int pending(Selectable selectable) {
- Transport transport = selectable.getTransport();
+ Transport transport = ((SelectableImpl)selectable).getTransport();
int pending = transport.pending();
if (pending < 0) {
if (transport.isClosed()) {
@@ -147,7 +147,7 @@ public class IOHandler extends BaseHandler {
}
// pni_connection_deadline from connection.c
- private static long deadline(Selectable selectable) {
+ private static long deadline(SelectableImpl selectable) {
Reactor reactor = selectable.getReactor();
Transport transport = selectable.getTransport();
long deadline = transport.tick(reactor.now());
@@ -156,11 +156,12 @@ public class IOHandler extends BaseHandler {
// pni_connection_update from connection.c
private static void update(Selectable selectable) {
- int c = capacity(selectable);
- int p = pending(selectable);
+ SelectableImpl selectableImpl = (SelectableImpl)selectable;
+ int c = capacity(selectableImpl);
+ int p = pending(selectableImpl);
selectable.setReading(c > 0);
selectable.setWriting(p > 0);
- selectable.setDeadline(deadline(selectable));
+ selectable.setDeadline(deadline(selectableImpl));
}
// pni_connection_readable from connection.c
@@ -168,7 +169,7 @@ public class IOHandler extends BaseHandler {
@Override
public void run(Selectable selectable) {
Reactor reactor = selectable.getReactor();
- Transport transport = selectable.getTransport();
+ Transport transport = ((SelectableImpl)selectable).getTransport();
int capacity = transport.capacity();
if (capacity > 0) {
SocketChannel socketChannel = (SocketChannel)selectable.getChannel();
@@ -200,7 +201,7 @@ public class IOHandler extends BaseHandler {
@Override
public void run(Selectable selectable) {
Reactor reactor = selectable.getReactor();
- Transport transport = selectable.getTransport();
+ Transport transport = ((SelectableImpl)selectable).getTransport();
int pending = transport.pending();
if (pending > 0) {
SocketChannel channel = (SocketChannel)selectable.getChannel();
@@ -243,7 +244,7 @@ public class IOHandler extends BaseHandler {
@Override
public void run(Selectable selectable) {
Reactor reactor = selectable.getReactor();
- Transport transport = selectable.getTransport();
+ Transport transport = ((SelectableImpl)selectable).getTransport();
long deadline = transport.tick(reactor.now());
selectable.setDeadline(deadline);
int c = capacity(selectable);
@@ -278,7 +279,7 @@ public class IOHandler extends BaseHandler {
selectable.onError(connectionError);
selectable.onExpired(connectionExpired);
selectable.onFree(connectionFree);
- selectable.setTransport(transport);
+ ((SelectableImpl)selectable).setTransport(transport);
((TransportImpl)transport).setSelectable(selectable);
((TransportImpl)transport).setReactor(reactor);
update(selectable);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 45f9d4b..5fc451d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.proton.reactor.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.util.HashSet;
import java.util.Set;
@@ -186,8 +187,8 @@ public class ReactorImpl implements Reactor, Extendable {
return selectable(null);
}
- public Selectable selectable(ReactorChild child) {
- Selectable result = new SelectableImpl();
+ public SelectableImpl selectable(ReactorChild child) {
+ SelectableImpl result = new SelectableImpl();
result.setCollector(collector);
collector.put(Type.SELECTABLE_INIT, result);
result.setReactor(this);
@@ -295,10 +296,15 @@ public class ReactorImpl implements Reactor, Extendable {
}
}
-
@Override
- public void wakeup() throws IOException {
- wakeup.sink().write(ByteBuffer.allocate(1));
+ public void wakeup() {
+ try {
+ wakeup.sink().write(ByteBuffer.allocate(1));
+ } catch(ClosedChannelException channelClosedException) {
+ // Ignore - pipe already closed by reactor being shutdown.
+ } catch(IOException ioException) {
+ throw new ReactorInternalException(ioException);
+ }
}
@Override
@@ -331,7 +337,7 @@ public class ReactorImpl implements Reactor, Extendable {
@Override
public Task schedule(int delay, Handler handler) {
Task task = timer.schedule(now + delay);
- task.setReactor(this);
+ ((TaskImpl)task).setReactor(this);
BaseHandler.setHandler(task, handler);
if (selectable != null) {
selectable.setDeadline(timer.deadline());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
index 5ab0176..df4e6cc 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
@@ -219,18 +219,15 @@ public class SelectableImpl implements Selectable {
return terminal;
}
- @Override
- public Transport getTransport() {
+ protected Transport getTransport() {
return transport;
}
- @Override
- public void setTransport(Transport transport) {
+ protected void setTransport(Transport transport) {
this.transport = transport;
}
- @Override
- public void setReactor(Reactor reactor) {
+ protected void setReactor(Reactor reactor) {
this.reactor = reactor;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index ed4ad69..4c2f1ed 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -133,7 +133,7 @@ class SelectorImpl implements Selector {
((SocketChannel)key.channel()).finishConnect();
update((Selectable)key.attachment());
} catch(IOException ioException) {
- Selectable selectable = (Selectable)key.attachment();
+ SelectableImpl selectable = (SelectableImpl)key.attachment();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.getSymbol("proton:io"));
condition.setDescription(ioException.getMessage());
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
index 5311059..00c9a84 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
@@ -33,6 +33,7 @@ public class TaskImpl implements Task, Comparable<TaskImpl> {
private final int counter;
private final AtomicInteger count = new AtomicInteger();
private Record attachments = new RecordImpl();
+ private Reactor reactor;
public TaskImpl(long deadline) {
this.deadline = deadline;
@@ -57,8 +58,6 @@ public class TaskImpl implements Task, Comparable<TaskImpl> {
return deadline;
}
- private Reactor reactor;
- @Override
public void setReactor(Reactor reactor) {
this.reactor = reactor;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org